[jira] [Commented] (FLINK-6492) Unclosed DataOutputViewStream in GenericArraySerializerConfigSnapshot#write()

2017-05-14 Thread Huafeng Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010006#comment-16010006
 ] 

Huafeng Wang commented on FLINK-6492:
-

If nobody is working on this, I'd like to take this one.

> Unclosed DataOutputViewStream in GenericArraySerializerConfigSnapshot#write()
> -
>
> Key: FLINK-6492
> URL: https://issues.apache.org/jira/browse/FLINK-6492
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>InstantiationUtil.serializeObject(new DataOutputViewStream(out), 
> componentClass);
> {code}
> The DataOutputViewStream instance should be closed upon return.
> TupleSerializerConfigSnapshot has a similar issue.
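A minimal sketch (not taken from the Flink code base) of how the suggested fix could look, closing the stream with try-with-resources so it is released even if serialization throws; the wrapper class and the componentClass field merely stand in for the snapshot class mentioned in the ticket:

{code}
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;

public class GenericArraySnapshotWriteSketch {

    // Stand-in for the componentClass field of GenericArraySerializerConfigSnapshot.
    private Class<?> componentClass;

    public void write(DataOutputView out) throws IOException {
        // try-with-resources closes the DataOutputViewStream even if serialization throws.
        try (DataOutputViewStream outStream = new DataOutputViewStream(out)) {
            InstantiationUtil.serializeObject(outStream, componentClass);
        }
    }
}
{code}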





[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010003#comment-16010003
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel the rebase doesn't seem to have been done correctly. The PR should contain 
only the diff commits.
I'm not sure what went wrong, but perhaps the easiest way right now is to 
cherry-pick your diff commits onto a new branch checked out from the latest master.


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of 
> partitions of the default topic, and not the number of partitions of the 
> current {{targetTopic}}.
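To illustrate the intended semantic, here is a hedged sketch of a custom partitioner written against the {{FlinkKafkaPartitioner}} class discussed later in this thread; with the fix, the {{partitions}} array passed in should describe the current {{targetTopic}} rather than the default topic. The class name and hashing strategy are illustrative only:

{code}
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Arrays;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // `partitions` is expected to describe the current targetTopic, not the default topic.
        int hash = (key != null) ? Arrays.hashCode(key) : record.hashCode();
        // Mask the sign bit so the index is always a valid partition for this topic.
        return partitions[(hash & Integer.MAX_VALUE) % partitions.length];
    }
}
{code}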





[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel the rebase doesn't seem to have been done correctly. The PR should contain 
only the diff commits.
I'm not sure what went wrong, but perhaps the easiest way right now is to 
cherry-pick your diff commits onto a new branch checked out from the latest master.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010001#comment-16010001
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel thanks. You would need a proper rebase: `git rebase master` when 
you finish your feature branch, instead of merging the latest master.

Regarding timeout: doesn't the Kafka client have built-in timeout 
functionality?


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of 
> partitions of the default topic, and not the number of partitions of the 
> current {{targetTopic}}.





[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel thanks. You would need a proper rebase: `git rebase master` when 
you finish your feature branch, instead of merging the latest master.

Regarding timeout: doesn't the Kafka client have built-in timeout 
functionality?




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009993#comment-16009993
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your reply. 

For 1, the `ExecutorService` is used to control the timeout when fetching Kafka 
partitions. When partitions are fetched, a `Future` is created and executed in 
the `ExecutorService`, which waits for a number of milliseconds and throws an 
exception on timeout.
For 2, I have deprecated the 08 / 09 / 010 constructors whose parameter is 
`KafkaPartitioner` and added the same constructors with a 
`FlinkKafkaPartitioner` parameter.

I noticed that the master branch of apache/flink changed quite a lot a few 
days ago, and I will rebase onto those changes soon. I think you can review 
the changes after that, thank you.
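A hedged sketch of the timeout mechanism described in point 1; the {{PartitionLookup}} interface and its {{fetchPartitions}} method are placeholders, not actual Flink or Kafka API names:

{code}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PartitionFetchSketch {

    // Placeholder for whatever actually queries the broker for partition metadata.
    interface PartitionLookup<P> {
        List<P> fetchPartitions(String topic) throws Exception;
    }

    /** Runs the blocking partition lookup on a separate thread and bounds the wait. */
    public static <P> List<P> fetchWithTimeout(
            PartitionLookup<P> lookup, String topic, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<P>> future = executor.submit(() -> lookup.fetchPartitions(topic));
            // Throws TimeoutException if the broker does not answer within the timeout.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out fetching partitions for topic " + topic, e);
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}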


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of 
> partitions of the default topic, and not the number of partitions of the 
> current {{targetTopic}}.





[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-14 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your reply. 

For 1, the `ExecutorService` is used to control the timeout when fetching Kafka 
partitions. When partitions are fetched, a `Future` is created and executed in 
the `ExecutorService`, which waits for a number of milliseconds and throws an 
exception on timeout.
For 2, I have deprecated the 08 / 09 / 010 constructors whose parameter is 
`KafkaPartitioner` and added the same constructors with a 
`FlinkKafkaPartitioner` parameter.

I noticed that the master branch of apache/flink changed quite a lot a few 
days ago, and I will rebase onto those changes soon. I think you can review 
the changes after that, thank you.




[jira] [Commented] (FLINK-4864) Shade Calcite dependency in flink-table

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009922#comment-16009922
 ] 

ASF GitHub Bot commented on FLINK-4864:
---

Github user wuchong closed the pull request at:

https://github.com/apache/flink/pull/2673


> Shade Calcite dependency in flink-table
> ---
>
> Key: FLINK-4864
> URL: https://issues.apache.org/jira/browse/FLINK-4864
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The Table API has a dependency on Apache Calcite.
> A user reported version conflicts when having their own Calcite 
> dependency on the classpath.
> The solution would be to shade away the Calcite dependency (Calcite's 
> transitive dependencies are already shaded).





[GitHub] flink pull request #2673: [FLINK-4864] [table] Shade Calcite dependency in f...

2017-05-14 Thread wuchong
Github user wuchong closed the pull request at:

https://github.com/apache/flink/pull/2673




[jira] [Commented] (FLINK-6557) RocksDbStateBackendTest fails on Windows

2017-05-14 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009910#comment-16009910
 ] 

Xiaogang Shi commented on FLINK-6557:
-

[~Zentol] Thanks a lot for pointing it out. As far as I know, the maximum 
length of directory paths on Windows is 260. The length of the path printed in 
the log, however, is only 181. Could you provide more information (e.g., the 
exception stack trace) to help address the problem?

> RocksDbStateBackendTest fails on Windows
> 
>
> Key: FLINK-6557
> URL: https://issues.apache.org/jira/browse/FLINK-6557
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>
> The {{RocksDbStateBackendTest}} fails on Windows when incremental checkpointing 
> is enabled.
> Based on the exception, I guess the file name is simply too long:
> {code}
> org.rocksdb.RocksDBException: IO error: Failed to create dir: 
> /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp:
>  Invalid argument
> {code}





[GitHub] flink issue #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL

2017-05-14 Thread rtudoran
Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3889
  
@hongyuhong @stefanobortoli @shijinkui I forgot to add you to the PR






[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009906#comment-16009906
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3889
  
@hongyuhong @stefanobortoli @shijinkui I forgot to add you to the PR




> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>  Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit as the sort 
> operation would never trigger. If an SQL query is provided without 
> limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `)
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac, 21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb, 12)| | | |
> | |12:15:00|(abb, 12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the 
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger 
> (based on LIMIT/TOP parameters). All data will be sorted at once and result 
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort 
> the elements as they arrive, one at a time followed by a flatMap to filter 
> the number of outputs. 
> 
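Below is a hedged sketch (Java DataStream API, not part of this issue's implementation) of the "window assigner + apply function" option described above: the contents of a sliding processing-time window are sorted on the {{b}} field and only the first two elements are emitted ({{LIMIT 2}}). The {{Tuple2}} layout stands in for the {{(a, b)}} events from the example.

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowLimitSketch {

    // `stream` carries (a, b) events; the window mirrors HOP(proctime, 1 HOUR, 2 HOUR).
    public static DataStream<Tuple2<String, Integer>> limitTwo(DataStream<Tuple2<String, Integer>> stream) {
        return stream
            .windowAll(SlidingProcessingTimeWindows.of(Time.hours(2), Time.hours(1)))
            .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window,
                                  Iterable<Tuple2<String, Integer>> values,
                                  Collector<Tuple2<String, Integer>> out) {
                    // Buffer and sort the window content when the window fires (ORDER BY b ASC) ...
                    List<Tuple2<String, Integer>> buffer = new ArrayList<>();
                    for (Tuple2<String, Integer> v : values) {
                        buffer.add(v);
                    }
                    buffer.sort(Comparator.comparingInt((Tuple2<String, Integer> t) -> t.f1));
                    // ... then emit only the first two elements (LIMIT 2).
                    for (int i = 0; i < Math.min(2, buffer.size()); i++) {
                        out.collect(buffer.get(i));
                    }
                }
            });
    }
}
{code}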

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009905#comment-16009905
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116397679
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FilterFunction, 
RichFilterFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.slf4j.LoggerFactory
+
+class FilterRunner[IN] (
--- End diff --

@fhueske just out of curiosity - is there an advantage to compiling it in 
open() - basically at runtime, when it is deployed (if I am not mistaken) - compared 
to compiling it statically at query compilation time? 


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it should include both 
> streams' proctime attributes; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-14 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116397679
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FilterFunction, 
RichFilterFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.slf4j.LoggerFactory
+
+class FilterRunner[IN] (
--- End diff --

@fhueske just out of curiosity - is there an advantage to compiling it in 
open() - basically at runtime, when it is deployed (if I am not mistaken) - compared 
to compiling it statically at query compilation time? 




[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009904#comment-16009904
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116397441
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -199,31 +157,38 @@ object JoinUtil {
 val generator = new CodeGenerator(
   config,
   nullCheck,
-  returnType)
+  leftType,
+  Some(rightType))
+
+val conversion = generator.generateConverterResultExpression(
+  returnType.physicalTypeInfo,
+  returnType.physicalType.getFieldNames)
 
 // if other condition is null, the filterfunc always return true
 val body = if (otherCondition == null) {
--- End diff --

@fhueske @hongyuhong - I think it is Literal(true)... at least when I was 
working with inner queries, I was always testing for an always-true condition



> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it should include both 
> streams' proctime attributes; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-14 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116397441
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -199,31 +157,38 @@ object JoinUtil {
 val generator = new CodeGenerator(
   config,
   nullCheck,
-  returnType)
+  leftType,
+  Some(rightType))
+
+val conversion = generator.generateConverterResultExpression(
+  returnType.physicalTypeInfo,
+  returnType.physicalType.getFieldNames)
 
 // if other condition is null, the filterfunc always return true
 val body = if (otherCondition == null) {
--- End diff --

@fhueske @hongyuhong - I think it is Literal(true)... at least when I was 
working with inner queries, I was always testing for an always-true condition





[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009899#comment-16009899
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116397057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+if (left.isInstanceOf[StreamTableSourceScan]
--- End diff --

@fhueske @hongyuhong 
I believe the logic is that we need to differentiate in this backbone class 
between the JOIN cases: Stream - Stream and Stream - Table, and then have a 
condition for each. I believe this check is there to restrict (for now) the 
implementation to Stream - Stream only.
The question now is: if we create an SQL query that has a table as one of 
its inputs, will that input always be a "StreamTableSourceScan", or is it 
possible for it to have a different type?



> Support proctime inner equi-join between two streams in the SQL API
> 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-14 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116397057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+if (left.isInstanceOf[StreamTableSourceScan]
--- End diff --

@fhueske @hongyuhong 
I believe the logic is that we need to differentiate in this backbone class 
between the JOIN cases: Stream - Stream and Stream - Table, and then have a 
condition for each. I believe this check is there to restrict (for now) the 
implementation to Stream - Stream only.
The question now is: if we create an SQL query that has a table as one of 
its inputs, will that input always be a "StreamTableSourceScan", or is it 
possible for it to have a different type?





[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009896#comment-16009896
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116396723
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just 
support inner-join
+  *
+  * @param leftStreamWindowSizethe left stream window size
+  * @param rightStreamWindowSizethe right stream window size
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param filterFuncthe function of other non-equi condition include 
time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
--- End diff --

@fhueske @hongyuhong As far as I know, there is automatic deduplication 
of timers. So even if you register a timer for every incoming event, as long 
as it is registered to trigger at the same time, the onTimer method will be 
executed only once.
If there is no way to register the trigger for the same time, then I am 
wondering whether you need state at all. If there were a crash and the events 
were restored, you would have to register a timer again anyway - which would 
also happen if you only had a class field (e.g. a boolean).
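A hedged, generic illustration of the timer behaviour described above (a plain keyed {{ProcessFunction}}, not the PR's {{CoProcessFunction}} code): registering the same processing-time timestamp for every element leads to a single {{onTimer}} call per key and timestamp.

{code}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CleanupTimerSketch extends ProcessFunction<Long, Long> {

    private static final long ONE_MINUTE = 60_000L;

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        // Round up to the next full minute, so many elements share one timer timestamp.
        long cleanupTime = (now / ONE_MINUTE + 1) * ONE_MINUTE;
        // Registering the same timestamp repeatedly is de-duplicated by Flink.
        ctx.timerService().registerProcessingTimeTimer(cleanupTime);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // Fires only once per registered timestamp (and key), no matter how many elements registered it.
    }
}
{code}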



> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time-condition {{o.proctime BETWEEN 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-14 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116396723
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just 
support inner-join
+  *
+  * @param leftStreamWindowSizethe left stream window size
+  * @param rightStreamWindowSizethe right stream window size
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param filterFuncthe function of other non-equi condition include 
time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
--- End diff --

@fhueske @hongyuhong As far as I know, there is automatic deduplication 
of timers. So even if you register a timer for every incoming event, as long 
as it is registered to trigger at the same time, the onTimer method will be 
executed only once.
If there is no way to register the trigger for the same time, then I am 
wondering whether you need state at all. If there were a crash and the events 
were restored, you would have to register a timer again anyway - which would 
also happen if you only had a class field (e.g. a boolean).





[jira] [Commented] (FLINK-3620) Remove DbStateBackend

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009855#comment-16009855
 ] 

ASF GitHub Bot commented on FLINK-3620:
---

Github user xhook commented on the issue:

https://github.com/apache/flink/pull/1800
  
Hi @gyfora,
in https://issues.apache.org/jira/browse/FLINK-3620 it says it was decided 
to make DbStateBackend an external library. Unfortunately, I can't find 
it anywhere. Does it exist?
Thank you!


> Remove DbStateBackend
> -
>
> Key: FLINK-3620
> URL: https://issues.apache.org/jira/browse/FLINK-3620
> Project: Flink
>  Issue Type: Task
>  Components: Streaming
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Trivial
>
> Based on the discussion on the flink mailing lists, we decided to remove the 
> DbStateBackend from flink-contrib and make it an external library instead.





[GitHub] flink issue #1800: [FLINK-3620] [streaming] Remove DbStateBackend

2017-05-14 Thread xhook
Github user xhook commented on the issue:

https://github.com/apache/flink/pull/1800
  
Hi @gyfora,
in https://issues.apache.org/jira/browse/FLINK-3620 it says it was decided 
to make DbStateBackend an external library. Unfortunately, I can't find 
it anywhere. Does it exist?
Thank you!




[jira] [Updated] (FLINK-6551) OutputTag name should not be allowed to be empty

2017-05-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6551:

Summary: OutputTag name should not be allowed to be empty  (was: OutputTag 
name should not be empty)

> OutputTag name should not be allowed to be empty
> 
>
> Key: FLINK-6551
> URL: https://issues.apache.org/jira/browse/FLINK-6551
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>
> When creating an OutputTag, it is required to give it a name.
> While we do enforce that the name is not null, we do not have a check in place 
> that prevents passing an empty string.
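A hedged sketch of the missing check, using Flink's {{Preconditions}} utility; the surrounding class is a simplified stand-in for the {{OutputTag}} constructor, not the actual implementation:

{code}
import org.apache.flink.util.Preconditions;

public class OutputTagIdCheckSketch {

    private final String id;

    public OutputTagIdCheckSketch(String id) {
        // Existing null check, plus the additional empty-string check this issue asks for.
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.trim().isEmpty(), "OutputTag id must not be empty.");
        this.id = id;
    }
}
{code}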





[jira] [Comment Edited] (FLINK-6440) Noisy logs from metric fetcher

2017-05-14 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009838#comment-16009838
 ] 

Chesnay Schepler edited comment on FLINK-6440 at 5/14/17 8:23 PM:
--

I'm wondering what our options are here. We can't just disable the logging; 
there is the possibility that only the {{MetricQueryService}} is unreachable 
and this should be logged if that's the case.

We could limit the # of log messages in a given time frame, but this would mean 
that an unreachable MQS may only be logged after a long long time.

Finally, we could track the unreachable status of the MQS for each TaskManager; 
like a set that contains the paths. If a request fails it is added to the set, 
and we only log something when it is added to the set. Once a request succeeds 
it would be removed again. The problem is that we would then need some time-based 
clean-up code, as the set could otherwise grow infinitely in cases where many 
TMs are being replaced (and thus are never reachable again).

Sadly there isn't something like a {{TaskmanagerStatusListener}} interface; 
this would be useful for tracking/cleaning up state per {{TaskManager}}.
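A hedged sketch of the third option (remember which query-service paths are unreachable and only log on the reachable-to-unreachable transition); the class and method names are illustrative, not existing Flink code:

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class UnreachableQueryServiceTracker {

    private static final Logger LOG = LoggerFactory.getLogger(UnreachableQueryServiceTracker.class);

    // Paths of MetricQueryServices that are currently considered unreachable.
    private final Set<String> unreachable = ConcurrentHashMap.newKeySet();

    /** Called when a metrics request to the given actor path fails. */
    public void onRequestFailed(String queryServicePath, Throwable cause) {
        if (unreachable.add(queryServicePath)) {
            // Log only on the first failure after the service was last reachable.
            LOG.warn("MetricQueryService {} is unreachable.", queryServicePath, cause);
        }
    }

    /** Called when a metrics request to the given actor path succeeds. */
    public void onRequestSucceeded(String queryServicePath) {
        unreachable.remove(queryServicePath);
    }
}
{code}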


was (Author: zentol):
I'm wondering what our options are here. We can't just disable the logging; 
there is the possibility that only the {{MetricQueryService}} is unreachable 
and this should be logged if that's the case.

We could limit the # of log messages in a given time frame, but this would mean 
that an unreachable MQS may only be logged after a long long time.

Finally, we could track the unreachable status of the MQS; like a set that 
contains the paths. If a request fails it is added to the set, and we only log 
something when it is added to the set. Once a request succeeds it would be 
removed again. Problem is that we then would need some time-based clean-up code 
as the set could otherwise grow infinitely in cases where many TM's are being 
replaced (and thus are never reachable again).

Sadly there isn't something like a {{TaskmanagerStatusListener}} interface, 
this would be useful to track/clean-up state by {{TaskManager}}.

> Noisy logs from metric fetcher
> --
>
> Key: FLINK-6440
> URL: https://issues.apache.org/jira/browse/FLINK-6440
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 1.3.0
>
>
> In cases where TaskManagers fail, the web frontend in the Job Manager starts 
> logging the exception below every few seconds.
> I labeled this as critical, because it actually makes debugging in such a 
> situation complicated through a log that is flooded with noise.
> {code}
> 2017-05-03 19:37:07,823 WARN  
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching 
> metrics failed.
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka.tcp://flink@herman:52175/user/MetricQueryService_136f717a6b91e248282cb2937d22088c]]
>  after [1 ms]
> at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Commented] (FLINK-6440) Noisy logs from metric fetcher

2017-05-14 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009838#comment-16009838
 ] 

Chesnay Schepler commented on FLINK-6440:
-

I'm wondering what our options are here. We can't just disable the logging; 
there is the possibility that only the {{MetricQueryService}} is unreachable 
and this should be logged if that's the case.

We could limit the # of log messages in a given time frame, but this would mean 
that an unreachable MQS may only be logged after a long long time.

Finally, we could track the unreachable status of the MQS; like a set that 
contains the paths. If a request fails it is added to the set, and we only log 
something when it is added to the set. Once a request succeeds it would be 
removed again. The problem is that we would then need some time-based clean-up code, 
as the set could otherwise grow infinitely in cases where many TMs are being 
replaced (and thus are never reachable again).

Sadly there isn't something like a {{TaskmanagerStatusListener}} interface; 
this would be useful for tracking/cleaning up state per {{TaskManager}}.

> Noisy logs from metric fetcher
> --
>
> Key: FLINK-6440
> URL: https://issues.apache.org/jira/browse/FLINK-6440
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 1.3.0
>
>
> In cases where TaskManagers fail, the web frontend in the Job Manager starts 
> logging the exception below every few seconds.
> I labeled this as critical, because it actually makes debugging in such a 
> situation complicated through a log that is flooded with noise.
> {code}
> 2017-05-03 19:37:07,823 WARN  
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching 
> metrics failed.
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka.tcp://flink@herman:52175/user/MetricQueryService_136f717a6b91e248282cb2937d22088c]]
>  after [1 ms]
> at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Commented] (FLINK-6370) FileAlreadyExistsException on startup

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009832#comment-16009832
 ] 

ASF GitHub Bot commented on FLINK-6370:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3896

[FLINK-6370] [webUI] Handle races for single file in FileServerHandler

This PR prevents a race between multiple requests trying to create the same 
file, which previously would cause one request to fail.

We can't just ignore the `FileAlreadyExistsException` since there's no 
guarantee that the file copying is complete. To prevent this scenario the 
copying is now done in a `synchronized` block, along with ignoring the 
`FileAlreadyExistsExceptions`.

Due to the small size of files loaded from the class-loader this should 
have no impact on the responsiveness of the webUI.
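A hedged sketch of the approach described above (names are illustrative, not the PR's exact code): the copy is serialized through a lock, and a `FileAlreadyExistsException` thrown because another request already completed the copy can then be ignored safely.

{code}
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class StaticResourceCopySketch {

    private static final Object COPY_LOCK = new Object();

    static void copyResourceIfAbsent(InputStream resource, Path target) throws IOException {
        synchronized (COPY_LOCK) {
            try {
                // Copy the class-loader resource to disk; fails if the file already exists.
                Files.copy(resource, target);
            } catch (FileAlreadyExistsException ignored) {
                // Another request finished the copy earlier while holding the lock; safe to ignore.
            }
        }
    }
}
{code}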

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6370_web_file

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3896


commit 4ebee58fa8c39d0aa6d68d1470ea3a850e92b954
Author: zentol 
Date:   2017-05-14T19:57:16Z

[FLINK-6370] [webUI] Handle races for single file in FileServerHandler




> FileAlreadyExistsException on startup
> -
>
> Key: FLINK-6370
> URL: https://issues.apache.org/jira/browse/FLINK-6370
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Chesnay Schepler
>
> Currently, static web resources are lazily cached to disk during the first 
> request. However, if 2 concurrent requests are executed, a 
> FileAlreadyExistsException will appear in the logs.
> {code}
> 2017-04-24 14:00:58,075 ERROR 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  - error 
> while responding [nioEventLoopGroup-3-2]
> java.nio.file.FileAlreadyExistsException: 
> /flink/web/flink-web-528f8cb8-dd60-433c-8f6c-df49ad0b79e0/index.html
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>   at 
> java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
>   at java.nio.file.Files.newOutputStream(Files.java:216)
>   at java.nio.file.Files.copy(Files.java:3016)
>   at 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler.respondAsLeader(StaticFileServerHandler.java:238)
>   at 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler.channelRead0(StaticFileServerHandler.java:197)
>   at 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler.channelRead0(StaticFileServerHandler.java:99)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> {code}
> Expected: 
> * extract all static resources on startup in the main thread, before opening 
> the HTTP port.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3896: [FLINK-6370] [webUI] Handle races for single file ...

2017-05-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3896

[FLINK-6370] [webUI] Handle races for single file in FileServerHandler

This PR prevents a race between multiple requests trying to create the same 
file, which previously would cause one request to fail.

We can't just ignore the `FileAlreadyExistsException`, since there's no 
guarantee that the file copy is complete. To prevent this scenario, the 
copying is now done in a `synchronized` block, and the 
`FileAlreadyExistsException`s are ignored.

Due to the small size of files loaded from the class-loader this should 
have no impact on the responsiveness of the webUI.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6370_web_file

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3896


commit 4ebee58fa8c39d0aa6d68d1470ea3a850e92b954
Author: zentol 
Date:   2017-05-14T19:57:16Z

[FLINK-6370] [webUI] Handle races for single file in FileServerHandler




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6370) FileAlreadyExistsException on startup

2017-05-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-6370:
---

Assignee: Chesnay Schepler

> FileAlreadyExistsException on startup
> -
>
> Key: FLINK-6370
> URL: https://issues.apache.org/jira/browse/FLINK-6370
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Chesnay Schepler
>
> Currently static web resources are lazily cached onto disk during the first 
> request. However, if 2 concurrent requests are executed, a 
> FileAlreadyExistsException ends up in the logs.
> {code}
> 2017-04-24 14:00:58,075 ERROR 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  - error 
> while responding [nioEventLoopGroup-3-2]
> java.nio.file.FileAlreadyExistsException: 
> /flink/web/flink-web-528f8cb8-dd60-433c-8f6c-df49ad0b79e0/index.html
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>   at 
> java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
>   at java.nio.file.Files.newOutputStream(Files.java:216)
>   at java.nio.file.Files.copy(Files.java:3016)
>   at 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler.respondAsLeader(StaticFileServerHandler.java:238)
>   at 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler.channelRead0(StaticFileServerHandler.java:197)
>   at 
> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler.channelRead0(StaticFileServerHandler.java:99)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> {code}
> Expect: 
> * extract all static resources on startup in main thread and before opening 
> http port.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6578) SharedBuffer creates self-loops when having elements with same value/timestamp.

2017-05-14 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-6578:
-

 Summary: SharedBuffer creates self-loops when having elements with 
same value/timestamp.
 Key: FLINK-6578
 URL: https://issues.apache.org/jira/browse/FLINK-6578
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.3.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.3.0


This is a test that fails with the current implementation: the looping state accepts the 
duplicate {{middleEvent1}} elements, but the shared buffer cannot distinguish between 
them and gets trapped in an infinite loop, eventually running out of memory.

{code}
@Test
public void testEagerZeroOrMoreSameElement() {
	List<StreamRecord<Event>> inputEvents = new ArrayList<>();

	Event startEvent = new Event(40, "c", 1.0);
	Event middleEvent1 = new Event(41, "a", 2.0);
	Event middleEvent2 = new Event(42, "a", 3.0);
	Event middleEvent3 = new Event(43, "a", 4.0);
	Event end1 = new Event(44, "b", 5.0);

	inputEvents.add(new StreamRecord<>(startEvent, 1));
	// the same element is fed three times with the same timestamp
	inputEvents.add(new StreamRecord<>(middleEvent1, 3));
	inputEvents.add(new StreamRecord<>(middleEvent1, 3));
	inputEvents.add(new StreamRecord<>(middleEvent1, 3));
	inputEvents.add(new StreamRecord<>(middleEvent2, 4));
	inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
	inputEvents.add(new StreamRecord<>(middleEvent3, 6));
	inputEvents.add(new StreamRecord<>(middleEvent3, 6));
	inputEvents.add(new StreamRecord<>(end1, 7));

	Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
		private static final long serialVersionUID = 5726188262756267490L;

		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("c");
		}
	}).followedBy("middle").where(new SimpleCondition<Event>() {
		private static final long serialVersionUID = 5726188262756267490L;

		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("a");
		}
	}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
		private static final long serialVersionUID = 5726188262756267490L;

		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("b");
		}
	});

	NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

	final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);

	compareMaps(resultingPatterns, Lists.newArrayList(
		Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
		Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
		Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
		Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
		Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
		Lists.newArrayList(startEvent, middleEvent1, end1),
		Lists.newArrayList(startEvent, end1)
	));
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6537) Umbrella issue for fixes to incremental snapshots

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-6537:
--
Priority: Major  (was: Blocker)

> Umbrella issue for fixes to incremental snapshots
> -
>
> Key: FLINK-6537
> URL: https://issues.apache.org/jira/browse/FLINK-6537
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> This issue tracks ongoing fixes in the incremental checkpointing feature for 
> the 1.3 release.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6505) Proactively cleanup local FS for RocksDBKeyedStateBackend on startup

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-6505:
--
Priority: Major  (was: Blocker)

> Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
> 
>
> Key: FLINK-6505
> URL: https://issues.apache.org/jira/browse/FLINK-6505
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
> Fix For: 1.3.0
>
>
> In {{RocksDBKeyedStateBackend}}, the {{instanceBasePath}} is cleared on 
> {{dispose()}}. I think it might make sense to also clear this directory when 
> the backend is created, in case something crashed and the backend never 
> reached {{dispose()}}. At least for previous runs of the same job, we can 
> know what to delete on restart. 
> In general, it is very important for this backend to clean up the local FS, 
> because the local quota might be very limited compared to the DFS. And a node 
> that runs out of local disk space can bring down the whole job, with no way 
> to recover (it might always get rescheduled to that node).
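
As a sketch of the proposed cleanup (the helper below is hypothetical; the real backend keeps the path in {{instanceBasePath}} and may use Flink's own file utilities):

{code}
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;

// Hypothetical helper: wipe the RocksDB instance directory before the backend starts
// writing, so data left behind by a crashed run of the same job cannot exhaust the
// local disk quota.
final class LocalRocksDbDirectories {

	static File prepareInstanceBasePath(File instanceBasePath) throws IOException {
		if (instanceBasePath.exists()) {
			// the previous run never reached dispose(), so clean up its leftovers now
			FileUtils.deleteDirectory(instanceBasePath);
		}
		if (!instanceBasePath.mkdirs()) {
			throw new IOException("Could not create instance base path: " + instanceBasePath);
		}
		return instanceBasePath;
	}

	private LocalRocksDbDirectories() {
	}
}
{code}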



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6540) Add more thorough tests for RocksDB and incremental checkpointing

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-6540:
--
Priority: Major  (was: Blocker)

> Add more thorough tests for RocksDB and incremental checkpointing
> -
>
> Key: FLINK-6540
> URL: https://issues.apache.org/jira/browse/FLINK-6540
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
> Fix For: 1.3.0
>
>
> We should have tests that cover all the sub-issues from the umbrella issue, 
> to prevent similar problems from being reintroduced in the future.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6545) Make incremental checkpoints externalizable

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6545.
-
Resolution: Fixed

fixed in 098e46f2d2

> Make incremental checkpoints externalizable
> ---
>
> Key: FLINK-6545
> URL: https://issues.apache.org/jira/browse/FLINK-6545
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Incremental checkpoints are currently not externalizable. The missing piece 
> is familiarizing the {{SavepointSerializer}}(s) with the new state handle 
> classes that we added for incremental checkpointing. Currently, some of those 
> (e.g. 
> {{org.apache.flink.contrib.streaming.state.RocksDBIncrementalKeyedStateHandle}})
>  live in the contrib.rocksdb package and need to be migrated. We 
> can also push them to a different abstraction level, i.e. 
> {{IncrementalKeyedStateHandle}} with some private data, referenced existing 
> shared data (from previous checkpoints), and (presumably) newly created 
> shared data (first created by the current checkpoint).
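
A rough sketch of what such an abstraction could look like (the interface and its methods below are assumptions for illustration, not the final API; {{KeyedStateHandle}} and {{StreamStateHandle}} are existing Flink types):

{code}
import java.util.Map;

import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

// Hypothetical shape of an IncrementalKeyedStateHandle: private state plus references to
// shared state, split into data that earlier checkpoints already registered and data that
// this checkpoint creates and registers for the first time.
public interface IncrementalKeyedStateHandle extends KeyedStateHandle {

	/** State used only by this checkpoint, e.g. meta data. */
	Map<String, StreamStateHandle> getPrivateState();

	/** Shared state that was first registered by a previous checkpoint. */
	Map<String, StreamStateHandle> getReferencedSharedState();

	/** Shared state that this checkpoint created and registers as new. */
	Map<String, StreamStateHandle> getNewSharedState();
}
{code}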



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6535) JobID should not be part of the registration key to the SharedStateRegistry

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6535.
-
Resolution: Fixed

fixed in 4745d0c082

> JobID should not be part of the registration key to the SharedStateRegistry
> ---
>
> Key: FLINK-6535
> URL: https://issues.apache.org/jira/browse/FLINK-6535
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the job id is part of the registration key in the 
> {{SharedStateRegistry}}. I suggest to remove this part of the key because the 
> job id changes after a restart. So when we do not update job ids in the 
> registry, referencing shared state will fail for future checkpoints. When we 
> update it, we basically replace the information. So I think there would be no 
> value in adding this to the composite key.
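
To illustrate the point, a composite key that deliberately leaves the job id out could look like the following (simplified sketch; names are assumptions):

{code}
import java.util.Objects;

// Illustrative only: key shared state by a stable, job-independent name, so that
// references still resolve after a restart has changed the JobID.
public final class SharedStateKey {

	private final String stateName; // e.g. the name of an sst file produced by the backend

	public SharedStateKey(String stateName) {
		this.stateName = Objects.requireNonNull(stateName);
	}

	@Override
	public boolean equals(Object o) {
		return o instanceof SharedStateKey && stateName.equals(((SharedStateKey) o).stateName);
	}

	@Override
	public int hashCode() {
		return stateName.hashCode();
	}
}
{code}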



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6534) SharedStateRegistry is disposing state handles from main thread

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6534.
-
Resolution: Fixed

fixed in 44fb035e02.

> SharedStateRegistry is disposing state handles from main thread
> ---
>
> Key: FLINK-6534
> URL: https://issues.apache.org/jira/browse/FLINK-6534
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the {{SharedStateRegistry}} is deleting state handles that are no 
> longer referenced under the registry's lock and from the main thread. We 
> should use the {{CheckpointCoordinator}}'s async IO executor to make this 
> non-blocking.
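
A small sketch of what handing the disposal over to an executor could look like (illustrative only; the executor is assumed to be the {{CheckpointCoordinator}}'s async IO executor):

{code}
import java.util.concurrent.Executor;

import org.apache.flink.runtime.state.StreamStateHandle;

// Illustrative only: decide which handle is unreferenced while holding the registry lock,
// but run the potentially slow discard on an async executor so the caller is not blocked.
final class AsyncDisposalExample {

	static void discardAsync(Executor asyncIoExecutor, StreamStateHandle unreferencedHandle) {
		asyncIoExecutor.execute(() -> {
			try {
				unreferencedHandle.discardState();
			} catch (Exception e) {
				// best effort: disposal failures must not block or fail checkpointing
			}
		});
	}
}
{code}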



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6533) Duplicated registration of new shared state when checkpoint confirmations are still pending

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6533.
-
Resolution: Fixed

fixed in 4745d0c082

> Duplicated registration of new shared state when checkpoint confirmations are 
> still pending
> ---
>
> Key: FLINK-6533
> URL: https://issues.apache.org/jira/browse/FLINK-6533
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Each incremental RocksDB checkpoint n is registering new and existing shared 
> state with the {{SharedStateRegistry}} when it completes. Only then, the 
> backend is notified and all following checkpoints (n+x) can reference the new 
> state in checkpoint n.
> However, when a checkpoint n+1 already starts before n was confirmed to 
> the backend, n+1 can assume some files as new which were already contained 
> in n. It will upload those files to DFS again, creating new state handles.
> Then, once n+1 completes, it could register some state as new which was 
> already registered by n, without n+1 knowing of this. Currently 
> this violates a precondition check that the reference count for state that 
> is assumed as new is 1.
> While we cannot prevent duplicate uploads, we must resolve this situation in 
> the {{SharedStateRegistry}}.
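
One way to resolve this in the registry, as a much simplified sketch (names and structure are assumptions, not the actual {{SharedStateRegistry}} code):

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.state.StreamStateHandle;

// Illustrative only: tolerate duplicate registrations by keeping the handle that was
// registered first, bumping its reference count and discarding the redundant upload.
final class TinySharedStateRegistry {

	private static final class Entry {
		final StreamStateHandle handle;
		int referenceCount = 1;

		Entry(StreamStateHandle handle) {
			this.handle = handle;
		}
	}

	private final Map<String, Entry> registeredStates = new HashMap<>();

	synchronized StreamStateHandle register(String key, StreamStateHandle newHandle) throws Exception {
		Entry entry = registeredStates.get(key);
		if (entry == null) {
			registeredStates.put(key, new Entry(newHandle));
			return newHandle;
		}
		entry.referenceCount++;
		if (entry.handle != newHandle) {
			// duplicate upload of state that an earlier checkpoint already registered
			newHandle.discardState();
		}
		return entry.handle;
	}
}
{code}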



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6527) OperatorSubtaskState has empty implementations of (un)/registerSharedStates

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6527.
-
Resolution: Fixed

fixed in efbb41bc63

> OperatorSubtaskState has empty implementations of (un)/registerSharedStates
> ---
>
> Key: FLINK-6527
> URL: https://issues.apache.org/jira/browse/FLINK-6527
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> With FLINK-5892 we introduced {{OperatorSubtaskState}} to replace 
> {{SubtaskState}}. However, in the meantime, the implementation of {{SubtaskState}} 
> was extended concurrently with the implementation for incremental snapshots. This 
> was not carried over to the implementation of {{OperatorSubtaskState}}, as it 
> should have been.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6504) Lack of synchronization on materializedSstFiles in RocksDBKeyedStateBackend

2017-05-14 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6504.
-
Resolution: Fixed

fixed in 958773b71c

> Lack of synchronization on materializedSstFiles in RocksDBKeyedStateBackend
> ---
>
> Key: FLINK-6504
> URL: https://issues.apache.org/jira/browse/FLINK-6504
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Xiaogang Shi
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Concurrent checkpoints could access `materializedSstFiles` in the 
> `RocksDBKeyedStateBackend` without synchronization. This should be avoided.
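
A minimal sketch of the suggested guard (illustrative only; the actual field and lock live inside the backend and may be named differently):

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative only: every read and write of the sst-file bookkeeping goes through one
// lock, so concurrent checkpoints cannot interleave on the map.
final class MaterializedSstFiles {

	private final Object lock = new Object();
	private final Map<Long, Set<String>> sstFilesPerCheckpoint = new HashMap<>();

	void put(long checkpointId, Set<String> sstFiles) {
		synchronized (lock) {
			sstFilesPerCheckpoint.put(checkpointId, sstFiles);
		}
	}

	Set<String> get(long checkpointId) {
		synchronized (lock) {
			return sstFilesPerCheckpoint.get(checkpointId);
		}
	}
}
{code}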



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6535) JobID should not be part of the registration key to the SharedStateRegistry

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009707#comment-16009707
 ] 

ASF GitHub Bot commented on FLINK-6535:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3870


> JobID should not be part of the registration key to the SharedStateRegistry
> ---
>
> Key: FLINK-6535
> URL: https://issues.apache.org/jira/browse/FLINK-6535
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the job id is part of the registration key in the 
> {{SharedStateRegistry}}. I suggest to remove this part of the key because the 
> job id changes after a restart. So when we do not update job ids in the 
> registry, referencing shared state will fail for future checkpoints. When we 
> update it, we basically replace the information. So I think there would be no 
> value in adding this to the composite key.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-14 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3870


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3870: [Flink 6537] Fixes and improvements for incremental check...

2017-05-14 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3870
  
Thanks @rmetzger ! Merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6569) flink-table KafkaJsonTableSource example doesn't work

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009685#comment-16009685
 ] 

ASF GitHub Bot commented on FLINK-6569:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3890
  
Thank you for addressing this so quickly.
+1 from my side.


> flink-table KafkaJsonTableSource example doesn't work
> -
>
> Key: FLINK-6569
> URL: https://issues.apache.org/jira/browse/FLINK-6569
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Robert Metzger
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> The code example uses 
> {code}
> TypeInformation typeInfo = Types.ROW(
>   new String[] { "id", "name", "score" },
>   new TypeInformation[] { Types.INT(), Types.STRING(), Types.DOUBLE() }
> );
> {code}
> the correct way of using it is something like
> {code}
> TypeInformation typeInfo = Types.ROW_NAMED(
> new String[] { "id", "zip", "date" },
> Types.LONG, Types.INT, Types.SQL_DATE);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3890: [FLINK-6569] flink-table KafkaJsonTableSource example doe...

2017-05-14 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3890
  
Thank you for addressing this so quickly.
+1 from my side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3870: [Flink 6537] Fixes and improvements for incremental check...

2017-05-14 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3870
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009658#comment-16009658
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3833
  
You can find an example of how to shade here: 
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-datadog/pom.xml.

Shading dependencies in reporters/connectors has become a safety precaution 
from our side.

It is not that unlikely that user code contains the same dependencies. For 
one, user code also includes other reporters, so by the very existence of this 
reporter there is a precedent :) Besides that, something that pops up from time 
to time on the mailing lists is users talking to REST endpoints in their 
functions/sources/sinks, which may also rely on http-related dependencies.


> Add Prometheus support to metrics
> -
>
> Key: FLINK-6221
> URL: https://issues.apache.org/jira/browse/FLINK-6221
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Joshua Griffith
>Assignee: Maximilian Bode
>Priority: Minor
>
> [Prometheus|https://prometheus.io/] is becoming popular for metrics and 
> alerting. It's possible to use 
> [statsd-exporter|https://github.com/prometheus/statsd_exporter] to load Flink 
> metrics into Prometheus but it would be far easier if Flink supported 
> Prometheus as a metrics reporter. A [dropwizard 
> client|https://github.com/prometheus/client_java/tree/master/simpleclient_dropwizard]
>  exists that could be integrated into the existing metrics system.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3833
  
You can find an example of how to shade here: 
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-datadog/pom.xml.

Shading dependencies in reporters/connectors has become a safety precaution 
from our side.

It is not that unlikely that user code contains the same dependencies. For 
one, user code also includes other reporters, so by the very existence of this 
reporter there is a precedent :) Besides that, something that pops up from time 
to time on the mailing lists is users talking to REST endpoints in their 
functions/sources/sinks, which may also rely on http-related dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009654#comment-16009654
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116377143
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -133,61 +136,58 @@ public void notifyOfRemovedMetric(final Metric 
metric, final String metricName,
collectorsByMetricName.remove(metricName);
}
 
+   @SuppressWarnings("unchecked")
+   private static String getLogicalScope(MetricGroup group) {
+   return ((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   }
+
private Collector createGauge(final Gauge gauge, final String name, 
final String identifier, final List labelNames, final List 
labelValues) {
-   return io.prometheus.client.Gauge
-   .build()
-   .name(name)
-   .help(identifier)
-   .labelNames(toArray(labelNames, String.class))
-   .create()
-   .setChild(new io.prometheus.client.Gauge.Child() {
-   @Override
-   public double get() {
-   final Object value = gauge.getValue();
-   if (value instanceof Double) {
-   return (double) value;
-   }
-   if (value instanceof Number) {
-   return ((Number) 
value).doubleValue();
-   } else if (value instanceof Boolean) {
-   return ((Boolean) value) ? 1 : 
0;
-   } else {
-   log.debug("Invalid type for 
Gauge {}: {}, only number types and booleans are supported by this reporter.",
-   gauge, 
value.getClass().getName());
-   return 0;
-   }
+   return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+   @Override
+   public double get() {
+   final Object value = gauge.getValue();
+   if (value instanceof Double) {
+   return (double) value;
+   }
+   if (value instanceof Number) {
+   return ((Number) value).doubleValue();
+   } else if (value instanceof Boolean) {
+   return ((Boolean) value) ? 1 : 0;
+   } else {
+   LOG.debug("Invalid type for Gauge {}: 
{}, only number types and booleans are supported by this reporter.",
+   gauge, 
value.getClass().getName());
+   return 0;
}
-   }, toArray(labelValues, String.class));
+   }
+   });
}
 
private static Collector createGauge(final Counter counter, final 
String name, final String identifier, final List labelNames, final 
List labelValues) {
-   return io.prometheus.client.Gauge
-   .build()
-   .name(name)
-   .help(identifier)
-   .labelNames(toArray(labelNames, String.class))
-   .create()
-   .setChild(new io.prometheus.client.Gauge.Child() {
-   @Override
-   public double get() {
-   return (double) counter.getCount();
-   }
-   }, toArray(labelValues, String.class));
+   return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+   @Override
+   public double get() {
+   return (double) counter.getCount();
+   }
+   });
+   }
+
+   private Collector createGauge(final Meter meter, final String name, 
final String identifier, final List labelNames, final List 
labelValues) {
+   return newGauge(name + 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116377143
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -133,61 +136,58 @@ public void notifyOfRemovedMetric(final Metric 
metric, final String metricName,
collectorsByMetricName.remove(metricName);
}
 
+   @SuppressWarnings("unchecked")
+   private static String getLogicalScope(MetricGroup group) {
+   return ((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   }
+
private Collector createGauge(final Gauge gauge, final String name, 
final String identifier, final List labelNames, final List 
labelValues) {
-   return io.prometheus.client.Gauge
-   .build()
-   .name(name)
-   .help(identifier)
-   .labelNames(toArray(labelNames, String.class))
-   .create()
-   .setChild(new io.prometheus.client.Gauge.Child() {
-   @Override
-   public double get() {
-   final Object value = gauge.getValue();
-   if (value instanceof Double) {
-   return (double) value;
-   }
-   if (value instanceof Number) {
-   return ((Number) 
value).doubleValue();
-   } else if (value instanceof Boolean) {
-   return ((Boolean) value) ? 1 : 
0;
-   } else {
-   log.debug("Invalid type for 
Gauge {}: {}, only number types and booleans are supported by this reporter.",
-   gauge, 
value.getClass().getName());
-   return 0;
-   }
+   return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+   @Override
+   public double get() {
+   final Object value = gauge.getValue();
+   if (value instanceof Double) {
+   return (double) value;
+   }
+   if (value instanceof Number) {
+   return ((Number) value).doubleValue();
+   } else if (value instanceof Boolean) {
+   return ((Boolean) value) ? 1 : 0;
+   } else {
+   LOG.debug("Invalid type for Gauge {}: 
{}, only number types and booleans are supported by this reporter.",
+   gauge, 
value.getClass().getName());
+   return 0;
}
-   }, toArray(labelValues, String.class));
+   }
+   });
}
 
private static Collector createGauge(final Counter counter, final 
String name, final String identifier, final List labelNames, final 
List labelValues) {
-   return io.prometheus.client.Gauge
-   .build()
-   .name(name)
-   .help(identifier)
-   .labelNames(toArray(labelNames, String.class))
-   .create()
-   .setChild(new io.prometheus.client.Gauge.Child() {
-   @Override
-   public double get() {
-   return (double) counter.getCount();
-   }
-   }, toArray(labelValues, String.class));
+   return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+   @Override
+   public double get() {
+   return (double) counter.getCount();
+   }
+   });
+   }
+
+   private Collector createGauge(final Meter meter, final String name, 
final String identifier, final List labelNames, final List 
labelValues) {
+   return newGauge(name + "_rate", identifier, labelNames, 
labelValues, new io.prometheus.client.Gauge.Child() {
--- End diff --

Meter metrics should contain "rate" in their name already, so we don't have 
to append "_rate" here.


---
If your project is 

[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009649#comment-16009649
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116376926
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116376926
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+   final String scope = 
((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   List dimensionKeys = new 

[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009646#comment-16009646
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user mbode commented on the issue:

https://github.com/apache/flink/pull/3833
  
Okay, guava is not used anymore. I am not sure about the shading part. 
Would you expect either prometheus clients or nanohttpd to be used in Flink 
user code? Or are there other advantages to shading? If so, could you point me 
to a module I could copy the "Flink way of shading" from?


> Add Prometheus support to metrics
> -
>
> Key: FLINK-6221
> URL: https://issues.apache.org/jira/browse/FLINK-6221
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Joshua Griffith
>Assignee: Maximilian Bode
>Priority: Minor
>
> [Prometheus|https://prometheus.io/] is becoming popular for metrics and 
> alerting. It's possible to use 
> [statsd-exporter|https://github.com/prometheus/statsd_exporter] to load Flink 
> metrics into Prometheus but it would be far easier if Flink supported 
> Prometheus as a metrics reporter. A [dropwizard 
> client|https://github.com/prometheus/client_java/tree/master/simpleclient_dropwizard]
>  exists that could be integrated into the existing metrics system.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread mbode
Github user mbode commented on the issue:

https://github.com/apache/flink/pull/3833
  
Okay, guava is not used anymore. I am not sure about the shading part. 
Would you expect either prometheus clients or nanohttpd to be used in Flink 
user code? Or are there other advantages to shading? If so, could you point me 
to a module I could copy the "Flink way of shading" from?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009643#comment-16009643
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116376242
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread mbode
Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116376242
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
+   CollectorRegistry.defaultRegistry.clear();
+   }
+
+   @Override
+   public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+   final String scope = 
((FrontMetricGroup) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+   List dimensionKeys = new 

[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009639#comment-16009639
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116375209
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
--- End diff --

I don't think so, `NanoHTTPD.stop()` seems to catch everything.


> Add Prometheus support to metrics
> 

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

2017-05-14 Thread mbode
Github user mbode commented on a diff in the pull request:

https://github.com/apache/flink/pull/3833#discussion_r116375209
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+   private static final Logger log = 
LoggerFactory.getLogger(PrometheusReporter.class);
+
+   static final String ARG_PORT = "port";
+   private static final int DEFAULT_PORT = 9249;
+
+   private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
+   private static final CharacterFilter CHARACTER_FILTER   = new 
CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return replaceInvalidChars(input);
+   }
+   };
+
+   private static final char SCOPE_SEPARATOR = '_';
+
+   private PrometheusEndpoint prometheusEndpoint;
+   private Map<String, Collector> collectorsByMetricName = new HashMap<>();
+
+   @VisibleForTesting
+   static String replaceInvalidChars(final String input) {
+   // https://prometheus.io/docs/instrumenting/writing_exporters/
+   // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
+   return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+   log.info("Using port {}.", port);
+   prometheusEndpoint = new PrometheusEndpoint(port);
+   try {
+   prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
+   } catch (IOException e) {
+   log.error("Could not start PrometheusEndpoint on port " 
+ port, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   prometheusEndpoint.stop();
--- End diff --

I don't think so, `NanoHTTPD.stop()` seems to catch everything.
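On a separate point from the diff above, the metric-name sanitization in `replaceInvalidChars` is easy to check in isolation. The sketch below copies the regex shown in the diff; the sample scope string is made up for illustration:

{code}
import java.util.regex.Pattern;

public class SanitizeCheck {
	// Same rule as in the diff: only [a-zA-Z0-9:_] may appear in a
	// Prometheus metric name, everything else becomes an underscore.
	private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");

	static String replaceInvalidChars(String input) {
		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
	}

	public static void main(String[] args) {
		// Prints "host_1_taskmanager_job_numRecordsIn"
		System.out.println(replaceInvalidChars("host-1.taskmanager.job.numRecordsIn"));
	}
}
{code}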


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a