[jira] [Commented] (FLINK-5975) Mesos should support adding volumes to launched taskManagers

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923565#comment-15923565
 ] 

ASF GitHub Bot commented on FLINK-5975:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3481#discussion_r105825035
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -259,9 +262,11 @@ public String toString() {
throw new IllegalStateException("unsupported 
container type");
}
if(containerInfo != null) {
+   containerInfo.addAllVolumes(params.containerVolumes());
--- End diff --

There's an unnecessary restriction here, that volumes may be used only if a 
container image is also used.   I bet you can use volumes with the Mesos 
containerizer without using an image. The code in the `case MESOS:` block 
should be reorganized to always set `containerInfo = 
Protos.ContainerInfo.newBuilder()`.  This way, `containerInfo` will never be 
null.
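
As a rough sketch of the suggested reorganization (the accessor names `containerType`, `imageName` and `volumes` are stand-ins for values read from `MesosTaskManagerParameters`, not the PR's actual code):

{code}
import org.apache.mesos.Protos;
import java.util.List;

public class ContainerInfoSketch {

    // Always create the ContainerInfo builder, so that volumes can be attached
    // even when no container image is configured.
    static Protos.ContainerInfo buildContainerInfo(
            String containerType, String imageName, List<Protos.Volume> volumes) {

        Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder();
        switch (containerType) {
            case "MESOS":
                containerInfo.setType(Protos.ContainerInfo.Type.MESOS);
                // an image is optional for the Mesos containerizer
                if (imageName != null) {
                    containerInfo.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
                        .setImage(Protos.Image.newBuilder()
                            .setType(Protos.Image.Type.DOCKER)
                            .setDocker(Protos.Image.Docker.newBuilder().setName(imageName))));
                }
                break;
            case "DOCKER":
                containerInfo
                    .setType(Protos.ContainerInfo.Type.DOCKER)
                    .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder().setImage(imageName));
                break;
            default:
                throw new IllegalStateException("unsupported container type");
        }
        // volumes no longer depend on an image being set
        containerInfo.addAllVolumes(volumes);
        return containerInfo.build();
    }
}
{code}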


> Mesos should support adding volumes to launched taskManagers
> 
>
> Key: FLINK-5975
> URL: https://issues.apache.org/jira/browse/FLINK-5975
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Addison Higham
>Priority: Minor
>
> Flink needs access to shared storage.
> In many cases, this is HDFS, but it would be nice to also support file URIs 
> on a mounted NFS, for example.
> Mesos exposes APIs for adding volumes, so it should be relatively simple to 
> add this.
> As an example, here is the spark code for supporting volumes: 
> https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35
>  





[jira] [Commented] (FLINK-5975) Mesos should support adding volumes to launched taskManagers

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923564#comment-15923564
 ] 

ASF GitHub Bot commented on FLINK-5975:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3481#discussion_r105826378
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -162,11 +182,65 @@ public static MesosTaskManagerParameters 
create(Configuration flinkConfig) {
throw new 
IllegalConfigurationException("invalid container type: " + containerTypeString);
}
 
+   Option<String> containerVolOpt = Option.apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
+   List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
+
return new MesosTaskManagerParameters(
cpus,
containerType,
Option.apply(imageName),
-   containeredParameters);
+   containeredParameters,
+   containerVolumes);
+   }
+
+   /**
+* Used to build volume specs for mesos. This allows for mounting 
additional volumes into a container
+*
+* @param containerVolumes a comma delimited optional string of 
[host_path:]container_path[:RO|RW] that
+* defines mount points for a container volume. 
If None or empty string, returns
+* an empty iterator
+*/
+   public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes) {
+   if (containerVolumes.isEmpty()) {
+   return new ArrayList<Protos.Volume>();
+   }
+   String[] specs = containerVolumes.get().split(",");
+   List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
+   for (String s : specs) {
+   if (s.trim().isEmpty()) {
+   continue;
+   }
+   Protos.Volume.Builder vol = Protos.Volume.newBuilder();
+   vol.setMode(Protos.Volume.Mode.RW);
+
+   String[] parts = s.split(":");
+   switch (parts.length) {
+   case 1:
+   vol.setContainerPath(parts[0]);
+   break;
+   case 2:
+   try {
+   Protos.Volume.Mode mode = 
Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
+   vol.setMode(mode)
+   
.setContainerPath(parts[0]);
+   } catch (IllegalArgumentException e) {
--- End diff --

I gather that this code treats a two-part spec as `container:mode` first, 
then falls back to `host:container`.   Just curious, is there some precedent 
for that?  I'm not sure it makes sense;  for example, `/data:ro` would create 
an empty read-only volume, but what good is that? 
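
For reference, a minimal sketch of the resolution order being described (an illustration of the behavior under discussion, not the PR's code):

{code}
import org.apache.mesos.Protos;

public class TwoPartVolumeSpecSketch {

    // A two-part spec is first tried as container_path:mode; only if the second
    // part is not a valid mode does it fall back to host_path:container_path.
    static Protos.Volume parseTwoPartSpec(String first, String second) {
        Protos.Volume.Builder vol = Protos.Volume.newBuilder().setMode(Protos.Volume.Mode.RW);
        try {
            // e.g. "/data:ro" -> an empty read-only volume mounted at /data
            vol.setMode(Protos.Volume.Mode.valueOf(second.trim().toUpperCase()))
                .setContainerPath(first);
        } catch (IllegalArgumentException e) {
            // e.g. "/host/data:/data" -> host path mounted into the container
            vol.setHostPath(first).setContainerPath(second);
        }
        return vol.build();
    }
}
{code}

Docker's `-v` option, for comparison, treats a two-part spec as `host_path:container_path`.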


> Mesos should support adding volumes to launched taskManagers
> 
>
> Key: FLINK-5975
> URL: https://issues.apache.org/jira/browse/FLINK-5975
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Addison Higham
>Priority: Minor
>
> Flink needs access to shared storage.
> In many cases, this is HDFS, but it would be nice to also support file URIs 
> on a mounted NFS, for example.
> Mesos exposes APIs for adding volumes, so it should be relatively simple to 
> add this.
> As an example, here is the spark code for supporting volumes: 
> https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35
>  





[GitHub] flink pull request #3481: [FLINK-5975] Add volume support to flink-mesos

2017-03-13 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3481#discussion_r105826378
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -162,11 +182,65 @@ public static MesosTaskManagerParameters 
create(Configuration flinkConfig) {
throw new 
IllegalConfigurationException("invalid container type: " + containerTypeString);
}
 
+   Option<String> containerVolOpt = Option.apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
+   List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
+
return new MesosTaskManagerParameters(
cpus,
containerType,
Option.apply(imageName),
-   containeredParameters);
+   containeredParameters,
+   containerVolumes);
+   }
+
+   /**
+* Used to build volume specs for mesos. This allows for mounting 
additional volumes into a container
+*
+* @param containerVolumes a comma delimited optional string of 
[host_path:]container_path[:RO|RW] that
+* defines mount points for a container volume. 
If None or empty string, returns
+* an empty iterator
+*/
+   public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes) {
+   if (containerVolumes.isEmpty()) {
+   return new ArrayList<Protos.Volume>();
+   }
+   String[] specs = containerVolumes.get().split(",");
+   List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
+   for (String s : specs) {
+   if (s.trim().isEmpty()) {
+   continue;
+   }
+   Protos.Volume.Builder vol = Protos.Volume.newBuilder();
+   vol.setMode(Protos.Volume.Mode.RW);
+
+   String[] parts = s.split(":");
+   switch (parts.length) {
+   case 1:
+   vol.setContainerPath(parts[0]);
+   break;
+   case 2:
+   try {
+   Protos.Volume.Mode mode = 
Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
+   vol.setMode(mode)
+   
.setContainerPath(parts[0]);
+   } catch (IllegalArgumentException e) {
--- End diff --

I gather that this code treats a two-part spec as `container:mode` first, 
then falls back to `host:container`.   Just curious, is there some precedent 
for that?  I'm not sure it makes sense;  for example, `/data:ro` would create 
an empty read-only volume, but what good is that? 




[GitHub] flink pull request #3481: [FLINK-5975] Add volume support to flink-mesos

2017-03-13 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3481#discussion_r105825035
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -259,9 +262,11 @@ public String toString() {
throw new IllegalStateException("unsupported 
container type");
}
if(containerInfo != null) {
+   containerInfo.addAllVolumes(params.containerVolumes());
--- End diff --

There's an unnecessary restriction here, that volumes may be used only if a 
container image is also used.   I bet you can use volumes with the Mesos 
containerizer without using an image. The code in the `case MESOS:` block 
should be reorganized to always set `containerInfo = 
Protos.ContainerInfo.newBuilder()`.  This way, `containerInfo` will never be 
null.




[GitHub] flink issue #3467: [FLINK-4545] preparations for removing the network buffer...

2017-03-13 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3467
  
@NicoK, thank you for the explanation; I have already traced the code in your 
local branch and look forward to your further commits for the global pool.

@StephanEwen, thanks for the further elaboration. From my understanding, each 
task can decide the core number of buffers in its `LocalBufferPool` based on its input 
and output channels and the configuration, and the maximum number of buffers based on 
the `ResultPartitionType`. All the `LocalBufferPool`s together determine the total 
number of buffers in the `NetworkBufferPool`, which may need to take the maximum memory 
usage into account.

My concern is that the memory usage of the `NetworkBufferPool` should be considered 
before the `TaskManager` starts, and that this part of the memory should be added to 
the total resources of the `TaskManager`. 
I am willing to do that as part of my current work on [Fine-grained 
Resource Configuration](https://issues.apache.org/jira/browse/FLINK-5131) after 
this feature completes.
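
As a back-of-the-envelope illustration of that accounting (the config keys and default values below are assumptions based on Flink 1.2, and the heap/managed sizes are made up):

{code}
public class NetworkBufferAccountingSketch {

    public static void main(String[] args) {
        // assumed defaults: taskmanager.network.numberOfBuffers and
        // taskmanager.memory.segment-size
        int numberOfBuffers = 2048;
        int segmentSize = 32 * 1024;            // bytes per network buffer

        long heapBytes = 1024L * 1024 * 1024;   // hypothetical TaskManager heap
        long managedBytes = 512L * 1024 * 1024; // hypothetical managed memory

        // memory pre-allocated by the NetworkBufferPool at TaskManager start-up
        long networkBytes = (long) numberOfBuffers * segmentSize;   // 64 MiB here

        // the concern above: count this into the TaskManager's total resources
        long totalBytes = heapBytes + managedBytes + networkBytes;

        System.out.println("network buffer memory: " + (networkBytes >> 20) + " MiB");
        System.out.println("total TaskManager resources: " + (totalBytes >> 20) + " MiB");
    }
}
{code}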




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923554#comment-15923554
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3467
  
@NicoK, thank you for the explanation; I have already traced the code in your 
local branch and look forward to your further commits for the global pool.

@StephanEwen, thanks for the further elaboration. From my understanding, each 
task can decide the core number of buffers in its `LocalBufferPool` based on its input 
and output channels and the configuration, and the maximum number of buffers based on 
the `ResultPartitionType`. All the `LocalBufferPool`s together determine the total 
number of buffers in the `NetworkBufferPool`, which may need to take the maximum memory 
usage into account.

My concern is that the memory usage of the `NetworkBufferPool` should be considered 
before the `TaskManager` starts, and that this part of the memory should be added to 
the total resources of the `TaskManager`. 
I am willing to do that as part of my current work on [Fine-grained 
Resource Configuration](https://issues.apache.org/jira/browse/FLINK-5131) after 
this feature completes.


> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a job DAG with a shuffle phase, this number can go up very high 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the upcoming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting the task manager process, this may add latency and complexity to the 
> scaling process. I am wondering whether there is already any discussion around 
> whether the network buffers should be automatically managed by Flink, or at 
> least whether some API should be exposed to allow them to be reconfigured. Let me 
> know if there is any existing JIRA that I should follow.





[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923530#comment-15923530
 ] 

shijinkui commented on FLINK-5756:
--

[~StephanEwen] Thanks for your reply. [~SyinchwunLeo] Please test the 
mini-benchmark.
FLINK-5715 is nice.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the 
> same key in ListState, the windowState.get() operation performs very poorly. 
> I also tried RocksDB version 4.11.2, and the performance is also very 
> poor. The problem is likely related to RocksDB's own get() operation 
> after using merge(). The problem may affect the window operation's 
> performance when a very large ListState is used. I tried to merge 5 
> values under the same key in RocksDB, and it took 120 seconds to execute the get() 
> operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}





[GitHub] flink issue #3107: [FLINK-5441] [table] Directly allow SQL queries on a Tabl...

2017-03-13 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3107
  
updated the documentation.




[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-13 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3406
  
@fhueske, thanks for your review. I have updated the PR based on your 
comments.




[jira] [Commented] (FLINK-5441) Directly allow SQL queries on a Table

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923509#comment-15923509
 ] 

ASF GitHub Bot commented on FLINK-5441:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3107
  
updated the documentation.


> Directly allow SQL queries on a Table
> -
>
> Key: FLINK-5441
> URL: https://issues.apache.org/jira/browse/FLINK-5441
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Right now a user has to register a table before it can be used in SQL 
> queries. In order to allow more fluent programming we propose calling SQL 
> directly on a table. An underscore can be used to reference the current table:
> {code}
> myTable.sql("SELECT a, b, c FROM _ WHERE d = 12")
> {code}





[jira] [Updated] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-13 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-6034:

Description: 
Currently, the only type of the snapshots in keyed streams is 
{{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one group after 
another. With the introduction of incremental checkpointing, we need a higher 
level abstraction of keyed snapshots to allow flexible snapshot formats. 

The implementation of {{KeyedStateHandle}} s may vary a lot in different 
backends. The only information needed in {{KeyedStateHandle}} s is their key 
group range. When recovering the job with a different degree of parallelism, 
{{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
ranges overlap with their ranges.

  was:
Currently, the only type of the snapshots in keyed streams is 
{{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one group after 
another. With the introduction of incremental checkpointing, we need a higher 
level abstraction of keyed snapshots to allow flexible snapshot formats. 

The implementation of {{KeyedStateHandle}}s may vary a lot in different 
backends. The only information needed in {{KeyedStateHandle}}s is their key 
group range. When recovering the job with a different degree of parallelism, 
{{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges 
overlap with their ranges.


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only type of the snapshots in keyed streams is 
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one group after 
> another. With the introduction of incremental checkpointing, we need a higher 
> level abstraction of keyed snapshots to allow flexible snapshot formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.





[jira] [Created] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-13 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6034:
---

 Summary: Add KeyedStateHandle for the snapshots in keyed streams
 Key: FLINK-6034
 URL: https://issues.apache.org/jira/browse/FLINK-6034
 Project: Flink
  Issue Type: Sub-task
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


Currently, the only type of the snapshots in keyed streams is 
{{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one group after 
another. With the introduction of incremental checkpointing, we need a higher 
level abstraction of keyed snapshots to allow flexible snapshot formats. 

The implementation of {{KeyedStateHandle}}s may vary a lot in different 
backends. The only information needed in {{KeyedStateHandle}}s is their key 
group range. When recovering the job with a different degree of parallelism, 
{{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges 
overlap with their ranges.
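
A minimal sketch of the assignment rule described above (hypothetical helper, not the actual Flink classes):

{code}
public class KeyGroupAssignmentSketch {

    // a handle is assigned to a subtask iff their inclusive key-group ranges intersect
    static boolean overlaps(int handleStart, int handleEnd, int taskStart, int taskEnd) {
        return handleStart <= taskEnd && taskStart <= handleEnd;
    }

    public static void main(String[] args) {
        // hypothetical example: a handle covering key groups [0, 63] is restored by a
        // subtask responsible for [32, 95], but not by one responsible for [96, 127]
        System.out.println(overlaps(0, 63, 32, 95));   // true
        System.out.println(overlaps(0, 63, 96, 127));  // false
    }
}
{code}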





[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923433#comment-15923433
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r105812230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.{Calc, TableScan}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource,
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
+  // The rule can get triggered again due to the transformed "scan => 
filter"
+  // sequence created by the earlier execution of this rule when we 
could not
+  // push all the conditions into the scan
+  return
+}
+
+val program = calc.getProgram
+val (predicates, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+call.builder().getRexBuilder,
+tableSourceTable.tableEnv.getFunctionCatalog)
+if (predicates.isEmpty) {
+  // no condition can be translated to expression
+  return
+}
+
+// trying to apply filter push down, set the flag to true no matter 
whether
+// we actually push any filters down.
+filterableSource.setFilterPushedDown(true)
+val remainingPredicates = filterableSource.applyPredicate(predicates)
+
+// check whether framework still need to do a filter
+val relBuilder = call.builder()
+val remainingCondition = {
+  if (remainingPredicates.length > 0 || unconvertedRexNodes.length > 
0) {
--- End diff --

I think `nonEmpty` is better than `length > 0`.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.





[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923432#comment-15923432
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r105811515
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -42,63 +41,40 @@ class DataSetCalc(
 traitSet: RelTraitSet,
 input: RelNode,
 rowRelDataType: RelDataType,
-private[flink] val calcProgram: RexProgram, // for tests
+calcProgram: RexProgram,
 ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataSetRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
-new DataSetCalc(
-  cluster,
-  traitSet,
-  inputs.get(0),
-  getRowType,
-  calcProgram,
-  ruleDescription)
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: 
RexProgram): Calc = {
+new DataSetCalc(cluster, traitSet, child, getRowType, program, 
ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, 
getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-super.explainTerms(pw)
-  .item("select", selectionToString(calcProgram, getExpressionString))
-  .itemIf("where",
-conditionToString(calcProgram, getExpressionString),
-calcProgram.getCondition != null)
+pw.input("input", getInput)
+.item("select", selectionToString(calcProgram, 
getExpressionString))
--- End diff --

Two spaces for indentation.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.





[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-13 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r105811515
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -42,63 +41,40 @@ class DataSetCalc(
 traitSet: RelTraitSet,
 input: RelNode,
 rowRelDataType: RelDataType,
-private[flink] val calcProgram: RexProgram, // for tests
+calcProgram: RexProgram,
 ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataSetRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
-new DataSetCalc(
-  cluster,
-  traitSet,
-  inputs.get(0),
-  getRowType,
-  calcProgram,
-  ruleDescription)
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: 
RexProgram): Calc = {
+new DataSetCalc(cluster, traitSet, child, getRowType, program, 
ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, 
getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-super.explainTerms(pw)
-  .item("select", selectionToString(calcProgram, getExpressionString))
-  .itemIf("where",
-conditionToString(calcProgram, getExpressionString),
-calcProgram.getCondition != null)
+pw.input("input", getInput)
+.item("select", selectionToString(calcProgram, 
getExpressionString))
--- End diff --

Two spaces for indentation.




[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923434#comment-15923434
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r105814436
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.{Calc, TableScan}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource,
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
--- End diff --

The tableSource should not be shared between TableScan instances.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.





[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-13 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r105814436
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.{Calc, TableScan}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource,
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
--- End diff --

The tableSource should not be shared between TableScan instances.




[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-13 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r105812230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.{Calc, TableScan}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource,
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
+  // The rule can get triggered again due to the transformed "scan => 
filter"
+  // sequence created by the earlier execution of this rule when we 
could not
+  // push all the conditions into the scan
+  return
+}
+
+val program = calc.getProgram
+val (predicates, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+call.builder().getRexBuilder,
+tableSourceTable.tableEnv.getFunctionCatalog)
+if (predicates.isEmpty) {
+  // no condition can be translated to expression
+  return
+}
+
+// trying to apply filter push down, set the flag to true no matter 
whether
+// we actually push any filters down.
+filterableSource.setFilterPushedDown(true)
+val remainingPredicates = filterableSource.applyPredicate(predicates)
+
+// check whether framework still need to do a filter
+val relBuilder = call.builder()
+val remainingCondition = {
+  if (remainingPredicates.length > 0 || unconvertedRexNodes.length > 
0) {
--- End diff --

I think `nonEmpty` is better than `length > 0`.




[jira] [Commented] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922949#comment-15922949
 ] 

ASF GitHub Bot commented on FLINK-5134:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3455
  
Thanks, looks good. Will merge this once the CI builds have passed!


> Aggregate ResourceSpec for chained operators when generating job graph
> --
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: zhijiang
>Assignee: zhijiang
>
> This is a part of the fine-grained resource configuration work in FLIP-6.
> During *JobGraph* generation, each *JobVertex* may contain a series of chained 
> operators, and the resource of the *JobVertex* should be the aggregation of the 
> individual resources of the chained operators.
> For the memory resources of a *JobVertex*, the aggregation is the sum over the 
> chained operators, and for the cpu cores resource, the aggregation is the maximum 
> over the chained operators.
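
A small sketch of the aggregation described above (hypothetical helper, not the actual ResourceSpec API): memory adds up along the chain, while cpu cores take the maximum.

{code}
public class ChainedResourceSketch {

    // aggregate the resources of chained operators into one JobVertex resource:
    // cpu cores -> maximum over the chain, memory -> sum over the chain
    static double[] aggregate(double[] cpuCores, long[] memoryMb) {
        double vertexCpu = 0.0;
        long vertexMemoryMb = 0L;
        for (int i = 0; i < cpuCores.length; i++) {
            vertexCpu = Math.max(vertexCpu, cpuCores[i]);
            vertexMemoryMb += memoryMb[i];
        }
        return new double[] { vertexCpu, vertexMemoryMb };
    }

    public static void main(String[] args) {
        // e.g. a chain of three operators
        double[] result = aggregate(new double[] {1.0, 0.5, 2.0}, new long[] {256, 128, 512});
        System.out.println("cpu=" + result[0] + ", memoryMb=" + (long) result[1]);
    }
}
{code}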





[GitHub] flink issue #3455: [FLINK-5134] [runtime] [FLIP-6] Aggregate ResourceSpec fo...

2017-03-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3455
  
Thanks, looks good. Will merge this once the CI builds have passed!




[GitHub] flink issue #3503: [FLINK-5995][checkpoints] fix Get a Exception when creati...

2017-03-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Looks good. I would actually like to extend the test by one more, just to 
make sure that the proper `ExecutionConfig` is passed (which is not currently 
tested - the execution config in the test is null).

A good way to do that would be to create a state of some non-standard type 
(like `File`) which goes to Kryo and then check that Kryo has the proper 
registration from the execution config.




[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922937#comment-15922937
 ] 

ASF GitHub Bot commented on FLINK-5995:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Looks good. I would actually like to extend the test by one more, just to 
make sure that the proper `ExecutionConfig` is passed (which is not currently 
tested - the execution config in the test is null).

A good way to do that would be to create a state of some non-standard type 
(like `File`) which goes to Kryo and then check that Kryo has the proper 
registration from the execution config.


> Get a Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating a ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I would appreciate it if anyone could give me some advice.
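
For illustration, a minimal sketch of the failure mode and the proposed fix at the descriptor level (assumed Flink 1.2 API):

{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class SerializerInitializationSketch {

    public static void main(String[] args) {
        // a descriptor created from a TypeInformation has no serializer yet
        ListStateDescriptor<String> descriptor =
            new ListStateDescriptor<>("buffer", BasicTypeInfo.STRING_TYPE_INFO);

        // without this call, descriptor.getElementSerializer() throws
        // IllegalStateException("Serializer not yet initialized.")
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        System.out.println(descriptor.getElementSerializer());
    }
}
{code}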





[jira] [Commented] (FLINK-6033) Support UNNEST query in the stream SQL API

2017-03-13 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922923#comment-15922923
 ] 

Haohui Mai commented on FLINK-6033:
---

Discussed offline with [~fhueske] -- the proposed approach is to build on top 
of the user-defined table function support introduced in FLINK-4469, by adding 
a rule to transform the {{UNNEST}} keyword into the {{explode()}} function.

> Support UNNEST query in the stream SQL API
> --
>
> Key: FLINK-6033
> URL: https://issues.apache.org/jira/browse/FLINK-6033
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to support the {{UNNEST}} keyword in the stream SQL API. 
> The keyword is widely used in queries that relate to nested fields.





[jira] [Comment Edited] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922909#comment-15922909
 ] 

Stephan Ewen edited comment on FLINK-5756 at 3/13/17 8:52 PM:
--

Thanks for opening this and sharing the test results.
I agree that the performance of RocksDB is not optimal and that we would like 
to get better performance out of the state backend. In general, RocksDB is 
heavily optimized for writes and for small values. Larger values (as you get 
with the merge) perform very badly.

Here are a few things we can do and have already started doing:

*Improve the other state backends*

  - We are currently making the in-memory state backend (object data) much 
stronger, with async snapshots (see FLINK-5715 )
  - It makes sense to eventually build our own state backend that operates on 
serialized data with managed memory

*Optimize the RocksDB State Backend*

  - We can try to avoid RocksDB's merge operation and instead use range 
iterators for ListState.
  - A quick benchmark of the same task with that approach gives *91ms* insert time 
and *35ms* get() time. That looks worth exploring.

*Code for range-iterator mini-benchmark*
{code}
final File rocksDir = new File("/tmp/rdb");
FileUtils.deleteDirectory(rocksDir);

final Options options = new Options()
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setAllowOsBuffer(true)
.setDisableDataSync(true)
.setCreateIfMissing(true)
.setMergeOperator(new StringAppendOperator());

final WriteOptions write_options = new WriteOptions()
.setSync(false)
.setDisableWAL(true);

final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());

final String key = "key";
final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);

final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);

final Unsafe unsafe = MemoryUtils.UNSAFE;
final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;

final int num = 5;
System.out.println("begin insert");

final long beginInsert = System.nanoTime();
for (int i = 0; i < num; i++) {
unsafe.putInt(keyTemplate, offset, i);
rocksDB.put(write_options, keyTemplate, valueBytes);
}
final long endInsert = System.nanoTime();
System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");

final byte[] resultHolder = new byte[num * valueBytes.length];

final long beginGet = System.nanoTime();

final RocksIterator iterator = rocksDB.newIterator();
int pos = 0;

// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);

// mark end
unsafe.putInt(keyTemplate, offset, -1);

// iterate
while (iterator.isValid()) {
byte[] currKey = iterator.key();
if (sameKey(keyBytes, currKey)) {
byte[] currValue = iterator.value();
System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
pos += currValue.length;
iterator.next();
}
else {
break;
}
}

final long endGet = System.nanoTime();

System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
{code}


was (Author: stephanewen):
Thanks for opening this and sharing the test results.
I agree that the performance of RocksDB is not optimal and that we would like 
to get better performance out of the state backend. In general, RocksDB is 
heavily optimized for writes and for small values. Larger values (as you get 
with the merge) perform very bad.

Here are a few things we can do 

[jira] [Comment Edited] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922909#comment-15922909
 ] 

Stephan Ewen edited comment on FLINK-5756 at 3/13/17 8:52 PM:
--

Thanks for opening this and sharing the test results.
I agree that the performance of RocksDB is not optimal and that we would like 
to get better performance out of the state backend. In general, RocksDB is 
heavily optimized for writes and for small values. Larger values (as you get 
with the merge) perform very badly.

Here are a few things we can do and have already started doing:

*Improve the other state backends*

  - We are currently making the in-memory state backend (object data) much 
stronger, with async snapshots (see FLINK-5715 )
  - It makes sense to eventually build our own state backend that operates on 
serialized data with managed memory

*Optimize the RocksDB State Backend*

  - We can try to avoid RocksDB's merge operation and instead use range 
iterators for ListState.
  - A quick benchmark of the same task with that approach gives *91ms* insert time 
and *35ms* {{get()}} time. That looks worth exploring.

*Code for range-iterator mini-benchmark*
{code}
final File rocksDir = new File("/tmp/rdb");
FileUtils.deleteDirectory(rocksDir);

final Options options = new Options()
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setAllowOsBuffer(true)
.setDisableDataSync(true)
.setCreateIfMissing(true)
.setMergeOperator(new StringAppendOperator());

final WriteOptions write_options = new WriteOptions()
.setSync(false)
.setDisableWAL(true);

final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());

final String key = "key";
final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);

final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);

final Unsafe unsafe = MemoryUtils.UNSAFE;
final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;

final int num = 5;
System.out.println("begin insert");

final long beginInsert = System.nanoTime();
for (int i = 0; i < num; i++) {
unsafe.putInt(keyTemplate, offset, i);
rocksDB.put(write_options, keyTemplate, valueBytes);
}
final long endInsert = System.nanoTime();
System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");

final byte[] resultHolder = new byte[num * valueBytes.length];

final long beginGet = System.nanoTime();

final RocksIterator iterator = rocksDB.newIterator();
int pos = 0;

// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);

// mark end
unsafe.putInt(keyTemplate, offset, -1);

// iterate
while (iterator.isValid()) {
byte[] currKey = iterator.key();
if (sameKey(keyBytes, currKey)) {
byte[] currValue = iterator.value();
System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
pos += currValue.length;
iterator.next();
}
else {
break;
}
}

final long endGet = System.nanoTime();

System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
{code}


was (Author: stephanewen):
Thanks for opening this and sharing the test results.
I agree that the performance of RocksDB is not optimal and that we would like 
to get better performance out of the state backend. In general, RocksDB is 
heavily optimized for writes and for small values. Larger values (as you get 
with the merge) perform very bad.

Here are a few things we can 

[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922909#comment-15922909
 ] 

Stephan Ewen commented on FLINK-5756:
-

Thanks for opening this and sharing the test results.
I agree that the performance of RocksDB is not optimal and that we would like 
to get better performance out of the state backend. In general, RocksDB is 
heavily optimized for writes and for small values. Larger values (as you get 
with the merge) perform very badly.

Here are a few things we can do and have already started doing:

*Improve the other state backends*

  - We are currently making the in-memory state backend (object data) much 
stronger, with async snapshots (see FLINK-5715 )
  - It makes sense to eventually build our own state backend that operates on 
serialized data with managed memory

*Optimize the RocksDB State Backend*

  - We can try to avoid RocksDB's merge operation and instead use range 
iterators for ListState.
  - A quick benchmark of the same task with that approach gives *91ms* insert time 
and *35ms* get() time. That looks worth exploring.


*A tip to improve your benchmark*
  - Try to move all string operations out of the test loop. Prepare the bytes 
beforehand and then call the RocksDB functions.
  - I redid the benchmark with the code below and it took *20* seconds to get 
the result of a merge. Still a lot of time...


*Code for range-iterator mini-benchmark*
{code}
final File rocksDir = new File("/tmp/rdb");
FileUtils.deleteDirectory(rocksDir);

final Options options = new Options()
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setAllowOsBuffer(true)
.setDisableDataSync(true)
.setCreateIfMissing(true)
.setMergeOperator(new StringAppendOperator());

final WriteOptions write_options = new WriteOptions()
.setSync(false)
.setDisableWAL(true);

final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());

final String key = "key";
final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);

final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);

final Unsafe unsafe = MemoryUtils.UNSAFE;
final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;

final int num = 5;
System.out.println("begin insert");

final long beginInsert = System.nanoTime();
for (int i = 0; i < num; i++) {
unsafe.putInt(keyTemplate, offset, i);
rocksDB.put(write_options, keyTemplate, valueBytes);
}
final long endInsert = System.nanoTime();
System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");

final byte[] resultHolder = new byte[num * valueBytes.length];

final long beginGet = System.nanoTime();

final RocksIterator iterator = rocksDB.newIterator();
int pos = 0;

// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);

// mark end
unsafe.putInt(keyTemplate, offset, -1);

// iterate
while (iterator.isValid()) {
byte[] currKey = iterator.key();
if (sameKey(keyBytes, currKey)) {
byte[] currValue = iterator.value();
System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
pos += currValue.length;
iterator.next();
}
else {
break;
}
}

final long endGet = System.nanoTime();

System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
{code}

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> 

[jira] [Updated] (FLINK-6033) Support UNNEST query in the stream SQL API

2017-03-13 Thread Haohui Mai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai updated FLINK-6033:
--
Summary: Support UNNEST query in the stream SQL API  (was: Support UNNEST 
query in the stream SQL api)

> Support UNNEST query in the stream SQL API
> --
>
> Key: FLINK-6033
> URL: https://issues.apache.org/jira/browse/FLINK-6033
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to support the {{UNNEST}} keyword in the stream SQL API. 
> The keyword is widely used in queries that relate to nested fields.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6033) Support UNNEST query in the stream SQL api

2017-03-13 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-6033:
-

 Summary: Support UNNEST query in the stream SQL api
 Key: FLINK-6033
 URL: https://issues.apache.org/jira/browse/FLINK-6033
 Project: Flink
  Issue Type: Improvement
Reporter: Haohui Mai
Assignee: Haohui Mai


It would be nice to support the {{UNNEST}} keyword in the stream SQL API. 
The keyword is widely used in queries that relate to nested fields.
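
As a sketch of the kind of query this would enable once supported (the {{Orders}} 
table and its {{tags}} array column below are made up):

{code}
// illustrative only: assumes a StreamTableEnvironment "tableEnv" with a
// registered "Orders" table whose "tags" column is an array type
Table exploded = tableEnv.sql(
    "SELECT o.orderId, t.tag " +
    "FROM Orders AS o, UNNEST(o.tags) AS t (tag)");
{code}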




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922878#comment-15922878
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105763162
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
 ---
@@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
}
 
@Override
-   public void apply(Byte key, final W window, Iterable input, 
Collector out) throws Exception {
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
-   ProcessAllWindowFunction.Context context = 
wrappedFunction.new Context() {
-   @Override
-   public W window() {
-   return window;
-   }
-   };
+   this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+   }
 
+   @Override
+   public void process(Byte aByte, final W window, final 
InternalWindowContext context, Iterable input, Collector out) throws 
Exception {
final ACC acc = aggFunction.createAccumulator();
 
for (T val : input) {
aggFunction.add(val, acc);
}
 
-   wrappedFunction.process(context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   this.ctx.window = window;
+   this.ctx.internalContext = context;
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   wrappedFunction.process(ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   }
+
+   @Override
+   public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   final ProcessAllWindowFunction.Context ctx = 
wrappedFunction.new Context() {
--- End diff --

whoops 😱 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}
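> For illustration only, a sketch of how a user function might use the proposed 
> accessors (neither {{windowState()}} nor {{globalState()}} exists yet; the names 
> follow the proposal above):
> {code}
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> 
> public class FiringCounter
>         extends ProcessWindowFunction<Long, String, String, TimeWindow> {
> 
>     private final ValueStateDescriptor<Integer> firingsDesc =
>             new ValueStateDescriptor<>("firings", Integer.class);
> 
>     @Override
>     public void process(String key, Context ctx, Iterable<Long> elements,
>             Collector<String> out) throws Exception {
>         // per-key *and* per-window: survives between early/late firings of
>         // this window only, and would be cleaned up at the GC horizon
>         ValueState<Integer> firings = ctx.windowState().getState(firingsDesc);
>         int n = firings.value() == null ? 0 : firings.value();
>         firings.update(n + 1);
>         out.collect(key + " firing #" + (n + 1) + " for " + ctx.window());
>     }
> }
> {code}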



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3479: [FLINK-5929] Allow Access to Per-Window State in P...

2017-03-13 Thread sjwiesman
Github user sjwiesman commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105763162
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
 ---
@@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
}
 
@Override
-   public void apply(Byte key, final W window, Iterable input, 
Collector out) throws Exception {
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
-   ProcessAllWindowFunction.Context context = 
wrappedFunction.new Context() {
-   @Override
-   public W window() {
-   return window;
-   }
-   };
+   this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+   }
 
+   @Override
+   public void process(Byte aByte, final W window, final 
InternalWindowContext context, Iterable input, Collector out) throws 
Exception {
final ACC acc = aggFunction.createAccumulator();
 
for (T val : input) {
aggFunction.add(val, acc);
}
 
-   wrappedFunction.process(context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   this.ctx.window = window;
+   this.ctx.internalContext = context;
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   wrappedFunction.process(ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   }
+
+   @Override
+   public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   final ProcessAllWindowFunction.Context ctx = 
wrappedFunction.new Context() {
--- End diff --

whoops 😱 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922875#comment-15922875
 ] 

ASF GitHub Bot commented on FLINK-2814:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2029
  
@greghogan I think you are right with your suggestion. Would be great if 
you can create an alternate fix.


> DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
> ---
>
> Key: FLINK-2814
> URL: https://issues.apache.org/jira/browse/FLINK-2814
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.0
>Reporter: Greg Hogan
>Assignee: Rekha Joshi
>
> A delta iteration that closes with a solution set which is a {{JoinOperator}} 
> throws the following exception:
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:289)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to 
> org.apache.flink.optimizer.plan.SingleInputPlanNode
>   at 
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
>   at 
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:350)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:424)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1365)
>   at Driver.main(Driver.java:366)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:429)
>   ... 6 more
> {noformat}
> Temporary fix is to attach an identity mapper.
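> A sketch of that workaround (all names below are made up): map the solution-set 
> delta through an identity function before closing the iteration, so the node 
> feeding the solution set becomes a single-input operator:
> {code}
> // MyJoinFunction, workset, solutionSet, iteration and nextWorkset are placeholders
> DataSet<Tuple2<Long, Double>> delta = workset
>     .join(solutionSet).where(0).equalTo(0)
>     .with(new MyJoinFunction())
>     .map(new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
>         @Override
>         public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
>             return value; // identity mapper, only there to avoid the cast error
>         }
>     });
> 
> DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, nextWorkset);
> {code}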



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2029: [FLINK-2814] Fix for DualInputPlanNode cannot be cast to ...

2017-03-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2029
  
@greghogan I think you are right with your suggestion. Would be great if 
you can create an alternate fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922825#comment-15922825
 ] 

ASF GitHub Bot commented on FLINK-5090:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3348#discussion_r105753418
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class InputGateMetrics {
+
+   private final SingleInputGate inputGate;
+
+   private long lastTotal = -1;
+
+   private int lastMin = -1;
+
+   private int lastMax = -1;
+
+   private float lastAvg = -1.0f;
+
+   // 

+
+   private InputGateMetrics(SingleInputGate inputGate) {
+   this.inputGate = checkNotNull(inputGate);
+   }
+
+   // 

+
+   // these methods are package private to make access from the nested 
classes faster 
+
+   long refreshAndGetTotal() {
+   long total;
+   if ((total = lastTotal) == -1) {
+   refresh();
--- End diff --

Custom objects can't be displayed properly in the web interface since we 
call ```toString()``` on them. The same happens in most reporters, so this isn't 
really an option.

As it stands we don't have a single metric that is guaranteed to be 100% 
consistent with other metrics. numRecordsOut and numBytesOut do not describe 
the same moment in time. Neither is this guaranteed for the checkpoint metrics; 
while these are updated all at once (from the outside), there is no mechanism 
that prevents this update in the middle of a report.

I don't know a lot about the network stack, so I can't say whether it is truly 
necessary to have all metrics describe one point in time.

If this is necessary, the only way I can think of right now is to abuse the 
View metric type. Views are meant as an add-on for metrics that want to be 
updated at regular intervals (5 seconds) regardless of when their value is 
actually requested. A metric that only implements the View interface is never 
reported, but still updated, so you could have this View update a shared 
data structure from which the other gauges simply retrieve the current value.

If this is not necessary I would simply separate them and not worry about 
the performance overhead of the metrics, as long as this doesn't affect the job 
by taking locks or similar.
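
A minimal sketch of that View-driven approach (this is not the PR code; the 
QueueSource interface and the metric names are made up): one metric implements 
both Gauge and View, its update() refreshes a shared snapshot on the regular 
update interval, and the remaining gauges only read that snapshot, so all four 
values come from the same refresh.

{code}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.View;

class ConsistentQueueMetrics {

    /** Hypothetical stand-in for the input gate being measured. */
    interface QueueSource {
        int[] currentQueueLengths();
    }

    /** Immutable snapshot so all gauges report values taken at the same refresh. */
    private static final class Snapshot {
        final long total;
        final int min;
        final int max;
        final float avg;

        Snapshot(long total, int min, int max, float avg) {
            this.total = total;
            this.min = min;
            this.max = max;
            this.avg = avg;
        }
    }

    private volatile Snapshot snapshot = new Snapshot(0, 0, 0, 0f);

    void register(MetricGroup group, QueueSource source) {
        // reported as the "total" gauge and, because it also implements View,
        // its update() is called periodically by the metric system
        class TotalGauge implements Gauge<Long>, View {
            @Override
            public void update() {
                int[] lengths = source.currentQueueLengths();
                long total = 0;
                int min = Integer.MAX_VALUE;
                int max = 0;
                for (int len : lengths) {
                    total += len;
                    min = Math.min(min, len);
                    max = Math.max(max, len);
                }
                float avg = lengths.length == 0 ? 0f : (float) total / lengths.length;
                snapshot = new Snapshot(total, lengths.length == 0 ? 0 : min, max, avg);
            }

            @Override
            public Long getValue() {
                return snapshot.total;
            }
        }

        group.gauge("totalQueueLen", new TotalGauge());
        group.gauge("minQueueLen", (Gauge<Integer>) () -> snapshot.min);
        group.gauge("maxQueueLen", (Gauge<Integer>) () -> snapshot.max);
        group.gauge("avgQueueLen", (Gauge<Float>) () -> snapshot.avg);
    }
}
{code}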



> Expose optionally detailed metrics about network queue lengths
> --
>
> Key: FLINK-5090
> URL: https://issues.apache.org/jira/browse/FLINK-5090
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, Network
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> For debugging purposes, it is important to have access to more detailed 
> metrics about the length of network input and output queues.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3348: [FLINK-5090] [network] Add metrics for details abo...

2017-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3348#discussion_r105753418
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class InputGateMetrics {
+
+   private final SingleInputGate inputGate;
+
+   private long lastTotal = -1;
+
+   private int lastMin = -1;
+
+   private int lastMax = -1;
+
+   private float lastAvg = -1.0f;
+
+   // 

+
+   private InputGateMetrics(SingleInputGate inputGate) {
+   this.inputGate = checkNotNull(inputGate);
+   }
+
+   // 

+
+   // these methods are package private to make access from the nested 
classes faster 
+
+   long refreshAndGetTotal() {
+   long total;
+   if ((total = lastTotal) == -1) {
+   refresh();
--- End diff --

Custom objects can't be displayed properly in the web interface since we 
call ```toString()``` on them. The same happens in most reporters, so this isn't 
really an option.

As it stands we don't have a single metric that is guaranteed to be 100% 
consistent with other metrics. numRecordsOut and numBytesOut do not describe 
the same moment in time. Neither is this guaranteed for the checkpoint metrics; 
while these are updated all at once (from the outside), there is no mechanism 
that prevents this update in the middle of a report.

I don't know a lot about the network stack, so I can't say whether it is truly 
necessary to have all metrics describe one point in time.

If this is necessary, the only way I can think of right now is to abuse the 
View metric type. Views are meant as an add-on for metrics that want to be 
updated at regular intervals (5 seconds) regardless of when their value is 
actually requested. A metric that only implements the View interface is never 
reported, but still updated, so you could have this View update a shared 
data structure from which the other gauges simply retrieve the current value.

If this is not necessary I would simply separate them and not worry about 
the performance overhead of the metrics, as long as this doesn't affect the job 
by taking locks or similar.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-13 Thread Billy Newport (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922810#comment-15922810
 ] 

Billy Newport commented on FLINK-6022:
--

Thanks for this Robert. Basically, to help with the ones we've implemented, we'd 
need a way of registering our schema objects on the ExecutionConfig and then 
looking them up on deserialization; a one-off call when the ExecutionConfig is 
inflated would also work. To be honest, a way of registering a map of 
serializable state on the ExecutionConfig would be all we need.

We are a little different from most, I think, in that we deal exclusively with 
GenericRecords with predeclared schemas, no code-generated POJOs at all. We've 
kicked off the internal process of contributing, so hopefully I or Regina 
Chan (also here) can help contribute to this.

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.
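> As a rough illustration of the intended split (plain Avro API only; the record 
> schema and field names below are made up): the schema is known up front and can 
> be shipped once with the type information, so per record only the field values 
> need to be encoded.
> {code}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> 
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.EncoderFactory;
> 
> public class GenericRecordPayload {
> 
>     // known up front, e.g. carried by the TypeInformation, not per record
>     private static final Schema SCHEMA = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"long\"},"
>             + "{\"name\":\"name\",\"type\":\"string\"}]}");
> 
>     public static byte[] serialize(GenericRecord record) throws IOException {
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         // only the field values go over the wire, no per-record schema
>         new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
>         encoder.flush();
>         return out.toByteArray();
>     }
> 
>     public static void main(String[] args) throws IOException {
>         GenericRecord r = new GenericData.Record(SCHEMA);
>         r.put("id", 42L);
>         r.put("name", "flink");
>         System.out.println(serialize(r).length + " bytes without the schema");
>     }
> }
> {code}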



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6032) CEP-Clean up the operator state when not needed.

2017-03-13 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-6032:
-

 Summary: CEP-Clean up the operator state when not needed.
 Key: FLINK-6032
 URL: https://issues.apache.org/jira/browse/FLINK-6032
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.3.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.3.0






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-5756:

Description: 
When using RocksDB as the StateBackend, if there are many values under the same 
key in ListState, the windowState.get() operation performs very poorly. I also 
tried RocksDB version 4.11.2, and the performance is still very poor. The 
problem is likely related to RocksDB's own get() operation after using merge(). 
This may hurt the window operation's performance when the ListState under a key 
grows very large. I tried to merge 5 values under the same key in RocksDB; it 
costs 120 seconds to execute the get() operation.

///
The Flink code is as follows:

{code}
class SEventSource extends RichSourceFunction [SEvent] {

  private var count = 0L

  private val alphabet = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
  override def run(sourceContext: SourceContext[SEvent]): Unit = {
while (true) {
  for (i <- 0 until 5000) {
sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
count += 1L
  }
  Thread.sleep(1000)
}
  }
}

env.addSource(new SEventSource)
  .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] 
{
override def getCurrentWatermark: Watermark = {
  new Watermark(System.currentTimeMillis())
}

override def extractTimestamp(t: SEvent, l: Long): Long = {
  System.currentTimeMillis()
}
  })
  .keyBy(0)
  .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
  .apply(new WindowStatistic)
  .map(x => (System.currentTimeMillis(), x))
  .print()
{code}


The RocksDB Test code:

{code}
val stringAppendOperator = new StringAppendOperator
val options = new Options()
options.setCompactionStyle(CompactionStyle.LEVEL)
  .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
  .setLevelCompactionDynamicLevelBytes(true)
  .setIncreaseParallelism(4)
  .setUseFsync(true)
  .setMaxOpenFiles(-1)
  .setCreateIfMissing(true)
  .setMergeOperator(stringAppendOperator)

val write_options = new WriteOptions
write_options.setSync(false)

val rocksDB = RocksDB.open(options, "/**/Data/")
val key = "key"
val value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"

val beginmerge = System.currentTimeMillis()
for(i <- 0 to 5) {
  rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
  //rocksDB.put(key.getBytes, value.getBytes)
}
println("finish")

val begin = System.currentTimeMillis()
rocksDB.get(key.getBytes)
val end = System.currentTimeMillis()

println("merge cost:" + (begin - beginmerge))
println("Time consuming:" + (end - begin))
  }
}
{code}

  was:
When using RocksDB as the StateBackend, if there are many values under the same 
key in ListState, the windowState.get() operation performs very poorly. I also 
tried RocksDB version 4.11.2, and the performance is still very poor. The 
problem is likely related to RocksDB's own get() operation after using merge(). 
This may hurt the window operation's performance when the ListState under a key 
grows very large. I tried to merge 5 values under the same key in RocksDB; it 
costs 120 seconds to execute the get() operation.

///
The Flink code is as follows:

class SEventSource extends RichSourceFunction [SEvent] {

  private var count = 0L

  private val alphabet = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
  override def run(sourceContext: SourceContext[SEvent]): Unit = {
while (true) {
  for (i <- 0 until 5000) {
sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
count += 1L
  }
  Thread.sleep(1000)
}
  }
}

env.addSource(new SEventSource)
  .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] 
{
override def getCurrentWatermark: Watermark = {
  new Watermark(System.currentTimeMillis())
}

override def extractTimestamp(t: SEvent, l: Long): Long = {
  System.currentTimeMillis()
}
  })
  .keyBy(0)
  .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
  .apply(new WindowStatistic)
  .map(x => (System.currentTimeMillis(), x))
  .print()


The RocksDB Test code:

val stringAppendOperator = new StringAppendOperator
val options = new Options()

[jira] [Commented] (FLINK-5975) Mesos should support adding volumes to launched taskManagers

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922776#comment-15922776
 ] 

ASF GitHub Bot commented on FLINK-5975:
---

Github user addisonj commented on the issue:

https://github.com/apache/flink/pull/3481
  
@EronWright @zentol minor bump on this... any other steps to get this on 
the path to being merged?
Don't want to let this hang out for too long and then forget about it :)




> Mesos should support adding volumes to launched taskManagers
> 
>
> Key: FLINK-5975
> URL: https://issues.apache.org/jira/browse/FLINK-5975
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Addison Higham
>Priority: Minor
>
> Flink needs access to shared storage.
> In many cases, this is HDFS, but it would be nice to also support file URIs 
> on an mounted NFS for example.
> Mesos exposes APIs for adding volumes, so it should be relatively simply to 
> add this.
> As an example, here is the spark code for supporting volumes: 
> https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3481: [FLINK-5975] Add volume support to flink-mesos

2017-03-13 Thread addisonj
Github user addisonj commented on the issue:

https://github.com/apache/flink/pull/3481
  
@EronWright @zentol minor bump on this... any other steps to get this on 
the path to being merged?
Don't want to let this hang out for too long and then forget about it :)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService

2017-03-13 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-6007.
-
Resolution: Fixed

> ConcurrentModificationException in WatermarkCallbackService
> ---
>
> Key: FLINK-6007
> URL: https://issues.apache.org/jira/browse/FLINK-6007
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, if an attempt is made to call 
> {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} 
> from within the {{OnWatermarkCallback}}, a 
> {{ConcurrentModificationException}} is thrown. The reason is that the 
> {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the 
> callback for each one of them.
> To fix this, the deleted keys are put into a separate list, and the deletion 
> happens after the iteration over all keys has finished.
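> A minimal sketch of that deferred-deletion pattern (the collection and callback 
> names below are stand-ins for the service's internals):
> {code}
> List<KEY> toRemove = new ArrayList<>();
> for (KEY key : registeredKeys) {
>     callback.onWatermark(key, watermark);   // may mark the key for removal
>     if (keysMarkedForRemoval.contains(key)) {
>         toRemove.add(key);                  // defer: don't mutate while iterating
>     }
> }
> registeredKeys.removeAll(toRemove);         // delete only after the iteration
> {code}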



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922767#comment-15922767
 ] 

ASF GitHub Bot commented on FLINK-6007:
---

Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/3514


> ConcurrentModificationException in WatermarkCallbackService
> ---
>
> Key: FLINK-6007
> URL: https://issues.apache.org/jira/browse/FLINK-6007
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, if an attempt is made to call 
> {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} 
> from within the {{OnWatermarkCallback}}, a 
> {{ConcurrentModificationException}} is thrown. The reason is that the 
> {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the 
> callback for each one of them.
> To fix this, the deleted keys are put into a separate list, and the deletion 
> happens after the iteration over all keys has finished.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922766#comment-15922766
 ] 

ASF GitHub Bot commented on FLINK-6007:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3514
  
Merged at 14c1941d8eaa583eb8f7eeb5478e605850c0d355


> ConcurrentModificationException in WatermarkCallbackService
> ---
>
> Key: FLINK-6007
> URL: https://issues.apache.org/jira/browse/FLINK-6007
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, if an attempt is made to call 
> {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} 
> from within the {{OnWatermarkCallback}}, a 
> {{ConcurrentModificationException}} is thrown. The reason is that the 
> {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the 
> callback for each one of them.
> To fix this, the deleted keys are put into a separate list, and the deletion 
> happens after the iteration over all keys has finished.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3514: [FLINK-6007] Allow key removal from within the watermark ...

2017-03-13 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3514
  
Merged at 14c1941d8eaa583eb8f7eeb5478e605850c0d355


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3514: [FLINK-6007] Allow key removal from within the wat...

2017-03-13 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/3514


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService

2017-03-13 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922763#comment-15922763
 ] 

Kostas Kloudas commented on FLINK-6007:
---

Merged at 14c1941d8eaa583eb8f7eeb5478e605850c0d355

> ConcurrentModificationException in WatermarkCallbackService
> ---
>
> Key: FLINK-6007
> URL: https://issues.apache.org/jira/browse/FLINK-6007
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, if an attempt is made to call 
> {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} 
> from within the {{OnWatermarkCallback}}, a 
> {{ConcurrentModificationException}} is thrown. The reason is that the 
> {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the 
> callback for each one of them.
> To fix this, the deleted keys are put into a separate list, and the deletion 
> happens after the iteration over all keys has finished.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3348: [FLINK-5090] [network] Add metrics for details abo...

2017-03-13 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3348#discussion_r105742696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class InputGateMetrics {
+
+   private final SingleInputGate inputGate;
+
+   private long lastTotal = -1;
+
+   private int lastMin = -1;
+
+   private int lastMax = -1;
+
+   private float lastAvg = -1.0f;
+
+   // 

+
+   private InputGateMetrics(SingleInputGate inputGate) {
+   this.inputGate = checkNotNull(inputGate);
+   }
+
+   // 

+
+   // these methods are package private to make access from the nested 
classes faster 
+
+   long refreshAndGetTotal() {
+   long total;
+   if ((total = lastTotal) == -1) {
+   refresh();
--- End diff --

I guess the main reason for this was performance in case all (up to) 4 
metrics are requested. What's the preferred way of exposing this kind of 
metrics? Should I gather all 4 in a custom object and enclose that one in a 
Gauge? How could the metrics then be displayed, e.g. in the web interface?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved

2017-03-13 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922732#comment-15922732
 ] 

Luke Hutchison commented on FLINK-6026:
---

Makes sense, I wondered if that may be the issue here. 

I guess the real question is why the type system is not able to pass 
information from the local variable definition across two chained calls, when 
it can unify the types between the single call and its nested lambda. The type 
info is still present in the final local variable type declaration, even in the 
chained case. 

I guess the way to fix this is to file a feature request for the Java and/or 
Eclipse compiler to increase their maximum type propagation depth?
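
For reference, the usual workaround today is to give the lambda parameters 
explicit types and add a {{returns()}} hint so that the erased lambda type is 
also available at runtime; that keeps the chain (including {{.name()}}) intact. 
A minimal sketch, assuming {{x}} is a {{DataSet<Tuple2<String, Integer>>}}:

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// fragment only; x is a DataSet<Tuple2<String, Integer>> as in the examples above
DataSet<Tuple2<String, Integer>> y = x
    .flatMap((Tuple2<String, Integer> t, Collector<Tuple2<String, Integer>> out) -> out.collect(t))
    .returns(new TypeHint<Tuple2<String, Integer>>() {})
    .name("op");
{code}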

> Return type of flatMap with lambda function not correctly resolved
> --
>
> Key: FLINK-6026
> URL: https://issues.apache.org/jira/browse/FLINK-6026
> Project: Flink
>  Issue Type: Bug
>  Components: Core, DataSet API, DataStream API
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Minor
>
> I get an error if I try naming a flatMap operation:
> {code}
> DataSet> y = x.flatMap((t, out) -> 
> out.collect(t)).name("op");
> {code}
> Type mismatch: cannot convert from 
> FlatMapOperator,Object> to 
> DataSet>
> If I try to do it as two steps, I get the error that DataSet does not have a 
> .name(String) method:
> {code}
> DataSet> y = x.flatMap((t, out) -> out.collect(t));
> y.name("op");
> {code}
> If I use Eclipse type inference on x, it shows me that the output type is not 
> correctly inferred:
> {code}
> FlatMapOperator, Object> y = x.flatMap((t, out) -> 
> out.collect(t));
> y.name("op");   // This now works, but "Object" is not the output type
> {code}
> However, these steps still cannot be chained -- the following still gives an 
> error:
> {code}
> FlatMapOperator, Object> y = x.flatMap((t, out) -> 
> out.collect(t)).name("op");
> {code}
> i.e. first you have to assign the result to a field, so that the type is 
> fully specified; then you can name the operation.
> And the weird thing is that you can give the correct, more specific type for 
> the local variable, without a type narrowing error:
> {code}
> FlatMapOperator, Tuple2> y = 
> x.flatMap((t, out) -> out.collect(t));
> y.name("op");   // This works, although chaining these two lines still does 
> not work
> {code}
> If the types of the lambda args are specified, then everything works:
> {code}
> DataSet> y = x.flatMap((Tuple2 t, 
> Collector> out) -> out.collect(t)).name("op");
> {code}
> So, at least two things are going on here:
> (1) type inference is not working correctly for the lambda parameters
> (2) this breaks type inference for intermediate expressions, unless the type 
> can be resolved using a local variable definition
> Is this a bug in the type signature of flatMap? (Or a compiler bug or 
> limitation, or a fundamental limitation of Java 8 type inference?)
> It seems odd that the type of a local variable definition can make the result 
> of the flatMap operator *more* specific, taking the type from 
> {code}
> FlatMapOperator, Object>
> {code}
> to 
> {code}
> FlatMapOperator, Tuple2>
> {code}
> i.e. if the output type is provided in the local variable definition, it is 
> properly unified with the type of the parameter t of collect(t), however that 
> type is not propagated out of that call.
> Can anything be done about this in Flink? I have hit this problem a few times.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922726#comment-15922726
 ] 

ASF GitHub Bot commented on FLINK-5090:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3348#discussion_r105742696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class InputGateMetrics {
+
+   private final SingleInputGate inputGate;
+
+   private long lastTotal = -1;
+
+   private int lastMin = -1;
+
+   private int lastMax = -1;
+
+   private float lastAvg = -1.0f;
+
+   // 

+
+   private InputGateMetrics(SingleInputGate inputGate) {
+   this.inputGate = checkNotNull(inputGate);
+   }
+
+   // 

+
+   // these methods are package private to make access from the nested 
classes faster 
+
+   long refreshAndGetTotal() {
+   long total;
+   if ((total = lastTotal) == -1) {
+   refresh();
--- End diff --

I guess the main reason for this was performance in case all (up to) 4 
metrics are requested. What's the preferred way of exposing this kind of 
metrics? Should I gather all 4 in a custom object and enclose that one in a 
Gauge? How could the metrics then be displayed, e.g. in the web interface?


> Expose optionally detailed metrics about network queue lengths
> --
>
> Key: FLINK-5090
> URL: https://issues.apache.org/jira/browse/FLINK-5090
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, Network
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> For debugging purposes, it is important to have access to more detailed 
> metrics about the length of network input and output queues.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"

2017-03-13 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922699#comment-15922699
 ] 

Luke Hutchison commented on FLINK-6024:
---

This was for the batch API (DataSet).

More generally, why should the requirements be different here for batch and 
streaming APIs? I have come across other differences too between the APIs that 
do not seem to be related to limitations of one of the processing modes. 
Shouldn't more be shared between the two APIs? 

> Need more fine-grained info for "InvalidProgramException: This type (...) 
> cannot be used as key"
> 
>
> Key: FLINK-6024
> URL: https://issues.apache.org/jira/browse/FLINK-6024
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> I got this very confusing exception:
> {noformat}
> InvalidProgramException: This type (MyType) cannot be used as key
> {noformat}
> I dug through the code, and could not find what was causing this. The help 
> text for type.isKeyType(), in Keys.java:329, right before the exception is 
> thrown, says: "Checks whether this type can be used as a key. As a bare 
> minimum, types have to be hashable and comparable to be keys." However, this 
> didn't solve the problem.
> I discovered that in my case, the error was occurring because I added a new 
> constructor to the type, and I didn't have a default constructor. This is 
> probably quite a common thing to happen for POJOs, so the error message 
> should give some detail saying that this is the problem.
> Other things that can cause this to fail, including that the class is not 
> public, or the constructor is not public, or the key field is not public, or 
> that the key field is not a serializable type, or the key is not Comparable, 
> or the key is not hashable, should be given in the error message instead, 
> depending on the actual cause of the problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3040: [FLINK-3850] Add forward field annotations to DataSet

2017-03-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3040
  
Thanks for the update @tonycox. I haven't looked at the new changes yet but 
just commented on the join. Please let me know if you have further questions. 

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105731937
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
 
+//consider all fields not which are not keys are forwarded
+val leftIndices = (0 until 
left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

A Calcite join forwards all fields of both sides. If the left input is 
`(l1, l2, l3)` and the right input is `(r1, r2)`, then the result of the join 
will be `(l1, l2, l3, r1, r2)` for all pairs of left and right that satisfy the 
join condition. It does not matter which of the fields is a key field. If the 
join condition is `l1 == r2`, both fields are forwarded. Since DataSet joins 
organize the input data sets based on the key attributes (partition and sort) 
these attributes are especially interesting for forward field annotations.

Actually, I just noticed that we have to distinguish the type of the join 
(inner, left, right, full). We can only forward the fields of the inner side 
(both for inner join, left for left join, right for right join, none for full 
outer join) because the outer side might have been padded with `null` values. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15917780#comment-15917780
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3040
  
Thanks for the update @tonycox. I haven't looked at the new changes yet but 
just commented on the join. Please let me know if you have further questions. 

Thanks, Fabian


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15917356#comment-15917356
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105731937
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
 
+//consider all fields not which are not keys are forwarded
+val leftIndices = (0 until 
left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

A Calcite join forwards all fields of both sides. If the left input is 
`(l1, l2, l3)` and the right input is `(r1, r2)`, then the result of the join 
will be `(l1, l2, l3, r1, r2)` for all pairs of left and right that satisfy the 
join condition. It does not matter which of the fields is a key field. If the 
join condition is `l1 == r2`, both fields are forwarded. Since DataSet joins 
organize the input data sets based on the key attributes (partition and sort) 
these attributes are especially interesting for forward field annotations.

Actually, I just noticed that we have to distinguish the type of the join 
(inner, left, right, full). We can only forward the fields of the inner side 
(both for inner join, left for left join, right for right join, none for full 
outer join) because the outer side might have been padded with `null` values. 


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907875#comment-15907875
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3040
  
@fhueske I fixed the commented points.
Could you explain the forwarding of the join keys?


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3040: [FLINK-3850] Add forward field annotations to DataSet

2017-03-13 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3040
  
@fhueske I fixed the commented points.
Could you explain the forwarding of the join keys?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105712740
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.io.IOException
+import java.lang.reflect.Modifier
+import java.net.URL
+import java.util.Properties
+
+import org.apache.flink.table.annotation.TableType
+import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException}
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object ExternalTableSourceUtil {
+
+  // config file to specify scan package to search TableSourceConverter
+  private val tableSourceConverterConfigFileName = 
"tableSourceConverter.properties"
+
+  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  // registered table type with the TableSourceConverter.
+  // Key is table type name, Value is set of converter class.
+  private val tableTypeToTableSourceConvertersClazz = {
+val registeredConverters =
+  new mutable.HashMap[String, mutable.Set[Class[_ <: 
TableSourceConverter[_
+  with mutable.MultiMap[String, Class[_ <: 
TableSourceConverter[_]]]
+// scan all config files to find TableSourceConverters which are 
annotated with TableType.
+val resourceUrls = 
getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
+while (resourceUrls.hasMoreElements) {
+  val url = resourceUrls.nextElement()
+  parseScanPackageFromConfigFile(url) match {
+case Some(scanPackage) =>
+  val clazzWithAnnotations = new Reflections(scanPackage)
+  .getTypesAnnotatedWith(classOf[TableType])
+  clazzWithAnnotations.asScala.foreach(clazz =>
+if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) {
+  if (Modifier.isAbstract(clazz.getModifiers()) ||
+  Modifier.isInterface(clazz.getModifiers)) {
+LOG.warn(s"Class ${clazz.getName} is annotated with 
TableType " +
s"but is an abstract class or interface.")
+  } else {
+val tableTypeAnnotation: TableType =
+  clazz.getAnnotation(classOf[TableType])
+val tableType = tableTypeAnnotation.value()
+val converterClazz = clazz.asInstanceOf[Class[_ <: 
TableSourceConverter[_]]]
+registeredConverters.addBinding(tableType, converterClazz)
+LOG.info(s"Registers the converter ${clazz.getName} to 
table type [$tableType]. ")
+  }
+} else {
+  LOG.warn(
+s"Class ${clazz.getName} is annotated with TableType, " +
+s"but does not implement the TableSourceConverter 
interface.")
+}
+  )
+case None =>
+  LOG.warn(s"Failed to get scan package from config file [$url].")
+  }
+}
+registeredConverters
+  }
+
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def 

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105714201
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.annotation.TableType
+import org.apache.flink.table.catalog.{ExternalCatalogTable, 
TableSourceConverter}
+
+import scala.collection.JavaConverters._
+import java.util.{Set => JSet}
+
+import com.google.common.collect.ImmutableSet
+
+/**
+  * The class defines a converter used to convert [[CsvTableSource]] to
+  * or from [[ExternalCatalogTable]].
+  */
+@TableType(value = "csv")
+class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] 
{
+
+  private val required: JSet[String] = ImmutableSet.of("path")
--- End diff --

I'd add fieldDelim and rowDelim to the required parameters as well. 

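If that suggestion were followed, the effect (sketched here in Java rather than the PR's Scala, with a hypothetical helper name) is simply a larger required-property set that is validated before a `CsvTableSource` is built:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the review suggestion: require the delimiters as well,
// not only the path, and fail fast if any of them is missing.
public class CsvRequiredProperties {
    private static final Set<String> REQUIRED =
        new HashSet<>(Arrays.asList("path", "fieldDelim", "rowDelim"));

    public static void checkRequired(Map<String, String> tableProperties) {
        for (String key : REQUIRED) {
            if (!tableProperties.containsKey(key)) {
                throw new IllegalArgumentException("Missing required table property: " + key);
            }
        }
    }
}
```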

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105704013
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ReadonlyExternalCatalog.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{List => JList}
+
+import org.apache.flink.table.api._
+
+/**
+  * This class is responsible for reading tables/databases from an external catalog.
+  * Its main responsibility is to provide tables for the Calcite catalog; it looks up
+  * databases or tables in the external catalog.
+  */
+trait ReadonlyExternalCatalog {
--- End diff --

Rename to `ExternalCatalog`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105703538
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.table.api.{DatabaseAlreadyExistException, 
DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
+import java.util.{List => JList}
+
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConverters._
+
+/**
+  * This class is an in-memory implementation of 
[[ReadonlyExternalCatalog]].
+  *
+  * It is intended for testing or development rather than for use in a
+  * production environment.
+  */
+class InMemoryExternalCatalog extends CRUDExternalCatalog {
+
+  private val databases = new HashMap[String, Database]
+
+  @throws[DatabaseNotExistException]
+  @throws[TableAlreadyExistException]
+  override def createTable(
+  table: ExternalCatalogTable,
+  ignoreIfExists: Boolean): Unit = synchronized {
+val dbName = table.identifier.database
+val tables = getTables(dbName)
+val tableName = table.identifier.table
+if (tables.contains(tableName)) {
+  if (!ignoreIfExists) {
+throw new TableAlreadyExistException(dbName, tableName)
+  }
+} else {
+  tables.put(tableName, table)
+}
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  override def dropTable(
+  dbName: String,
+  tableName: String,
+  ignoreIfNotExists: Boolean): Unit = synchronized {
+val tables = getTables(dbName)
+if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
+  throw new TableNotExistException(dbName, tableName)
+}
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  override def alterTable(
+  table: ExternalCatalogTable,
+  ignoreIfNotExists: Boolean): Unit = synchronized {
+val dbName = table.identifier.database
+val tables = getTables(dbName)
+val tableName = table.identifier.table
+if (tables.contains(tableName)) {
+  tables.put(tableName, table)
+} else if (!ignoreIfNotExists) {
+  throw new TableNotExistException(dbName, tableName)
+}
+  }
+
+  @throws[DatabaseNotExistException]
+  override def listTables(dbName: String): JList[String] = synchronized {
+val tables = getTables(dbName)
+tables.keys.toList.asJava
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  override def getTable(dbName: String, tableName: String): 
ExternalCatalogTable = synchronized {
+val tables = getTables(dbName)
+tables.get(tableName) match {
+  case Some(table) => table
+  case None => throw new TableNotExistException(dbName, tableName)
+}
+  }
+
+  @throws[DatabaseAlreadyExistException]
+  override def createDatabase(
+  db: ExternalCatalogDatabase,
+  ignoreIfExists: Boolean): Unit = synchronized {
+val dbName = db.dbName
+if (databases.contains(dbName)) {
+  if (!ignoreIfExists) {
+throw new DatabaseAlreadyExistException(dbName)
+  }
+} else {
+  databases.put(dbName, new Database(db))
+}
+  }
+
+  @throws[DatabaseNotExistException]
+  override def alterDatabase(
+  db: ExternalCatalogDatabase,
+  ignoreIfNotExists: Boolean): Unit = synchronized {
+val dbName = db.dbName
+databases.get(dbName) match {
+  case Some(database) => database.db = db
+  case None =>
+if (!ignoreIfNotExists) {
+  throw new DatabaseNotExistException(dbName)
+}
+}
+  }
+
+ 

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105703767
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CRUDExternalCatalog.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.table.api._
+
+/**
+  * This class is responsible for interacting with an external catalog.
+  * Its main responsibilities include:
+  * 
+  *  create/drop/alter database or tables for DDL operations
+  *  provide tables for calcite catalog, it looks up databases or 
tables in the external catalog
+  * 
+  */
+trait CRUDExternalCatalog extends ReadonlyExternalCatalog {
--- End diff --

Rename to `CrudExternalCatalog`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2491) Checkpointing only works if all operators/tasks are still running

2017-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-2491:

Summary: Checkpointing only works if all operators/tasks are still running  
(was: Operators are not participating in state checkpointing in some cases)

> Checkpointing only works if all operators/tasks are still running
> -
>
> Key: FLINK-2491
> URL: https://issues.apache.org/jira/browse/FLINK-2491
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Márton Balassi
>Priority: Critical
> Fix For: 1.0.0
>
>
> While implementing a test case for the Kafka Consumer, I came across the 
> following bug:
> Consider the following topology, with the operator parallelism in parentheses:
> Source (2) --> Sink (1).
> In this setup, the {{snapshotState()}} method is called on the source, but 
> not on the Sink.
> The sink receives the generated data.
> Only one of the two sources is generating data.
> I've implemented a test case for this, you can find it here: 
> https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java

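For context, a minimal sketch (all names invented; the linked test does more) of the topology shape described in the report, with source parallelism 2 and sink parallelism 1:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CheckpointTopologySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        env.addSource(new SourceFunction<Long>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Long> ctx) throws Exception {
                    long i = 0;
                    while (running) {
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(i++);
                        }
                        Thread.sleep(10);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            }).setParallelism(2)
            // The report is that snapshotState() is called on the sources,
            // but not on this single sink subtask.
            .addSink(new SinkFunction<Long>() {
                @Override
                public void invoke(Long value) throws Exception {
                    // a checkpointed sink would also snapshot its state here
                }
            }).setParallelism(1);

        env.execute("Source(2) -> Sink(1) sketch");
    }
}
```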


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-2491) Checkpointing only works if all operators/tasks are still running

2017-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-2491:

Component/s: (was: Streaming)
 State Backends, Checkpointing
 DataStream API

> Checkpointing only works if all operators/tasks are still running
> -
>
> Key: FLINK-2491
> URL: https://issues.apache.org/jira/browse/FLINK-2491
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Márton Balassi
>Priority: Critical
> Fix For: 1.0.0
>
>
> While implementing a test case for the Kafka Consumer, I came across the 
> following bug:
> Consider the following topology, with the operator parallelism in parentheses:
> Source (2) --> Sink (1).
> In this setup, the {{snapshotState()}} method is called on the source, but 
> not on the Sink.
> The sink receives the generated data.
> Only one of the two sources is generating data.
> I've implemented a test case for this, you can find it here: 
> https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2263) ExecutionGraph uses Thread.sleep to delay execution retries

2017-03-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907858#comment-15907858
 ] 

Aljoscha Krettek commented on FLINK-2263:
-

[~StephanEwen] and [~till.rohrmann] is this still valid with all the recent 
changes to those parts?

> ExecutionGraph uses Thread.sleep to delay execution retries
> ---
>
> Key: FLINK-2263
> URL: https://issues.apache.org/jira/browse/FLINK-2263
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 0.10.0
>Reporter: Márton Balassi
> Fix For: 1.0.0
>
>
> The delay between execution retries is implemented with a future containing a 
> Thread.sleep call, which effectively blocks an executor thread. This behavior can 
> potentially block the whole JobManager from the user's perspective.

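Not the actual fix, just an illustration of the alternative the ticket implies: scheduling the retry instead of sleeping keeps the executor thread free. A minimal sketch with a plain `ScheduledExecutorService` (names invented):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class NonBlockingRetrySketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long retryDelayMillis = 5_000L;

        // Instead of submitting a task that calls Thread.sleep(retryDelayMillis) and
        // then restarts, schedule the restart and leave the thread free meanwhile.
        ScheduledFuture<?> retry = scheduler.schedule(
            () -> System.out.println("restarting execution (placeholder)"),
            retryDelayMillis,
            TimeUnit.MILLISECONDS);

        retry.get();          // only for the demo: wait until the scheduled restart ran
        scheduler.shutdown();
    }
}
```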


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5222) Rename StateBackend interface to StateBinder

2017-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5222.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed as part of another change.

> Rename StateBackend interface to StateBinder
> 
>
> Key: FLINK-5222
> URL: https://issues.apache.org/jira/browse/FLINK-5222
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>Priority: Minor
> Fix For: 1.3.0
>
>
> The `StateBackend` interface is sometimes confusing because it is not
> implemented by any state backend.
> The main function of the interface is to create state from a state descriptor,
> so giving `StateBackend` another name would be clearer.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5991:

Component/s: (was: Streaming)
 DataStream API

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265; it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <T> ListState<T> getBroadcastOperatorState(ListStateDescriptor<T> stateDescriptor);
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.

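Assuming the first proposal above were adopted, usage from a `CheckpointedFunction` might look like the following sketch. Note that `getBroadcastOperatorState` is the method proposed in this issue, not a method guaranteed to exist under that name in a released `OperatorStateStore`, and the offset bookkeeping is invented for the example:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Sketch only: relies on the getBroadcastOperatorState method proposed above.
public class ShardOffsetTracker implements CheckpointedFunction {

    private transient ListState<Long> broadcastOffsets;
    private long currentOffset;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("shard-offsets", BasicTypeInfo.LONG_TYPE_INFO);
        // Proposed semantics: on restore, every subtask sees the full broadcast list
        // and can pick the shards/partitions it is now responsible for.
        broadcastOffsets = context.getOperatorStateStore().getBroadcastOperatorState(descriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        broadcastOffsets.clear();
        broadcastOffsets.add(currentOffset);
    }
}
```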


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2017-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4858:

Component/s: (was: Streaming)
 DataStream API

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can 
> be cleaned up.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907793#comment-15907793
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3522
  
CI tests pass (except one profile which does not complete within the limit 
of 50 minutes)


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3522: [FLINK-5823] [checkpoints] State Backends also handle Che...

2017-03-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3522
  
CI tests pass (except one profile which does not complete within the limit 
of 50 minutes)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5846) CEP: make the operators backwards compatible.

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907780#comment-15907780
 ] 

ASF GitHub Bot commented on FLINK-5846:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3445#discussion_r105707379
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
 ---
@@ -75,9 +94,15 @@ public void testKeyedCEPFunctionMigration() throws 
Exception {
BasicTypeInfo.INT_TYPE_INFO);
 
harness.setup();
-   harness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-   "cep-keyed-savepoint-1.2"));
-// "cep-keyed-savepoint-1.1"));
+
+   if (from11) {
--- End diff --

No problem! I will do it.


> CEP: make the operators backwards compatible.
> -
>
> Key: FLINK-5846
> URL: https://issues.apache.org/jira/browse/FLINK-5846
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This targets making the new CEP operators compatible with their previous 
> versions from Flink 1.1 and Flink 1.2.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3445: [FLINK-5846] [cep] Make the CEP operators backward...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3445#discussion_r105707379
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
 ---
@@ -75,9 +94,15 @@ public void testKeyedCEPFunctionMigration() throws 
Exception {
BasicTypeInfo.INT_TYPE_INFO);
 
harness.setup();
-   harness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-   "cep-keyed-savepoint-1.2"));
-// "cep-keyed-savepoint-1.1"));
+
+   if (from11) {
--- End diff --

No problem! I will do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105700201
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition o1, 
final StateTransition o2) {
 * @return Collection of computation states which result from the 
current one
 */
private Collection computeNextStates(
-   final ComputationState computationState,
-   final T event,
-   final long timestamp) {
-   Stack states = new Stack<>();
-   ArrayList resultingComputationStates = new 
ArrayList<>();
-   State state = computationState.getState();
+   final ComputationState computationState,
+   final T event,
+   final long timestamp) {
+   final ArrayList resultingComputationStates 
= new ArrayList<>();
+
+   final OutgoingEdges outgoingEdges = 
createDecisionGraph(computationState, event);
+
+   // Create the computing version based on the previously 
computed edges
+   // We need to defer the creation of computation states until we 
know how many edges start
+   // at this computation state so that we can assign proper 
version
+   final List edges = outgoingEdges.getEdges();
+   Integer takeBranchesToVisit = Math.max(0, 
outgoingEdges.getTotalTakeBranches() - 1);
+   Integer ignoreBranchesToVisit = 
outgoingEdges.getTotalIgnoreBranches();
+   for (StateTransition edge : edges) {
+   switch (edge.getAction()) {
+   case IGNORE: {
+   if (!computationState.isStartState()) {
+   final DeweyNumber version;
+   if 
(!isEquivalentState(edge.getTargetState(), computationState.getState())) {
+   version = 
computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+   ignoreBranchesToVisit--;
+   } else {
+   final int toIncrease = 
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
--- End diff --

Move both the arguments to different lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105700521
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition o1, 
final StateTransition o2) {
 * @return Collection of computation states which result from the 
current one
 */
private Collection computeNextStates(
-   final ComputationState computationState,
-   final T event,
-   final long timestamp) {
-   Stack states = new Stack<>();
-   ArrayList resultingComputationStates = new 
ArrayList<>();
-   State state = computationState.getState();
+   final ComputationState computationState,
+   final T event,
+   final long timestamp) {
+   final ArrayList resultingComputationStates 
= new ArrayList<>();
+
+   final OutgoingEdges outgoingEdges = 
createDecisionGraph(computationState, event);
+
+   // Create the computing version based on the previously 
computed edges
+   // We need to defer the creation of computation states until we 
know how many edges start
+   // at this computation state so that we can assign proper 
version
+   final List edges = outgoingEdges.getEdges();
+   Integer takeBranchesToVisit = Math.max(0, 
outgoingEdges.getTotalTakeBranches() - 1);
+   Integer ignoreBranchesToVisit = 
outgoingEdges.getTotalIgnoreBranches();
+   for (StateTransition edge : edges) {
+   switch (edge.getAction()) {
+   case IGNORE: {
+   if (!computationState.isStartState()) {
+   final DeweyNumber version;
+   if 
(!isEquivalentState(edge.getTargetState(), computationState.getState())) {
+   version = 
computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+   ignoreBranchesToVisit--;
+   } else {
+   final int toIncrease = 
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
+   
outgoingEdges.getTotalTakeBranches());
+   version = 
computationState.getVersion().increase(toIncrease);
+   }
+
+   resultingComputationStates.add(
+   
ComputationState.createState(
+   
edge.getTargetState(),
+   
computationState.getPreviousState(),
+   
computationState.getEvent(),
+   
computationState.getTimestamp(),
+   version,
+   
computationState.getStartTimestamp()
+   )
+   );
+   sharedBuffer.lock(
+   
edge.getTargetState().getName(),
+   
computationState.getEvent(),
+   
computationState.getTimestamp());
+   }
+   }
+   break;
+   case TAKE:
+   final State newState = 
edge.getTargetState();
+   final State consumingState = 
edge.getSourceState();
+   final State previousEventState = 
computationState.getPreviousState();
+
+   final T previousEvent = 
computationState.getEvent();
+   final DeweyNumber currentVersion = 
computationState.getVersion();
+
+   final DeweyNumber 
newComputationStateVersion = new 
DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+   takeBranchesToVisit--;
+
+   final long startTimestamp;
 

[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105702715
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+public enum Quantifier {
--- End diff --

Here you can add some flags, like `isLooping`, `isAtLeastOnce`, etc., and set 
them accordingly in the constructors. This will allow you to move all the `is*` 
methods, like `isLooping()` and `isAtLeastOnce()`, from the `NFACompiler` here.

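A hedged sketch of that suggestion (constant names and flag values are invented; the PR's actual quantifiers may differ):

```java
// Sketch: encode the properties in the enum constants so that checks like
// isLooping() live on the quantifier itself instead of in NFACompiler.
public enum QuantifierSketch {
    ONE(false, false),
    ONE_OR_MORE(true, true),
    ZERO_OR_MORE(true, false);

    private final boolean looping;
    private final boolean atLeastOnce;

    QuantifierSketch(boolean looping, boolean atLeastOnce) {
        this.looping = looping;
        this.atLeastOnce = atLeastOnce;
    }

    public boolean isLooping() {
        return looping;
    }

    public boolean isAtLeastOnce() {
        return atLeastOnce;
    }
}
```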

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105697591
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -52,14 +57,35 @@ public ComputationState(
this.timestamp = timestamp;
this.version = version;
this.startTimestamp = startTimestamp;
+   this.previousState = previousState;
+   }
+
--- End diff --

Move the 3 constructors to the end of the class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105699655
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition o1, 
final StateTransition o2) {
 * @return Collection of computation states which result from the 
current one
 */
private Collection computeNextStates(
-   final ComputationState computationState,
-   final T event,
-   final long timestamp) {
-   Stack states = new Stack<>();
-   ArrayList resultingComputationStates = new 
ArrayList<>();
-   State state = computationState.getState();
+   final ComputationState computationState,
+   final T event,
+   final long timestamp) {
+   final ArrayList resultingComputationStates 
= new ArrayList<>();
+
+   final OutgoingEdges outgoingEdges = 
createDecisionGraph(computationState, event);
+
+   // Create the computing version based on the previously 
computed edges
+   // We need to defer the creation of computation states until we 
know how many edges start
+   // at this computation state so that we can assign proper 
version
+   final List edges = outgoingEdges.getEdges();
+   Integer takeBranchesToVisit = Math.max(0, 
outgoingEdges.getTotalTakeBranches() - 1);
--- End diff --

Unbox these two `Integer`s to `int`s.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105704530
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java 
---
@@ -272,37 +372,42 @@ public void testBranchingPattern() {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
-   })
-   
.followedBy("middle-first").subtype(SubEvent.class).where(new 
FilterFunction() {
+   }).followedBy("middle-first").subtype(SubEvent.class)
--- End diff --

This can be as before, in one line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105697452
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -41,8 +43,11 @@
// Timestamp of the first element in the pattern
private final long startTimestamp;
 
-   public ComputationState(
+   private final State previousState;
+
--- End diff --

Insert one tab before each argument so that it is easier to separate the 
arguments from the body of the method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105698150
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -71,19 +70,16 @@
private final NonDuplicatingTypeSerializer 
nonDuplicatingTypeSerializer;
 
// Buffer used to store the matched events
-   private final SharedBuffer sharedBuffer;
+   private final SharedBuffer sharedBuffer;
 
// Set of all NFA states
-   private final Set states;
--- End diff --

Why `List` and not `Set`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105699988
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition o1, 
final StateTransition o2) {
 * @return Collection of computation states which result from the 
current one
 */
private Collection computeNextStates(
-   final ComputationState computationState,
-   final T event,
-   final long timestamp) {
-   Stack states = new Stack<>();
-   ArrayList resultingComputationStates = new 
ArrayList<>();
-   State state = computationState.getState();
+   final ComputationState computationState,
+   final T event,
+   final long timestamp) {
+   final ArrayList resultingComputationStates 
= new ArrayList<>();
--- End diff --

Move the `resultingComputationStates` just before the `for 
(StateTransition edge : edges) {...}`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105700500
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition o1, 
final StateTransition o2) {
 * @return Collection of computation states which result from the 
current one
 */
private Collection computeNextStates(
-   final ComputationState computationState,
-   final T event,
-   final long timestamp) {
-   Stack states = new Stack<>();
-   ArrayList resultingComputationStates = new 
ArrayList<>();
-   State state = computationState.getState();
+   final ComputationState computationState,
+   final T event,
+   final long timestamp) {
+   final ArrayList resultingComputationStates 
= new ArrayList<>();
+
+   final OutgoingEdges outgoingEdges = 
createDecisionGraph(computationState, event);
+
+   // Create the computing version based on the previously 
computed edges
+   // We need to defer the creation of computation states until we 
know how many edges start
+   // at this computation state so that we can assign proper 
version
+   final List edges = outgoingEdges.getEdges();
+   Integer takeBranchesToVisit = Math.max(0, 
outgoingEdges.getTotalTakeBranches() - 1);
+   Integer ignoreBranchesToVisit = 
outgoingEdges.getTotalIgnoreBranches();
+   for (StateTransition edge : edges) {
+   switch (edge.getAction()) {
+   case IGNORE: {
+   if (!computationState.isStartState()) {
+   final DeweyNumber version;
+   if 
(!isEquivalentState(edge.getTargetState(), computationState.getState())) {
+   version = 
computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+   ignoreBranchesToVisit--;
+   } else {
+   final int toIncrease = 
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
+   
outgoingEdges.getTotalTakeBranches());
+   version = 
computationState.getVersion().increase(toIncrease);
+   }
+
+   resultingComputationStates.add(
+   
ComputationState.createState(
+   
edge.getTargetState(),
+   
computationState.getPreviousState(),
+   
computationState.getEvent(),
+   
computationState.getTimestamp(),
+   version,
+   
computationState.getStartTimestamp()
+   )
+   );
+   sharedBuffer.lock(
+   
edge.getTargetState().getName(),
+   
computationState.getEvent(),
+   
computationState.getTimestamp());
+   }
+   }
+   break;
+   case TAKE:
+   final State newState = 
edge.getTargetState();
+   final State consumingState = 
edge.getSourceState();
+   final State previousEventState = 
computationState.getPreviousState();
+
+   final T previousEvent = 
computationState.getEvent();
+   final DeweyNumber currentVersion = 
computationState.getVersion();
+
+   final DeweyNumber 
newComputationStateVersion = new 
DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+   takeBranchesToVisit--;
+
+   final long startTimestamp;
 

[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105704399
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java 
---
@@ -272,37 +372,42 @@ public void testBranchingPattern() {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
-   })
-   
.followedBy("middle-first").subtype(SubEvent.class).where(new 
FilterFunction() {
+   }).followedBy("middle-first").subtype(SubEvent.class)
+   .where(new FilterFunction() {
private static final long serialVersionUID = 
6215754202506583964L;
 
@Override
public boolean filter(SubEvent value) throws 
Exception {
-   return value.getVolume() > 5.0;
+   return value.getVolume() >
+  5.0;
}
})
-   
.followedBy("middle-second").subtype(SubEvent.class).where(new 
FilterFunction() {
+   .followedBy("middle-second").subtype(SubEvent.class)
+   .where(new FilterFunction() {
private static final long serialVersionUID = 
6215754202506583964L;
 
@Override
public boolean filter(SubEvent value) throws 
Exception {
-   return 
value.getName().equals("next-one");
+   return value.getName()
+   .equals(
+   "next-one");
}
})
.followedBy("end").where(new FilterFunction() {
private static final long serialVersionUID = 
7056763917392056548L;
 
@Override
public boolean filter(Event value) throws 
Exception {
-   return value.getName().equals("end");
+   return value.getName()
--- End diff --

This can be as before, in one line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105698031
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -71,19 +70,16 @@
private final NonDuplicatingTypeSerializer 
nonDuplicatingTypeSerializer;
 
// Buffer used to store the matched events
--- End diff --

Why `String` and not `State`? This may have problems with backwards 
compatibility.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105704335
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java 
---
@@ -272,37 +372,42 @@ public void testBranchingPattern() {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
-   })
-   
.followedBy("middle-first").subtype(SubEvent.class).where(new 
FilterFunction() {
+   }).followedBy("middle-first").subtype(SubEvent.class)
+   .where(new FilterFunction() {
private static final long serialVersionUID = 
6215754202506583964L;
 
@Override
public boolean filter(SubEvent value) throws 
Exception {
-   return value.getVolume() > 5.0;
+   return value.getVolume() >
+  5.0;
}
})
-   
.followedBy("middle-second").subtype(SubEvent.class).where(new 
FilterFunction() {
+   .followedBy("middle-second").subtype(SubEvent.class)
+   .where(new FilterFunction() {
private static final long serialVersionUID = 
6215754202506583964L;
 
@Override
public boolean filter(SubEvent value) throws 
Exception {
-   return 
value.getName().equals("next-one");
+   return value.getName()
+   .equals(
+   "next-one");
}
})
.followedBy("end").where(new FilterFunction() {
private static final long serialVersionUID = 
7056763917392056548L;
 
@Override
public boolean filter(Event value) throws 
Exception {
-   return value.getName().equals("end");
+   return value.getName()
+   .equals("end");
}
});
 
NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
List> resultingPatterns = new ArrayList<>();
 
-   for (StreamRecord inputEvent: inputEvents) {
+   for (StreamRecord inputEvent : inputEvents) {
--- End diff --

This can be as before, in one line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105699441
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -209,33 +204,66 @@ public boolean equals(Object obj) {
return 
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
sharedBuffer.equals(other.sharedBuffer) &&
states.equals(other.states) &&
-   windowTime == other.windowTime &&
-   startEventCounter == other.startEventCounter;
+   windowTime == other.windowTime;
} else {
return false;
}
}
 
@Override
public int hashCode() {
-   return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, 
states, windowTime, startEventCounter);
+   return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, 
states, windowTime);
+   }
+
+   private static  boolean isEquivalentState(final State s1, final 
State s2) {
+   return s1.getName().equals(s2.getName());
}
 
/**
-* Comparator used for imposing the assumption that IGNORE is always 
the last StateTransition in a state.
-*/
-   private interface StateTransitionComparator extends 
Comparator, Serializable {}
-   private final Comparator stateTransitionComparator 
= new StateTransitionComparator() {
-   private static final long serialVersionUID = 
-2775474935413622278L;
+* Structures to keep decisions based on the transition actions, that 
counts the number of taken actions.
+*/
+   private static class OutgoingEdges {
+   private List edges = new ArrayList<>();
 
-   @Override
-   public int compare(final StateTransition o1, final 
StateTransition o2) {
-   if (o1.getAction() == o2.getAction()) {
-   return 0;
+   private final State currentState;
+
+   private int totalTakeBranches = 0;
+   private int totalIgnoreBranches = 0;
+
+   OutgoingEdges(final State currentState) {
+   this.currentState = currentState;
+   }
+
+   void add(StateTransition edge) {
+
+   if (!isSelfIgnore(edge)) {
+   if (edge.getAction() == 
StateTransitionAction.IGNORE) {
+   totalIgnoreBranches++;
+   } else if (edge.getAction() == 
StateTransitionAction.TAKE) {
+   totalTakeBranches++;
+   }
}
-   return o1.getAction() == StateTransitionAction.IGNORE ? 
1 : -1;
+
+   edges.add(edge);
+   }
+
+   int getTotalIgnoreBranches() {
+   return totalIgnoreBranches;
+   }
+   int getTotalTakeBranches() {
+   return totalTakeBranches;
}
-   };
+
+   List getEdges() {
+   return edges;
+   }
+
--- End diff --

Why is this method needed? The two end-points of the IGNORE edge, won't 
they be the same state? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105704267
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java 
---
@@ -272,37 +372,42 @@ public void testBranchingPattern() {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
-   })
-   
.followedBy("middle-first").subtype(SubEvent.class).where(new 
FilterFunction() {
+   }).followedBy("middle-first").subtype(SubEvent.class)
+   .where(new FilterFunction() {
private static final long serialVersionUID = 
6215754202506583964L;
 
@Override
public boolean filter(SubEvent value) throws 
Exception {
-   return value.getVolume() > 5.0;
+   return value.getVolume() >
--- End diff --

This can be as before, in one line. In general, we try to minimize 
unnecessary changes ;).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105702323
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
 ---
@@ -18,9 +18,13 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
--- End diff --

Unused imports.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105698626
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -209,33 +204,66 @@ public boolean equals(Object obj) {
return 
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
sharedBuffer.equals(other.sharedBuffer) &&
states.equals(other.states) &&
-   windowTime == other.windowTime &&
-   startEventCounter == other.startEventCounter;
+   windowTime == other.windowTime;
} else {
return false;
}
}
 
@Override
public int hashCode() {
-   return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter);
+   return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+   }
+
--- End diff --

Why is this method needed? Is the name comparison enough? And if it is indeed needed, I think it should be moved to the `State` class with a comment on why it is needed.
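
For illustration, a minimal, hypothetical sketch of the refactoring suggested above: the name-based equivalence check lives on `State` itself, documented with why name comparison is sufficient. Class and method names are assumptions, not the actual Flink CEP code.

{code}
// Hypothetical State with the name-based equivalence check moved onto it.
// This is NOT the Flink implementation; it only illustrates the suggestion above.
public class State<T> {

	private final String name;

	public State(String name) {
		this.name = name;
	}

	public String getName() {
		return name;
	}

	/**
	 * Two states are considered equivalent if they carry the same name.
	 * Assumption: state names are unique within a compiled NFA, so comparing
	 * names is enough to detect, e.g., an IGNORE edge that loops back to the
	 * same state.
	 */
	public boolean isEquivalentTo(State<?> other) {
		return other != null && name.equals(other.getName());
	}
}
{code}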


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105701064
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---
@@ -217,23 +217,22 @@ public void prune(long pruningTimestamp) {
// use a depth first search to reconstruct the previous relations
while (!extractionStates.isEmpty()) {
ExtractionState extractionState = extractionStates.pop();
-   DeweyNumber currentVersion = extractionState.getVersion();
// current path of the depth first search
Stack> currentPath = extractionState.getPath();
 
// termination criterion
-   if (currentVersion.length() == 1) {
-   LinkedHashMultimap completePath = LinkedHashMultimap.create();
+   if (extractionState.getEntry() == null) {
+   final LinkedHashMultimap completePath = LinkedHashMultimap.create();
 
while(!currentPath.isEmpty()) {
-   SharedBufferEntry currentEntry = currentPath.pop();
+   final SharedBufferEntry currentEntry = currentPath.pop();
 
completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
}
 
result.add(completePath);
} else {
-   SharedBufferEntry currentEntry = extractionState.getEntry();
--- End diff --

The `SharedBufferEntry currentEntry = extractionState.getEntry();` can be moved outside the for-loop, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105700356
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition o1, final StateTransition o2) {
 * @return Collection of computation states which result from the current one
 */
private Collection computeNextStates(
-   final ComputationState computationState,
-   final T event,
-   final long timestamp) {
-   Stack states = new Stack<>();
-   ArrayList resultingComputationStates = new ArrayList<>();
-   State state = computationState.getState();
+   final ComputationState computationState,
+   final T event,
+   final long timestamp) {
+   final ArrayList resultingComputationStates = new ArrayList<>();
+
+   final OutgoingEdges outgoingEdges = createDecisionGraph(computationState, event);
+
+   // Create the computing version based on the previously computed edges
+   // We need to defer the creation of computation states until we know how many edges start
+   // at this computation state so that we can assign proper version
+   final List edges = outgoingEdges.getEdges();
+   Integer takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
+   Integer ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+   for (StateTransition edge : edges) {
+   switch (edge.getAction()) {
+   case IGNORE: {
+   if (!computationState.isStartState()) {
+   final DeweyNumber version;
+   if (!isEquivalentState(edge.getTargetState(), computationState.getState())) {
+   version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+   ignoreBranchesToVisit--;
+   } else {
+   final int toIncrease = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
+   outgoingEdges.getTotalTakeBranches());
+   version = computationState.getVersion().increase(toIncrease);
+   }
+
+   resultingComputationStates.add(
+   ComputationState.createState(
+   edge.getTargetState(),
+   computationState.getPreviousState(),
+   computationState.getEvent(),
+   computationState.getTimestamp(),
+   version,
+   computationState.getStartTimestamp()
+   )
+   );
+   sharedBuffer.lock(
+   edge.getTargetState().getName(),
+   computationState.getEvent(),
+   computationState.getTimestamp());
+   }
+   }
+   break;
+   case TAKE:
+   final State newState = edge.getTargetState();
+   final State consumingState = edge.getSourceState();
+   final State previousEventState = computationState.getPreviousState();
+
+   final T previousEvent = computationState.getEvent();
+   final DeweyNumber currentVersion = computationState.getVersion();
+
+   final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+   takeBranchesToVisit--;
+
+   final long startTimestamp;
 

[GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...

2017-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3477#discussion_r105701669
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java ---
@@ -55,6 +65,7 @@ public boolean equals(Object obj) {
StateTransition other = (StateTransition) obj;
 
--- End diff --

You have updated `equals`; you should also update `hashCode` and `toString` accordingly.
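
For reference, a minimal, self-contained sketch of keeping `equals`, `hashCode` and `toString` in sync once a new field becomes part of equality. The class and field names below are illustrative only, not the actual `StateTransition` code.

{code}
import java.util.Objects;

// Illustrative class: every field that participates in equals() also feeds
// hashCode() (so equal objects share a hash code) and shows up in toString().
public final class Transition {

	private final String sourceState;
	private final String targetState;
	private final String action;

	public Transition(String sourceState, String targetState, String action) {
		this.sourceState = sourceState;
		this.targetState = targetState;
		this.action = action;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof Transition)) {
			return false;
		}
		Transition other = (Transition) obj;
		return sourceState.equals(other.sourceState)
			&& targetState.equals(other.targetState)
			&& action.equals(other.action);
	}

	@Override
	public int hashCode() {
		return Objects.hash(sourceState, targetState, action);
	}

	@Override
	public String toString() {
		return "Transition(" + sourceState + " -> " + targetState + ", " + action + ")";
	}
}
{code}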


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-3123) Allow setting custom start-offsets for the Kafka consumer

2017-03-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-3123.

   Resolution: Fixed
Fix Version/s: 1.3.0

Resolved for {{master}} with 
http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e53

> Allow setting custom start-offsets for the Kafka consumer
> -
>
> Key: FLINK-3123
> URL: https://issues.apache.org/jira/browse/FLINK-3123
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0, 1.0.0
>
>
> Currently, the Kafka consumer only allows starting to read from the earliest 
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.
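
As an illustration of the feature tracked here, a hedged usage sketch against the API merged for 1.3. The method and class names ({{setStartFromSpecificOffsets}}, {{KafkaTopicPartition}}, {{FlinkKafkaConsumer010}}) reflect my understanding of the merged change; verify them against the 1.3 documentation.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SpecificOffsetsExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		// Start each partition from a user-chosen offset instead of earliest/latest.
		Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
		specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
		specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);

		FlinkKafkaConsumer010<String> consumer =
			new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
		consumer.setStartFromSpecificOffsets(specificOffsets);

		env.addSource(consumer).print();
		env.execute("start-from-specific-offsets");
	}
}
{code}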



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods

2017-03-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-5949:
--

Assignee: Tzu-Li (Gordon) Tai

> Flink on YARN checks for Kerberos credentials for non-Kerberos authentication 
> methods
> -
>
> Key: FLINK-5949
> URL: https://issues.apache.org/jira/browse/FLINK-5949
> Project: Flink
>  Issue Type: Bug
>  Components: Security, YARN
>Affects Versions: 1.2.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html
> The problem is that the Flink on YARN client incorrectly assumes 
> {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for 
> Kerberos authentication modes, whereas it actually returns {{true}} for other 
> kinds of authentication methods too.
> We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to 
> check for {{KERBEROS}} only.
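
A minimal sketch of the check described above, using Hadoop's {{UserGroupInformation}}. Only the UGI calls are real API; the wrapping class and method are illustrative.

{code}
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;

public final class KerberosCheck {

	/**
	 * Returns true only when Hadoop security is enabled AND the authentication
	 * method is actually Kerberos, instead of treating every "security enabled"
	 * mode as Kerberos.
	 */
	public static boolean isKerberosSecurityEnabled() throws IOException {
		if (!UserGroupInformation.isSecurityEnabled()) {
			return false;
		}
		AuthenticationMethod method =
			UserGroupInformation.getCurrentUser().getAuthenticationMethod();
		return method == AuthenticationMethod.KERBEROS;
	}
}
{code}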



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-6025) User code ClassLoader not used when KryoSerializer fallbacks to serialization for copying

2017-03-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6025.

   Resolution: Fixed
Fix Version/s: 1.2.1
   1.1.5
   1.3.0

Resolved for {{master}} with 
http://git-wip-us.apache.org/repos/asf/flink/commit/f214317

Resolved for {{release-1.2}} with 
http://git-wip-us.apache.org/repos/asf/flink/commit/b7d288f

Resolved for {{release-1.1}} with 
http://git-wip-us.apache.org/repos/asf/flink/commit/e50bf65

> User code ClassLoader not used when KryoSerializer fallbacks to serialization 
> for copying
> -
>
> Key: FLINK-6025
> URL: https://issues.apache.org/jira/browse/FLINK-6025
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AWS-exception-serialization-problem-td12063.html
> This is caused by a known Kryo issue with its {{JavaSerializer}}: 
> https://github.com/EsotericSoftware/kryo/pull/483.
> It happens when a {{Throwable}} is to be copied by the {{KryoSerializer}}. 
> Since we use the {{JavaSerializer}} for throwables, and {{JavaSerializer}} 
> doesn't support copying, the {{KryoSerializer}} falls back to using 
> de-/serialization for the throwable. The problem is that on deserialization, 
> the classloader that the {{ObjectInputStream}} uses may be overridden and 
> doesn't specifically use Kryo's configured classloader (i.e., the user code 
> class loader), which results in {{ClassNotFoundException}}.
> Generally, this may happen if the user also registers to use the 
> {{JavaSerializer}} for their types.
> To fix the problem for {{Throwable}} serializing in the {{KryoSerializer}}, 
> we could either consider registering our own fixed {{JavaSerializer}} for 
> throwables, or wait for the Kryo fix to be released (to be fixed in Kryo 
> 4.0.1 release).
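
For illustration, a minimal sketch of the first option: a classloader-aware Java-serialization fallback that could be registered for throwables. This is not the merged fix; the class name is an assumption and only the Kryo/JDK calls are real API.

{code}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Java-serialization-based Kryo serializer that resolves classes against Kryo's
// configured classloader (in Flink that would be the user-code classloader).
public class ClassLoaderAwareJavaSerializer extends Serializer<Serializable> {

	@Override
	public void write(Kryo kryo, Output output, Serializable object) {
		try {
			ObjectOutputStream oos = new ObjectOutputStream(output);
			oos.writeObject(object);
			oos.flush();
		} catch (IOException e) {
			throw new KryoException("Error during Java serialization.", e);
		}
	}

	@Override
	public Serializable read(final Kryo kryo, Input input, Class<Serializable> type) {
		try {
			ObjectInputStream ois = new ObjectInputStream(input) {
				@Override
				protected Class<?> resolveClass(ObjectStreamClass desc)
						throws IOException, ClassNotFoundException {
					// Use Kryo's classloader instead of the default resolution.
					return Class.forName(desc.getName(), false, kryo.getClassLoader());
				}
			};
			return (Serializable) ois.readObject();
		} catch (Exception e) {
			throw new KryoException("Error during Java deserialization.", e);
		}
	}
}
{code}

Such a serializer could then be registered along the lines of {{kryo.addDefaultSerializer(Throwable.class, new ClassLoaderAwareJavaSerializer())}}, again subject to the actual Flink integration point.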



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5441) Directly allow SQL queries on a Table

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907695#comment-15907695
 ] 

ASF GitHub Bot commented on FLINK-5441:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3107
  
Thanks for the quick update @wuchong. The code looks good to merge. Can you 
add some documentation to the SQL section?


> Directly allow SQL queries on a Table
> -
>
> Key: FLINK-5441
> URL: https://issues.apache.org/jira/browse/FLINK-5441
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Right now a user has to register a table before it can be used in SQL 
> queries. In order to allow more fluent programming we propose calling SQL 
> directly on a table. An underscore can be used to reference the current table:
> {code}
> myTable.sql("SELECT a, b, c FROM _ WHERE d = 12")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3107: [FLINK-5441] [table] Directly allow SQL queries on a Tabl...

2017-03-13 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3107
  
Thanks for the quick update @wuchong. The code looks good to merge. Can you 
add some documentation to the SQL section?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6025) User code ClassLoader not used when KryoSerializer fallbacks to serialization for copying

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907692#comment-15907692
 ] 

ASF GitHub Bot commented on FLINK-6025:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3517


> User code ClassLoader not used when KryoSerializer fallbacks to serialization 
> for copying
> -
>
> Key: FLINK-6025
> URL: https://issues.apache.org/jira/browse/FLINK-6025
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AWS-exception-serialization-problem-td12063.html
> This is caused by a known Kryo issue with its {{JavaSerializer}}: 
> https://github.com/EsotericSoftware/kryo/pull/483.
> It happens when a {{Throwable}} is to be copied by the {{KryoSerializer}}. 
> Since we use the {{JavaSerializer}} for throwables, and {{JavaSerializer}} 
> doesn't support copying, the {{KryoSerializer}} falls back to using 
> de-/serialization for the throwable. The problem is that on deserialization, 
> the classloader that the {{ObjectInputStream}} uses may be overridden and 
> doesn't specifically use Kryo's configured classloader (i.e., the user code 
> class loader), which results in {{ClassNotFoundException}}.
> Generally, this may happen if the user also registers to use the 
> {{JavaSerializer}} for their types.
> To fix the problem for {{Throwable}} serializing in the {{KryoSerializer}}, 
> we could either consider registering our own fixed {{JavaSerializer}} for 
> throwables, or wait for the Kryo fix to be released (to be fixed in Kryo 
> 4.0.1 release).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3517: [FLINK-6025] [core] Add Flink's own JavaSerializer...

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3517


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2687: [FLINK-3123] [kafka] Allow custom specific start o...

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2687


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3123) Allow setting custom start-offsets for the Kafka consumer

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907691#comment-15907691
 ] 

ASF GitHub Bot commented on FLINK-3123:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2687


> Allow setting custom start-offsets for the Kafka consumer
> -
>
> Key: FLINK-3123
> URL: https://issues.apache.org/jira/browse/FLINK-3123
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.0.0
>
>
> Currently, the Kafka consumer only allows starting to read from the earliest 
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2687: [FLINK-3123] [kafka] Allow custom specific start offsets ...

2017-03-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2687
  
Thanks for the fast reviews :-) Merging ..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

