[jira] [Created] (FLINK-7646) Restart failed jobs with configurable parallelism range

2017-09-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7646:
-

 Summary: Restart failed jobs with configurable parallelism range
 Key: FLINK-7646
 URL: https://issues.apache.org/jira/browse/FLINK-7646
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently, if a TaskManager fails, the whole job is terminated and then, 
depending on the restart policy, a restart may be attempted.  If the 
failed TaskManager has not been replaced, and there are no spare task slots in 
the cluster, the restart will fail.

There are situations where restoring or adding a new TaskManager may take a 
while.  For instance, in AWS an Auto Scaling Group can only be used to manage a 
group of instances in a single availability zone.  If you have a cluster of 
TaskManagers that spans AZs, managed by one ASG per AZ, and an AZ goes dark, 
the other ASGs won't scale automatically to make up for the lost TaskManagers.  
To resolve the situation, the healthy ASGs will need to be modified manually or 
by systems external to AWS.

With that in mind, it would be useful if you could specify a range for the 
parallelism parameter.  Under normal circumstances the job would execute with 
the maximum parallelism of the range.  But if TaskManagers were lost and not 
replaced after some time, the job would accept being executed with some lower 
parallelism within the range.

I understand that this may not be feasible with checkpoints, as savepoints are 
supposed to be the mechanism used to change parallelism of a stateful job.  
Therefore, this proposal may need to wait until the implementation of the 
periodic savepoint feature (FLINK-4511).

This feature would aid the availability of Flink jobs.
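
As a rough sketch of what this could look like from the user's perspective (the 
setParallelismRange method is purely hypothetical and does not exist in the 
current API):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical API, for illustration only: prefer parallelism 8, but accept a
// restart with any parallelism down to 4 if task slots are missing after a failure.
env.setParallelismRange(4, 8);
{code}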




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6733.
-
Resolution: Fixed

> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Mikhail Lipkovich
>Priority: Trivial
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reopened FLINK-6733:
---
  Assignee: Mikhail Lipkovich  (was: Kurt Young)

> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Mikhail Lipkovich
>Priority: Trivial
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4685: [hotfix] Fix typo for English spells in Debugging ...

2017-09-19 Thread YMorisawa
GitHub user YMorisawa opened a pull request:

https://github.com/apache/flink/pull/4685

[hotfix] Fix typo for English spells in Debugging Classloading documentation

Fixed some basic spelling mistakes in the docs.
---
devided -> divided
deploymens -> deployments
posible -> possible
starting a the Flink cluster -> starting a Flink cluster
beginnign -> beginning
---

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/YMorisawa/flink hotfix-doc-classloading

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4685.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4685


commit 729ec1fbb8237a5aff811d88b5b826fbd0de9974
Author: desktop 
Date:   2017-09-20T03:38:26Z

[hotfix] Fix typo in Debugging Classloading documentation




---


[jira] [Updated] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-6733:
--
  Priority: Trivial  (was: Major)
Issue Type: Improvement  (was: Bug)

> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Kurt Young
>Priority: Trivial
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6733.
-
Resolution: Fixed

> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Kurt Young
>Priority: Trivial
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reopened FLINK-6733:
---
  Assignee: Kurt Young  (was: Mikhail Lipkovich)

> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Kurt Young
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414

2017-09-19 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172649#comment-16172649
 ] 

Kent Murra commented on FLINK-6173:
---

The guidance to use a previous version of Maven seems to be for building Flink. 
 I'm getting this issue when using the built-and-published Flink libraries.

> flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
> 
>
> Key: FLINK-6173
> URL: https://issues.apache.org/jira/browse/FLINK-6173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>
> Currently, flink-table packs in com.fasterxml.jackson.* and relocates the classes 
> to org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> If a project depends on flink-table and uses fasterxml as follows (the function 
> explain uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> val expr = input.toTable(tEnv)
> val result = expr
>   .groupBy('word)
>   .select('word, 'frequency.sum as 'frequency)
>   .filter('frequency === 2)
> println(tEnv.explain(result))
> result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> I found that after FLINK-5414, flink-table no longer packs in com.fasterxml.jackson.*, 
> and the project throws a class-not-found exception:
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at 
> org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at 
> org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}
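
As a quick way to diagnose this outside of a full job, a minimal sketch (assuming 
it is run with the same classpath as the failing program) that simply asks the 
classloader for the relocated Jackson class from the stack trace above:

{code:java}
public class ShadedJacksonCheck {

    public static void main(String[] args) {
        // Class name taken from the stack trace above.
        String relocated =
            "org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper";
        try {
            Class.forName(relocated);
            System.out.println("Found " + relocated
                + " -> flink-table still bundles the relocated Jackson classes");
        } catch (ClassNotFoundException e) {
            System.out.println("Missing " + relocated
                + " -> matches the NoClassDefFoundError reported in this issue");
        }
    }
}
{code}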



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7645) Modify system-metrics part show in the document

2017-09-19 Thread Hai Zhou_UTC+8 (JIRA)
Hai Zhou_UTC+8 created FLINK-7645:
-

 Summary: Modify system-metrics part show in the document
 Key: FLINK-7645
 URL: https://issues.apache.org/jira/browse/FLINK-7645
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Metrics
Reporter: Hai Zhou_UTC+8
 Fix For: 1.4.0


The system-metrics section of the documentation is currently structured as follows:

{noformat}
├── System metrics
├── Latency tracking
├── Dashboard integration

{noformat}

I think the following structure would be more reasonable:


{noformat}
├── System metrics
│   ├── CPU
│   ├── Memory
│   ├── Threads
│   ├── GarbageCollection
│   ├── ClassLoader
│   ├── Network
│   ├── Cluster
│   ├── Availability
│   ├── Checkpointing
│   ├── IO
│   └── Connectors
│       └── Kafka Connectors
├── Latency tracking
├── Dashboard integration
{noformat}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172577#comment-16172577
 ] 

ASF GitHub Bot commented on FLINK-6733:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4630


> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Mikhail Lipkovich
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4630: [FLINK-6733] Remove commented out AvgAggregationFu...

2017-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4630


---


[jira] [Closed] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6733.
-
   Resolution: Fixed
Fix Version/s: 1.4.0

> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Mikhail Lipkovich
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7644) Line the extra semicolon in the source code

2017-09-19 Thread Hai Zhou_UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou_UTC+8 updated FLINK-7644:
--
Description: 
e.g. 
final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer()*;;*

Unnecessary semicolon at the end of the line.

  was:
eg. 
final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();;

Unnecessary semicolon in the end of line.


> Line the extra semicolon in the source code
> ---
>
> Key: FLINK-7644
> URL: https://issues.apache.org/jira/browse/FLINK-7644
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Hai Zhou_UTC+8
>Assignee: Hai Zhou_UTC+8
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> e.g.
> final TestDuplicateSerializer keySerializer = new 
> TestDuplicateSerializer()*;;*
> Unnecessary semicolon at the end of the line.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7644) Line the extra semicolon in the source code

2017-09-19 Thread Hai Zhou_UTC+8 (JIRA)
Hai Zhou_UTC+8 created FLINK-7644:
-

 Summary: Line the extra semicolon in the source code
 Key: FLINK-7644
 URL: https://issues.apache.org/jira/browse/FLINK-7644
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Hai Zhou_UTC+8
Assignee: Hai Zhou_UTC+8
Priority: Minor
 Fix For: 1.4.0, 1.3.3


e.g. 
final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();;

Unnecessary semicolon at the end of the line.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests

2017-09-19 Thread Hai Zhou_UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou_UTC+8 reassigned FLINK-6444:
-

Assignee: Hai Zhou_UTC+8

> Add a check that '@VisibleForTesting' methods are only used in tests
> 
>
> Key: FLINK-6444
> URL: https://issues.apache.org/jira/browse/FLINK-6444
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Hai Zhou_UTC+8
>
> Some methods are annotated with {{@VisibleForTesting}}. These methods should 
> only be called from tests.
> This is currently not enforced / checked during the build. We should add such 
> a check.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7626) Add some metric description about checkpoints

2017-09-19 Thread Hai Zhou_UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou_UTC+8 closed FLINK-7626.
-
Resolution: Fixed

> Add some metric description about checkpoints
> -
>
> Key: FLINK-7626
> URL: https://issues.apache.org/jira/browse/FLINK-7626
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.3.2
>Reporter: Hai Zhou_UTC+8
>Assignee: Hai Zhou_UTC+8
> Fix For: 1.4.0, 1.3.3
>
>
> I exported the metrics to the log file via the 
> Slf4jReporter (https://issues.apache.org/jira/browse/FLINK-4831) and found 
> that some checkpoint metrics are not described in the 
> documentation, so I added them.
> {noformat}
> // Number of total checkpoints (in progress, completed, failed).
> totalNumberOfCheckpoints
> // Number of in-progress checkpoints.
> numberOfInProgressCheckpoints
> // Number of successfully completed checkpoints.
> numberOfCompletedCheckpoints
> // Number of failed checkpoints.
> numberOfFailedCheckpoints
> // Timestamp when the checkpoint was restored at the coordinator.
> lastCheckpointRestoreTimestamp
> // Buffered bytes during alignment over all subtasks.
> lastCheckpointAlignmentBuffered
> {noformat}
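
For completeness, a minimal sketch of how these metric names surface through the 
reporter SPI, assuming Flink's org.apache.flink.metrics.reporter.MetricReporter 
interface (illustrative only, not part of the documentation change itself):

{code:java}
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Logs the checkpoint metrics listed above as they are registered. Sketch only. */
public class CheckpointMetricLogger implements MetricReporter {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointMetricLogger.class);

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // Checkpoint metrics carry the names listed in the description above.
        if (metricName.startsWith("numberOf")
                || metricName.startsWith("lastCheckpoint")
                || metricName.equals("totalNumberOfCheckpoints")) {
            LOG.info("Registered checkpoint metric: {}", group.getMetricIdentifier(metricName));
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
}
{code}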



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7634) Add option to create a savepoint while canceling a job in the dashboard

2017-09-19 Thread Hai Zhou_UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172561#comment-16172561
 ] 

Hai Zhou_UTC+8 commented on FLINK-7634:
---

+1

> Add option to create a savepoint while canceling a job in the dashboard 
> 
>
> Key: FLINK-7634
> URL: https://issues.apache.org/jira/browse/FLINK-7634
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Priority: Minor
>
> Currently there appears to be no way to trigger a savepoint in the dashboard, 
> to cancel a job while taking a savepoint, to list savepoints, or to list 
> external checkpoints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7613) Fix documentation error in QuickStart

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172533#comment-16172533
 ] 

ASF GitHub Bot commented on FLINK-7613:
---

Github user raymondtay commented on a diff in the pull request:

https://github.com/apache/flink/pull/4666#discussion_r139849283
  
--- Diff: docs/dev/datastream_api.md ---
@@ -497,7 +497,9 @@ env.generateSequence(1,10).map(new 
MyMapper()).setBufferTimeout(timeoutMillis);
 LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment
 env.setBufferTimeout(timeoutMillis)
 
-env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
+def myMap : Long => Long = _ + 1 // `generateSequence` returns a 
`DataStream[Long]` type
--- End diff --

I'm taking the perspective of the absolute beginner since this doc was 
linked from the QuickStart section; when I started with the QuickStart section, 
most parts of the doc were good enough (IMO) to guide me without referring to the 
javadocs (which I think makes perfect sense), so I thought it might be a good 
idea to continue this tradition and make things explicitly clear. What do you 
think @zentol ?


> Fix documentation error in QuickStart
> -
>
> Key: FLINK-7613
> URL: https://issues.apache.org/jira/browse/FLINK-7613
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Raymond Tay
>Priority: Minor
>
> In the `QuickStart => Run The Example` section, there's a typographical error 
> which points the reader to `*jobmanager*` but it should be `*taskmanager*` in 
> Apache Flink 1.4.x. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4666: [FLINK-7613][Documentation] Fixed typographical er...

2017-09-19 Thread raymondtay
Github user raymondtay commented on a diff in the pull request:

https://github.com/apache/flink/pull/4666#discussion_r139849283
  
--- Diff: docs/dev/datastream_api.md ---
@@ -497,7 +497,9 @@ env.generateSequence(1,10).map(new 
MyMapper()).setBufferTimeout(timeoutMillis);
 LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment
 env.setBufferTimeout(timeoutMillis)
 
-env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
+def myMap : Long => Long = _ + 1 // `generateSequence` returns a 
`DataStream[Long]` type
--- End diff --

I'm taking the perspective of the absolute beginner since this doc was 
linked from the QuickStart section; when I started with the QuickStart section, 
most parts of the doc were good enough (IMO) to guide me without referring to the 
javadocs (which I think makes perfect sense), so I thought it might be a good 
idea to continue this tradition and make things explicitly clear. What do you 
think @zentol ?


---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172532#comment-16172532
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139849018
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, 
CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) 
ctx.timerService().currentWatermark()
--- End diff --

Totally understand  


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded ones 
> like {{o.rowtime  s.rowtime}}, and it should include the rowtime attributes of 
> both streams; {{o.rowtime between rowtime () and rowtime () + 1}} should also 
> not be supported.
> A row-time stream join will not be able to handle late data, because inserting 
> a row into a sorted order would shift all other computations. This would be 
> too expensive to maintain. Therefore, we will throw an error if a user tries 
> to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139849018
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, 
CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) 
ctx.timerService().currentWatermark()
--- End diff --

Totally understand 😄 


---


[jira] [Commented] (FLINK-7613) Fix documentation error in QuickStart

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172526#comment-16172526
 ] 

ASF GitHub Bot commented on FLINK-7613:
---

Github user raymondtay commented on a diff in the pull request:

https://github.com/apache/flink/pull/4666#discussion_r139848500
  
--- Diff: docs/dev/execution_configuration.md ---
@@ -42,44 +42,44 @@ var executionConfig = env.getConfig
 
 The following configuration options are available: (the default is bold)
 
-- **`enableClosureCleaner()`** / `disableClosureCleaner()`. The closure 
cleaner is enabled by default. The closure cleaner removes unneeded references 
to the surrounding class of anonymous functions inside Flink programs.
+- **`enableClosureCleaner()`** / **`disableClosureCleaner()`**. The 
closure cleaner is enabled by default. The closure cleaner removes unneeded 
references to the surrounding class of anonymous functions inside Flink 
programs.
--- End diff --

@zentol I wasn't aware of that convention in all honesty, thank you for 
pointing it out.


> Fix documentation error in QuickStart
> -
>
> Key: FLINK-7613
> URL: https://issues.apache.org/jira/browse/FLINK-7613
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Raymond Tay
>Priority: Minor
>
> In the `QuickStart => Run The Example` section, there's a typographical error 
> which points the reader to `*jobmanager*` but it should be `*taskmanager*` in 
> Apache Flink 1.4.x. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4666: [FLINK-7613][Documentation] Fixed typographical er...

2017-09-19 Thread raymondtay
Github user raymondtay commented on a diff in the pull request:

https://github.com/apache/flink/pull/4666#discussion_r139848500
  
--- Diff: docs/dev/execution_configuration.md ---
@@ -42,44 +42,44 @@ var executionConfig = env.getConfig
 
 The following configuration options are available: (the default is bold)
 
-- **`enableClosureCleaner()`** / `disableClosureCleaner()`. The closure 
cleaner is enabled by default. The closure cleaner removes unneeded references 
to the surrounding class of anonymous functions inside Flink programs.
+- **`enableClosureCleaner()`** / **`disableClosureCleaner()`**. The 
closure cleaner is enabled by default. The closure cleaner removes unneeded 
references to the surrounding class of anonymous functions inside Flink 
programs.
--- End diff --

@zentol I wasn't aware of that convention in all honesty, thank you for 
pointing it out.


---


[jira] [Commented] (FLINK-7531) Move existing REST handler to flink-runtime

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172471#comment-16172471
 ] 

ASF GitHub Bot commented on FLINK-7531:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4600


> Move existing REST handler to flink-runtime
> ---
>
> Key: FLINK-7531
> URL: https://issues.apache.org/jira/browse/FLINK-7531
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.4.0
>
>
> Since the new REST endpoints live in {{flink-runtime}} we should move the 
> existing rest handlers to {{flink-runtime}} as well. The static web server 
> content remains in {{flink-runtime-web}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4600: [FLINK-7531] Move Flink legacy rest handler to fli...

2017-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4600


---


[jira] [Closed] (FLINK-7531) Move existing REST handler to flink-runtime

2017-09-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7531.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 4fc019a96a08446d7ba5f57664904abcd585e31c

> Move existing REST handler to flink-runtime
> ---
>
> Key: FLINK-7531
> URL: https://issues.apache.org/jira/browse/FLINK-7531
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.4.0
>
>
> Since the new REST endpoints live in {{flink-runtime}} we should move the 
> existing rest handlers to {{flink-runtime}} as well. The static web server 
> content remains in {{flink-runtime-web}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7529) Retrieve complete REST address from RestfulGateway

2017-09-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7529.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via 32770103253e01cd61c8634378cfa1b26707e19a

> Retrieve complete REST address from RestfulGateway
> --
>
> Key: FLINK-7529
> URL: https://issues.apache.org/jira/browse/FLINK-7529
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> At the moment we only retrieve the web port from the 
> {{JobManager}}/{{Dispatcher}}. Instead it would be better to retrieve the 
> complete REST address from these components, including the protocol 
> (http/https) and the hostname. That way we wouldn't have to pass information 
> like whether https is enabled to all REST handlers where it is used for the 
> generation of the redirection address.
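
Illustratively, the "complete REST address" would be assembled once at the gateway 
rather than in each handler, along these lines (a sketch only; the method and 
parameter names are not the actual implementation):

{code:java}
// Sketch only: compose scheme, host, and port once, so handlers no longer need
// to know whether SSL is enabled when generating redirection addresses.
static String restAddress(boolean sslEnabled, String hostname, int port) {
    return (sslEnabled ? "https" : "http") + "://" + hostname + ":" + port;
}
{code}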



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4599: [FLINK-7529] Retrieve complete REST address from g...

2017-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4599


---


[jira] [Commented] (FLINK-7529) Retrieve complete REST address from RestfulGateway

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172461#comment-16172461
 ] 

ASF GitHub Bot commented on FLINK-7529:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4599


> Retrieve complete REST address from RestfulGateway
> --
>
> Key: FLINK-7529
> URL: https://issues.apache.org/jira/browse/FLINK-7529
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> At the moment we only retrieve the web port from the 
> {{JobManager}}/{{Dispatcher}}. Instead it would be better to retrieve the 
> complete REST address from these components, including the protocol 
> (http/https) and the hostname. That way we wouldn't have to pass information 
> like whether https is enabled to all REST handlers where it is used for the 
> generation of the redirection address.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7528) Create Dispatcher REST endpoint

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172458#comment-16172458
 ] 

ASF GitHub Bot commented on FLINK-7528:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4598


> Create Dispatcher REST endpoint
> ---
>
> Key: FLINK-7528
> URL: https://issues.apache.org/jira/browse/FLINK-7528
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Create and integrate the {{DispatcherRestEndpoint}} with the {{Dispatcher}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...

2017-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4598


---


[jira] [Closed] (FLINK-7528) Create Dispatcher REST endpoint

2017-09-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7528.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via 6a62f1455313ee8fae0ff79945da61fb67ec8edb

> Create Dispatcher REST endpoint
> ---
>
> Key: FLINK-7528
> URL: https://issues.apache.org/jira/browse/FLINK-7528
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Create and integrate the {{DispatcherRestEndpoint}} with the {{Dispatcher}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7527) Add redirection logic to AbstractRestHandler

2017-09-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7527.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via 75e84e04f5a3e2766e331fd05ddb725fe9b00d99

> Add redirection logic to AbstractRestHandler
> 
>
> Key: FLINK-7527
> URL: https://issues.apache.org/jira/browse/FLINK-7527
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{AbstractRestHandler}} should extend the {{RedirectHandler}} introduced 
> with FLINK-7459 in order to add redirection functionality to the 
> {{AbstractRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7527) Add redirection logic to AbstractRestHandler

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172445#comment-16172445
 ] 

ASF GitHub Bot commented on FLINK-7527:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4597


> Add redirection logic to AbstractRestHandler
> 
>
> Key: FLINK-7527
> URL: https://issues.apache.org/jira/browse/FLINK-7527
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} should extend the {{RedirectHandler}} introduced 
> with FLINK-7459 in order to add redirection functionality to the 
> {{AbstractRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

2017-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4597


---


[jira] [Commented] (FLINK-7643) HadoopFileSystem always reloads GlobalConfiguration

2017-09-19 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172361#comment-16172361
 ] 

Bowen Li commented on FLINK-7643:
-

Looks like it's related to FLINK-7365

> HadoopFileSystem always reloads GlobalConfiguration
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7643) HadoopFileSystem always reloads GlobalConfiguration

2017-09-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7643:
-
Affects Version/s: 1.4.0

> HadoopFileSystem always reloads GlobalConfiguration
> ---
>
> Key: FLINK-7643
> URL: https://issues.apache.org/jira/browse/FLINK-7643
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Ufuk Celebi
>
> HadoopFileSystem always reloads GlobalConfiguration, which potentially leads 
> to a lot of noise in the logs, because this happens on each checkpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7514) fix BackPressureStatsTrackerITCase releasing buffers twice

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172286#comment-16172286
 ] 

ASF GitHub Bot commented on FLINK-7514:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4591
  
@zentol unfortunately, when switching to Netty's buffer reference counting, it 
will check for any form of illegal reference-count usage, and a double-free is one 
of them. Even without Netty, this pattern could result from invalid use and 
may be guarded against the same way.


> fix BackPressureStatsTrackerITCase releasing buffers twice
> --
>
> Key: FLINK-7514
> URL: https://issues.apache.org/jira/browse/FLINK-7514
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{BackPressureStatsTrackerITCase#testBackPressuredProducer()}} is releasing 
> its buffers twice which should be fixed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4591: [FLINK-7514][tests] fix BackPressureStatsTrackerITCase re...

2017-09-19 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4591
  
@zentol unfortunately, when switching to Netty's buffer reference counting, it 
will check for any form of illegal reference-count usage, and a double-free is one 
of them. Even without Netty, this pattern could result from invalid use and 
may be guarded against the same way.


---


[jira] [Commented] (FLINK-7527) Add redirection logic to AbstractRestHandler

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172278#comment-16172278
 ] 

ASF GitHub Bot commented on FLINK-7527:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4597#discussion_r139805450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -38,6 +40,10 @@ public ErrorResponseBody(String error) {
this(Collections.singletonList(error));
}
 
+   public ErrorResponseBody(Throwable throwable) {
--- End diff --

True. Will remove it.


> Add redirection logic to AbstractRestHandler
> 
>
> Key: FLINK-7527
> URL: https://issues.apache.org/jira/browse/FLINK-7527
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} should extend the {{RedirectHandler}} introduced 
> with FLINK-7459 in order to add redirection functionality to the 
> {{AbstractRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

2017-09-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4597#discussion_r139805450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -38,6 +40,10 @@ public ErrorResponseBody(String error) {
this(Collections.singletonList(error));
}
 
+   public ErrorResponseBody(Throwable throwable) {
--- End diff --

True. Will remove it.


---


[jira] [Commented] (FLINK-7603) Support within clause in MatchRecognize

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172085#comment-16172085
 ] 

ASF GitHub Bot commented on FLINK-7603:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4684

[FLINK-7603][savepoint/doc] Document how to take a savepoint on YARN

## What is the purpose of the change

The documentation should have a separate entry for savepoint related CLI 
commands in combination with YARN. It is currently not documented that you have 
to supply the application id, nor how you can pass it.

## Brief change log

- *add instructions for taking savepoints on YARN to both the Savepoint and CLI 
docs*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7603

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4684.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4684


commit 6386983239bd3024b395c865ec4fd33e232ca5a3
Author: Bowen Li 
Date:   2017-08-30T16:35:03Z

FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in 
flink-connector-kinesis

commit 381cd4156b84673a1d32d2db3f7b2d748d90d980
Author: Bowen Li 
Date:   2017-09-07T06:33:37Z

Merge remote-tracking branch 'upstream/master'

commit dcf40bd821187b848d924f7f4df6805b1b924c16
Author: Bowen Li 
Date:   2017-09-15T18:00:03Z

Merge remote-tracking branch 'upstream/master'

commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338
Author: Bowen Li 
Date:   2017-09-18T06:25:26Z

Merge remote-tracking branch 'upstream/master'

commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e
Author: Bowen Li 
Date:   2017-09-18T23:50:48Z

Merge remote-tracking branch 'upstream/master'

commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734
Author: Bowen Li 
Date:   2017-09-19T17:18:54Z

Merge remote-tracking branch 'upstream/master'

commit ae4b5647e25d6e6915486c6ac4a42e887a53101c
Author: Bowen Li 
Date:   2017-09-19T17:44:16Z

FLINK-7603 Document how to take a savepoint on YARN

commit f82db156e6cabab9f43b7c3ec658b0ddc1a0637c
Author: Bowen Li 
Date:   2017-09-19T17:48:48Z

update doc




> Support within clause in MatchRecognize
> ---
>
> Key: FLINK-7603
> URL: https://issues.apache.org/jira/browse/FLINK-7603
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4684: [FLINK-7603][savepoint/doc] Document how to take a...

2017-09-19 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4684

[FLINK-7603][savepoint/doc] Document how to take a savepoint on YARN

## What is the purpose of the change

The documentation should have a separate entry for savepoint related CLI 
commands in combination with YARN. It is currently not documented that you have 
to supply the application id, nor how you can pass it.

## Brief change log

- *add instructions for taking savepoints on YARN to both the Savepoint and CLI 
docs*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7603

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4684.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4684


commit 6386983239bd3024b395c865ec4fd33e232ca5a3
Author: Bowen Li 
Date:   2017-08-30T16:35:03Z

FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in 
flink-connector-kinesis

commit 381cd4156b84673a1d32d2db3f7b2d748d90d980
Author: Bowen Li 
Date:   2017-09-07T06:33:37Z

Merge remote-tracking branch 'upstream/master'

commit dcf40bd821187b848d924f7f4df6805b1b924c16
Author: Bowen Li 
Date:   2017-09-15T18:00:03Z

Merge remote-tracking branch 'upstream/master'

commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338
Author: Bowen Li 
Date:   2017-09-18T06:25:26Z

Merge remote-tracking branch 'upstream/master'

commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e
Author: Bowen Li 
Date:   2017-09-18T23:50:48Z

Merge remote-tracking branch 'upstream/master'

commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734
Author: Bowen Li 
Date:   2017-09-19T17:18:54Z

Merge remote-tracking branch 'upstream/master'

commit ae4b5647e25d6e6915486c6ac4a42e887a53101c
Author: Bowen Li 
Date:   2017-09-19T17:44:16Z

FLINK-7603 Document how to take a savepoint on YARN

commit f82db156e6cabab9f43b7c3ec658b0ddc1a0637c
Author: Bowen Li 
Date:   2017-09-19T17:48:48Z

update doc




---


[jira] [Closed] (FLINK-6549) Improve error message for type mismatches with side outputs

2017-09-19 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-6549.
---
Resolution: Fixed

> Improve error message for type mismatches with side outputs
> ---
>
> Key: FLINK-6549
> URL: https://issues.apache.org/jira/browse/FLINK-6549
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Bowen Li
>Priority: Minor
>
> A type mismatch when using side outputs causes a ClassCastException to be 
> thrown. It would be neat to include the name of the OutputTags in the 
> exception message.
> This can occur when multiple {{OutputTag}}s with different types but 
> identical names are being used.
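
A small sketch of the situation described above (the tag name "late-data" is just 
an example):

{code:java}
import org.apache.flink.util.OutputTag;

// Two side-output tags that share a name but carry different element types.
// Emitting with one and reading the side output with the other leads to a
// ClassCastException whose message currently does not mention the tag name.
final OutputTag<String> lateStrings = new OutputTag<String>("late-data") {};
final OutputTag<Integer> lateCounts = new OutputTag<Integer>("late-data") {};
{code}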



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-19 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139742368
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
sideOutput(element);
+   } else if (isSkippedElement) {
--- End diff --

I think when `isSkippedElement` is true, `isElementLate(element)` is always 
true as well. `isSkippedElement` becomes true when, for every assigned window, 
window.endtime + allowedLateness < currentLowWatermark, and `isElementLate` is 
true when element.time + allowedLateness < currentLowWatermark; since 
element.time is <= the biggest window.endtime, doesn't `isElementLate` always 
end up true when `isSkippedElement` is true? And if I want to rule out the 
situation that the element was skipped **because no windows were assigned to 
it**, I think I just need to check whether the variable `Collection elementWindows` 
is empty?
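
Restating the two conditions from the comment above as a simplified sketch (not the 
actual WindowOperator code):

{code:java}
// Simplified sketch of the two checks being discussed, not the actual WindowOperator code.
boolean windowIsLate(long windowEnd, long allowedLateness, long currentWatermark) {
    // isSkippedElement ends up true when this holds for every assigned window
    return windowEnd + allowedLateness < currentWatermark;
}

boolean elementIsLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
    return elementTimestamp + allowedLateness < currentWatermark;
}
// Since elementTimestamp <= the largest windowEnd of the assigned windows, all assigned
// windows being late implies the element itself is late, which is the argument above.
{code}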


---


[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171912#comment-16171912
 ] 

ASF GitHub Bot commented on FLINK-7630:
---

Github user tony810430 closed the pull request at:

https://github.com/apache/flink/pull/4678


> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more kinds of parameter types, such as {{File}} and {{InputStream}}.
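
A sketch of the intended usage from the linked thread, assuming the proposed 
InputStream overload (the class and resource names MyJob and job.properties are 
placeholders, and the overload does not exist prior to this change):

{code:java}
import java.io.InputStream;

import org.apache.flink.api.java.utils.ParameterTool;

// Read a properties file bundled inside the job jar; a plain file path cannot
// address it, which is why the proposed File/InputStream overloads are useful.
InputStream props = MyJob.class.getClassLoader().getResourceAsStream("job.properties");
ParameterTool params = ParameterTool.fromPropertiesFile(props);  // proposed overload
{code}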



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4678: [FLINK-7630] [Java API] Allow passing a File or an...

2017-09-19 Thread tony810430
Github user tony810430 closed the pull request at:

https://github.com/apache/flink/pull/4678


---


[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171911#comment-16171911
 ] 

ASF GitHub Bot commented on FLINK-7630:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/4678
  
I thought it would be closed automatically. But it's okay, I will close it. 
Thanks for your quick review.


> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more kinds of parameter types, such as {{File}} and {{InputStream}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4678: [FLINK-7630] [Java API] Allow passing a File or an InputS...

2017-09-19 Thread tony810430
Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/4678
  
I thought it would be closed automatically. But it's okay, I will close it. 
Thanks for your quick review.


---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171909#comment-16171909
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4625
  
Btw, I noticed I did not reply to your comments. 

I think it would be good to have the eager state cleaning in the initial 
version. Shouldn't be too much effort. Basically, getting the condition right 
and calling `remove()` on the `Map.Entry`.

What do you mean by "distinguish the < and <= signs"? If there is an 
off-by-one issue in the computation of the window boundaries, it needs to be 
fixed with this PR. We shouldn't merge a semantically incorrect operator (of 
course there might be bugs...). Performance issues are OK but the semantics 
must be correct.

Regarding the `misc` test failures, yes that can happen. No need to worry 
about that as long as the `libraries` build passes. I'll run the tests 
again before merging anyway ;-)
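
For illustration, a rough sketch of what the eager cleanup could look like (the 
field and variable names are hypothetical, not the PR's actual code), assuming the 
cached rows of the other input are kept in a `MapState` keyed by row time:

{code:java}
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.types.Row;

// Hypothetical sketch: while probing the other side's cache, eagerly remove entries
// whose row time can no longer satisfy the join bounds instead of waiting for a timer.
void probeAndCleanUp(MapState<Long, List<Row>> otherCache, long expirationTime) throws Exception {
    Iterator<Map.Entry<Long, List<Row>>> it = otherCache.iterator();
    while (it.hasNext()) {
        Map.Entry<Long, List<Row>> entry = it.next();
        if (entry.getKey() < expirationTime) {
            it.remove(); // rows this old can never join again
        }
        // otherwise: evaluate the generated join condition against the probe row (omitted)
    }
}
{code}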


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded ones 
> like {{o.rowtime  s.rowtime}}, and it should include the rowtime attributes of 
> both streams; {{o.rowtime between rowtime () and rowtime () + 1}} should also 
> not be supported.
> A row-time stream join will not be able to handle late data, because inserting 
> a row into a sorted order would shift all other computations. This would be 
> too expensive to maintain. Therefore, we will throw an error if a user tries 
> to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 
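
As a small illustration for readers, the {{BETWEEN}} bounds of the example query above translate into relative lookup windows on the opposite stream; the sketch below is plain Java and purely illustrative (the class and method names are made up and do not come from the PR):

{code}
// For "o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR",
// an Orders row at time t can only match Shipments rows in [t - 1h, t],
// and a Shipments row at time t can only match Orders rows in [t, t + 1h].
public class JoinWindowBounds {

    static final long HOUR_MS = 60 * 60 * 1000L;

    // bounds of s.rowtime relative to o.rowtime, derived from the predicate above
    static long shipmentsLowerBound(long orderRowTime) { return orderRowTime - HOUR_MS; }
    static long shipmentsUpperBound(long orderRowTime) { return orderRowTime; }

    // and the mirror image: bounds of o.rowtime relative to s.rowtime
    static long ordersLowerBound(long shipmentRowTime) { return shipmentRowTime; }
    static long ordersUpperBound(long shipmentRowTime) { return shipmentRowTime + HOUR_MS; }

    public static void main(String[] args) {
        long t = 10 * HOUR_MS;  // an Orders row at 10:00
        System.out.println("matching Shipments window: ["
            + shipmentsLowerBound(t) + ", " + shipmentsUpperBound(t) + "]");
    }
}
{code}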



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...

2017-09-19 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4625
  
Btw, I noticed I did not reply to your comments. 

I think it would be good to have the eager state cleaning in the initial 
version. Shouldn't be too much effort. Basically, getting the condition right 
and calling `remove()` on the `Map.Entry`.

What do you mean by "distinguish the < and <= signs"? If there is an 
off-by-one issue in the computation of the window boundaries, it needs to be 
fixed with this PR. We shouldn't merge a semantically incorrect operator (of 
course there might be bugs...). Performance issues are OK but the semantics 
must be correct.

Regarding the `"misc"` test failures, yes that can happen. No need to worry 
about that as long as the `libraries` build passes. I'll run the tests 
anyway again before merging ;-)


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139728162
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, 
CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) 
ctx.timerService().currentWatermark()
--- End diff --

OK, let's keep it here. Changing the value of watermarks won't be possible 
as it is built into the DataStream API and some users rely on the current 
behavior. The curse of public APIs ;-)


---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171900#comment-16171900
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139728162
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, 
CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) 
ctx.timerService().currentWatermark()
--- End diff --

OK, let's keep it here. Changing the value of watermarks won't be possible 
as it is built into the DataStream API and some users rely on the current 
behavior. The curse of public APIs ;-)


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on row time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded conditions 
> like {{o.rowtime > s.rowtime}} are not supported. The condition should include 
> the rowtime attributes of both streams, so {{o.rowtime between rowtime () and 
> rowtime () + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because inserting 
> a row into a sorted order would shift all other computations. This would be 
> too expensive to maintain. Therefore, we will throw an error if a user tries 
> to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171894#comment-16171894
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139727416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute processing time bounded stream inner-join.
+  */
+class ProcTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx = -1,
+  rightTimeIdx = -1,
+  JoinTimeIndicator.PROCTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, 
CRow]#Context): Unit = {
+rightOperatorTime = ctx.timerService().currentProcessingTime()
+leftOperatorTime = ctx.timerService().currentProcessingTime()
+  }
+
+  override def getTimeForLeftStream(
+  context: CoProcessFunction[CRow, CRow, CRow]#Context,
+  row: CRow): Long = {
+context.timerService().currentProcessingTime()
--- End diff --

To be honest, I would not put too much effort into the processing time 
case, especially not if it affects the performance of event-time processing. 
Processing time is non-deterministic anyway. The reason I brought this up is 
because I wasn't sure of the side effects if the row proctime > operator 
time. If this is not an issue, we can keep it like this. 

Otherwise, the easiest solution would be to just add a comment to the 
invocations of `updateOperatorTime` that this call must be the first call in 
all processing methods (`processElement()`, `onTimer()`). Since this is just 
internal API, this should be fine.
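
A minimal sketch of the suggested calling convention, in plain Java rather than the Scala operator from the diff (the class, method, and parameter names here are illustrative only):

{code}
// Illustration of the convention suggested above: refresh the operator's notion
// of time before doing anything else in every processing method.
public class TimeOrderedOperatorSketch {

    private long operatorTime = Long.MIN_VALUE;

    /** Must be the first call in every processing method (processElement*, onTimer). */
    private void updateOperatorTime(long currentTime) {
        operatorTime = Math.max(operatorTime, currentTime);
    }

    public void processElement(long rowTime, long currentTime) {
        updateOperatorTime(currentTime);   // first: advance the operator time
        // ... join logic that compares rowTime against the refreshed operatorTime
        if (rowTime <= operatorTime) {
            // e.g. treat the row as eligible for eager cleanup
        }
    }

    public void onTimer(long timerTimestamp, long currentTime) {
        updateOperatorTime(currentTime);   // the same rule applies to timer callbacks
        // ... state cleanup based on the refreshed operatorTime
    }

    public static void main(String[] args) {
        TimeOrderedOperatorSketch op = new TimeOrderedOperatorSketch();
        op.processElement(1_000L, 2_000L);
        op.onTimer(2_000L, 3_000L);
    }
}
{code}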


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on row time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded conditions 
> like {{o.rowtime > s.rowtime}} are not supported. The condition should include 
> the rowtime attributes of both 

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139727416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute processing time bounded stream inner-join.
+  */
+class ProcTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx = -1,
+  rightTimeIdx = -1,
+  JoinTimeIndicator.PROCTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, 
CRow]#Context): Unit = {
+rightOperatorTime = ctx.timerService().currentProcessingTime()
+leftOperatorTime = ctx.timerService().currentProcessingTime()
+  }
+
+  override def getTimeForLeftStream(
+  context: CoProcessFunction[CRow, CRow, CRow]#Context,
+  row: CRow): Long = {
+context.timerService().currentProcessingTime()
--- End diff --

To be honest, I would not put too much effort into the processing time 
case, especially not if it affects the performance of event-time processing. 
Processing time is non-deterministic anyway. The reason I brought this up is 
because I wasn't sure of the side effects if the row proctime > operator 
time. If this is not an issue, we can keep it like this. 

Otherwise, the easiest solution would be to just add a comment to the 
invocations of `updateOperatorTime` that this call must be the first call in 
all processing methods (`processElement()`, `onTimer()`). Since this is just 
internal API, this should be fine.


---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171867#comment-16171867
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139723927
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139723927
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139723011
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171860#comment-16171860
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139723011
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171856#comment-16171856
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139722338
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139722338
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,

[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171852#comment-16171852
 ] 

ASF GitHub Bot commented on FLINK-7630:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4678
  
I merged, could you please close this PR?

And thanks for working on this!  


> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more kinds of parameters, such as a {{File}} or an {{InputStream}}.
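
For illustration, a hedged sketch of what such overloads could look like (this is not the merged Flink code; the class name and helper methods below are assumptions):

{code}
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Sketch of a ParameterTool-like helper with the extra entry points discussed above.
public final class PropertiesParams {

    private final Map<String, String> data;

    private PropertiesParams(Map<String, String> data) {
        this.data = data;
    }

    public static PropertiesParams fromPropertiesFile(String path) throws IOException {
        return fromPropertiesFile(new File(path));
    }

    public static PropertiesParams fromPropertiesFile(File file) throws IOException {
        try (InputStream in = new FileInputStream(file)) {
            return fromPropertiesFile(in);
        }
    }

    public static PropertiesParams fromPropertiesFile(InputStream in) throws IOException {
        Properties props = new Properties();
        props.load(in);
        Map<String, String> map = new HashMap<>();
        props.stringPropertyNames().forEach(k -> map.put(k, props.getProperty(k)));
        return new PropertiesParams(map);
    }

    public String get(String key) {
        return data.get(key);
    }
}
{code}

With an {{InputStream}} entry point, a properties file bundled inside the job jar can be read via {{getClass().getResourceAsStream(...)}}, which is the use case raised in the linked mailing list thread.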



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4678: [FLINK-7630] [Java API] Allow passing a File or an InputS...

2017-09-19 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4678
  
I merged, could you please close this PR?

And thanks for working on this! 😃 


---


[jira] [Closed] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7630.
---
Resolution: Fixed

Implemented on release-1.3 in
100951e279ad352f3b8921970217cd8edf14ac20

Implemented on master in
a66315a5cc52a2596e76f09641867be6ce22242c

> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more kinds of parameters, such as a {{File}} or an {{InputStream}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6733) Remove commented out AvgAggregationFunction.java

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171814#comment-16171814
 ] 

ASF GitHub Bot commented on FLINK-6733:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4630
  
@zentol thanks for your comment. As I understand it, I can't merge this without 
a formal review. May I ask you to review it?


> Remove commented out AvgAggregationFunction.java
> 
>
> Key: FLINK-6733
> URL: https://issues.apache.org/jira/browse/FLINK-6733
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Reporter: Dawid Wysakowicz
>Assignee: Mikhail Lipkovich
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4630: [FLINK-6733] Remove commented out AvgAggregationFunction....

2017-09-19 Thread mlipkovich
Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4630
  
@zentol thanks for your comment. As I understand it, I can't merge this without 
a formal review. May I ask you to review it?


---


[GitHub] flink pull request #4638: [FLINK-6563] [table] Add time indicator support to...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4638#discussion_r139714565
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -62,10 +88,107 @@
DeserializationSchema deserializationSchema,
TypeInformation typeInfo) {
 
-   this.topic = Preconditions.checkNotNull(topic, "Topic");
-   this.properties = Preconditions.checkNotNull(properties, 
"Properties");
-   this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-   this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information");
+   this.topic = Preconditions.checkNotNull(topic, "Topic must not 
be null.");
+   this.properties = Preconditions.checkNotNull(properties, 
"Properties must not be null.");
+   this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must 
not be null.");
+   this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information must not be null.");
+   }
+
+   /**
+* Adds processing time attribute to the table. The attribute is 
appended to each row.
+*
+* @param proctime The name of the added processing time attribute.
+*/
+   public void addProcTimeAttribute(String proctime) {
+   Preconditions.checkNotNull(proctime, "Processing time attribute 
must not be null.");
+   this.procTimeAttribute = proctime;
+   }
+
+   /**
+* Adds an ingestion time attribute to the table. The attribute is 
append at the end of each row.
+*
+* For each row, the ingestion time attribute is initialized with 
the current time when the row
+* is read from Kafka. From there on, it behaves as an event time 
attribute.
+*
+* @param ingestionTime The name of the added ingestion time attribute.
+*/
+   public void addIngestionTimeAttribute(String ingestionTime) {
+   Preconditions.checkNotNull(ingestionTime, "Ingestion time 
attribute must not be null.");
+   if (this.rowTimeAttribute != null) {
+   throw new ValidationException(
+   "You can only specify an ingestion time 
attribute OR a row time attribute.");
+   }
+   this.rowTimeAttribute = ingestionTime;
--- End diff --

Yes, that's inconsistently handled. Thanks for pointing this out.
I removed `this.ingestionTimeAttribute`


---


[jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171806#comment-16171806
 ] 

ASF GitHub Bot commented on FLINK-6563:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4638#discussion_r139714565
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -62,10 +88,107 @@
DeserializationSchema deserializationSchema,
TypeInformation typeInfo) {
 
-   this.topic = Preconditions.checkNotNull(topic, "Topic");
-   this.properties = Preconditions.checkNotNull(properties, 
"Properties");
-   this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-   this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information");
+   this.topic = Preconditions.checkNotNull(topic, "Topic must not 
be null.");
+   this.properties = Preconditions.checkNotNull(properties, 
"Properties must not be null.");
+   this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must 
not be null.");
+   this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information must not be null.");
+   }
+
+   /**
+* Adds processing time attribute to the table. The attribute is 
appended to each row.
+*
+* @param proctime The name of the added processing time attribute.
+*/
+   public void addProcTimeAttribute(String proctime) {
+   Preconditions.checkNotNull(proctime, "Processing time attribute 
must not be null.");
+   this.procTimeAttribute = proctime;
+   }
+
+   /**
+* Adds an ingestion time attribute to the table. The attribute is 
append at the end of each row.
+*
+* For each row, the ingestion time attribute is initialized with 
the current time when the row
+* is read from Kafka. From there on, it behaves as an event time 
attribute.
+*
+* @param ingestionTime The name of the added ingestion time attribute.
+*/
+   public void addIngestionTimeAttribute(String ingestionTime) {
+   Preconditions.checkNotNull(ingestionTime, "Ingestion time 
attribute must not be null.");
+   if (this.rowTimeAttribute != null) {
+   throw new ValidationException(
+   "You can only specify an ingestion time 
attribute OR a row time attribute.");
+   }
+   this.rowTimeAttribute = ingestionTime;
--- End diff --

Yes, that's inconsistently handled. Thanks for pointing this out.
I removed `this.ingestionTimeAttribute`


> Expose time indicator attributes in the KafkaTableSource
> 
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow up for FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the 
> processing time and the event time for the data stream. This JIRA proposes to 
> expose these two attributes in the Kafka table source.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files

2017-09-19 Thread Mikhail Lipkovich (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171805#comment-16171805
 ] 

Mikhail Lipkovich commented on FLINK-5944:
--

The build has failed again, but the errors seem to be unrelated (kafka-connector 
and Scala examples). It builds and passes the tests locally.
[~Zentol] could you please review the changes?

> Flink should support reading Snappy Files
> -
>
> Key: FLINK-5944
> URL: https://issues.apache.org/jira/browse/FLINK-5944
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ilya Ganelin
>Assignee: Mikhail Lipkovich
>  Labels: features
>
> Snappy is an extremely performant compression format that is widely used, 
> offering fast compression and decompression. 
> This can be easily implemented by creating a SnappyInflaterInputStreamFactory 
> and updating the initDefaultInflateInputStreamFactories in FileInputFormat.
> Flink already includes the Snappy dependency in the project. 
> There is a minor gotcha in this. If we wish to use this with Hadoop, then we 
> must provide two separate implementations since Hadoop uses a different 
> version of the snappy format than Snappy Java (which is the xerial/snappy 
> included in Flink). 
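
As a rough sketch of the approach described here (illustration only; the exact {{InflaterInputStreamFactory}} interface used by {{FileInputFormat}} may differ, and {{SnappyInputStream}} is assumed from the xerial snappy-java dependency):

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;

import org.xerial.snappy.SnappyInputStream;

// Rough sketch of a Snappy decompression factory in the spirit described above.
// It only loosely mirrors the shape of Flink's inflater factories, so treat it
// as an illustration rather than a drop-in implementation.
public class SnappyInflaterInputStreamFactorySketch {

    public InputStream create(InputStream in) throws IOException {
        // xerial/snappy-java framed stream; Hadoop's snappy codec would need a
        // separate implementation because its framing differs.
        return new SnappyInputStream(in);
    }

    public Collection<String> getCommonFileExtensions() {
        return Arrays.asList("snappy");
    }
}
{code}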



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171797#comment-16171797
 ] 

ASF GitHub Bot commented on FLINK-7575:
---

Github user jameslafa commented on the issue:

https://github.com/apache/flink/pull/4647
  
@zentol I agree, it makes it a lot easier to read. I just pushed the update.
Thanks for your feedback.


> Dashboard jobs/tasks metrics display 0 when metrics are not yet available
> -
>
> Key: FLINK-7575
> URL: https://issues.apache.org/jira/browse/FLINK-7575
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.3.2
>Reporter: James Lafa
>Assignee: James Lafa
>Priority: Minor
>
> The web frontend is currently displaying "0" when a metric is not available 
> yet (e.g. records-in/out, bytes-in/out). 
> 0 is misleading and it's preferable to display no value while the value is 
> still unknown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..." instea...

2017-09-19 Thread jameslafa
Github user jameslafa commented on the issue:

https://github.com/apache/flink/pull/4647
  
@zentol I agree, it makes it a lot easier to read. I just pushed the update.
Thanks for your feedback.


---


[jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171795#comment-16171795
 ] 

ASF GitHub Bot commented on FLINK-6563:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4638#discussion_r139711376
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
 
-   @Override
-   public String explainSource() {
-   return "";
+   /**
+* Assigns ingestion time timestamps and watermarks.
+*/
+   public static class IngestionTimeWatermarkAssigner implements 
AssignerWithPeriodicWatermarks {
+
+   private long curTime = Long.MIN_VALUE;
+
+   @Override
+   public long extractTimestamp(Row element, long 
previousElementTimestamp) {
+   long t = System.currentTimeMillis();
+   if (t > curTime) {
+   curTime = t;
+   }
+   return curTime;
+   }
+
+   @Nullable
+   @Override
+   public Watermark getCurrentWatermark() {
+   return new Watermark(curTime - 1);
+   }
+   }
+
+   protected AssignerWithPeriodicWatermarks getAssigner() {
+   return this.timestampAssigner;
+   }
+
+   /**
+* Checks that the provided row time attribute is valid, determines its 
position in the schema,
+* and adjusts the return type.
+*
+* @param rowtime The attribute to check.
+*/
+   private void configureRowTimeAttribute(String rowtime) {
+   Preconditions.checkNotNull(rowtime, "Row time attribute must 
not be null.");
+
+   if (this.ingestionTimeAttribute != null) {
+   throw new ValidationException(
+   "You can only specify a row time attribute OR 
an ingestion time attribute.");
+   }
+
+   if (this.rowTimeAttribute != null) {
+   throw new ValidationException(
+   "Row time attribute can only be specified 
once.");
+   }
+
+   // get current fields
+   String[] fieldNames = ((RowTypeInfo) 
this.getReturnType()).getFieldNames();
+   TypeInformation[] fieldTypes = ((RowTypeInfo) 
this.getReturnType()).getFieldTypes();
+
+   // check if the rowtime field exists and remember position
+   this.rowtimeFieldPos = -1;
+   for (int i = 0; i < fieldNames.length; i++) {
+   if (fieldNames[i].equals(rowtime)) {
+   if (fieldTypes[i] != Types.LONG) {
+   throw new 
IllegalArgumentException("Specified rowtime field must be of type BIGINT. " +
+   "Available fields: " + 
toSchemaString(fieldNames, fieldTypes));
+   }
+   this.rowtimeFieldPos = i;
+   break;
+   }
+   }
+   if (this.rowtimeFieldPos < 0) {
+   throw new IllegalArgumentException("Specified rowtime 
field must be present in data. " +
+   "Available fields: " + 
toSchemaString(fieldNames, fieldTypes));
+   }
+   this.rowTimeAttribute = rowtime;
+
+   // adjust result type by removing rowtime field (will be added 
later)
+   String[] newNames = new String[fieldNames.length - 1];
+   TypeInformation[] newTypes = new 
TypeInformation[fieldTypes.length - 1];
+   for (int i = 0; i < rowtimeFieldPos; i++) {
+   newNames[i] = fieldNames[i];
+   newTypes[i] = fieldTypes[i];
+   }
+   for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) {
+   newNames[i - 1] = fieldNames[i];
+   newTypes[i - 1] = fieldTypes[i];
+   }
+   this.typeInfo = new RowTypeInfo(newTypes, newNames);
+   }
+
+   /**
+* Util method to create a schema description.
+*
+* @param fieldNames The names of the fields.
+* @param fieldTypes The types of the fields.
+* @return A string describing the schema of the given field 
information.
+*/
+   private String toSchemaString(String[] fieldNames, TypeInformation[] 
fieldTypes) {
+   Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length);
+
+   

[GitHub] flink pull request #4638: [FLINK-6563] [table] Add time indicator support to...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4638#discussion_r139711376
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
 
-   @Override
-   public String explainSource() {
-   return "";
+   /**
+* Assigns ingestion time timestamps and watermarks.
+*/
+   public static class IngestionTimeWatermarkAssigner implements 
AssignerWithPeriodicWatermarks {
+
+   private long curTime = Long.MIN_VALUE;
+
+   @Override
+   public long extractTimestamp(Row element, long 
previousElementTimestamp) {
+   long t = System.currentTimeMillis();
+   if (t > curTime) {
+   curTime = t;
+   }
+   return curTime;
+   }
+
+   @Nullable
+   @Override
+   public Watermark getCurrentWatermark() {
+   return new Watermark(curTime - 1);
+   }
+   }
+
+   protected AssignerWithPeriodicWatermarks getAssigner() {
+   return this.timestampAssigner;
+   }
+
+   /**
+* Checks that the provided row time attribute is valid, determines its 
position in the schema,
+* and adjusts the return type.
+*
+* @param rowtime The attribute to check.
+*/
+   private void configureRowTimeAttribute(String rowtime) {
+   Preconditions.checkNotNull(rowtime, "Row time attribute must 
not be null.");
+
+   if (this.ingestionTimeAttribute != null) {
+   throw new ValidationException(
+   "You can only specify a row time attribute OR 
an ingestion time attribute.");
+   }
+
+   if (this.rowTimeAttribute != null) {
+   throw new ValidationException(
+   "Row time attribute can only be specified 
once.");
+   }
+
+   // get current fields
+   String[] fieldNames = ((RowTypeInfo) 
this.getReturnType()).getFieldNames();
+   TypeInformation[] fieldTypes = ((RowTypeInfo) 
this.getReturnType()).getFieldTypes();
+
+   // check if the rowtime field exists and remember position
+   this.rowtimeFieldPos = -1;
+   for (int i = 0; i < fieldNames.length; i++) {
+   if (fieldNames[i].equals(rowtime)) {
+   if (fieldTypes[i] != Types.LONG) {
+   throw new 
IllegalArgumentException("Specified rowtime field must be of type BIGINT. " +
+   "Available fields: " + 
toSchemaString(fieldNames, fieldTypes));
+   }
+   this.rowtimeFieldPos = i;
+   break;
+   }
+   }
+   if (this.rowtimeFieldPos < 0) {
+   throw new IllegalArgumentException("Specified rowtime 
field must be present in data. " +
+   "Available fields: " + 
toSchemaString(fieldNames, fieldTypes));
+   }
+   this.rowTimeAttribute = rowtime;
+
+   // adjust result type by removing rowtime field (will be added 
later)
+   String[] newNames = new String[fieldNames.length - 1];
+   TypeInformation[] newTypes = new 
TypeInformation[fieldTypes.length - 1];
+   for (int i = 0; i < rowtimeFieldPos; i++) {
+   newNames[i] = fieldNames[i];
+   newTypes[i] = fieldTypes[i];
+   }
+   for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) {
+   newNames[i - 1] = fieldNames[i];
+   newTypes[i - 1] = fieldTypes[i];
+   }
+   this.typeInfo = new RowTypeInfo(newTypes, newNames);
+   }
+
+   /**
+* Util method to create a schema description.
+*
+* @param fieldNames The names of the fields.
+* @param fieldTypes The types of the fields.
+* @return A string describing the schema of the given field 
information.
+*/
+   private String toSchemaString(String[] fieldNames, TypeInformation[] 
fieldTypes) {
+   Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length);
+
+   StringBuilder sb = new StringBuilder("[");
+   for (int i = 0; i < fieldNames.length - 1; i++) {
+   sb.append(fieldNames[i]);
+   sb.append(": ");
+   if 

[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171778#comment-16171778
 ] 

ASF GitHub Bot commented on FLINK-7630:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4678
  
The changes look good!  I especially like the added tests and 
documentation.


> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more parameter types, such as {{File}} and {{InputStream}}.
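
As a quick illustration of the requested API, here is a minimal sketch that loads a
properties file bundled inside the job jar through an {{InputStream}}; the resource name
and key are placeholders, and the {{InputStream}} overload is assumed to exist only once
this change is merged.

```java
import org.apache.flink.api.java.utils.ParameterTool;

import java.io.InputStream;

public class LoadParamsFromJar {

    public static void main(String[] args) throws Exception {
        // Properties file packaged inside the jar; the resource name is a placeholder.
        try (InputStream in = LoadParamsFromJar.class.getResourceAsStream("/job.properties")) {
            // Assumes the InputStream overload proposed in this issue.
            ParameterTool params = ParameterTool.fromPropertiesFile(in);
            System.out.println(params.get("some.key", "fallback"));
        }
    }
}
```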



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4678: [FLINK-7630] [Java API] Allow passing a File or an InputS...

2017-09-19 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4678
  
The changes look good! 👍 I especially like the added tests and 
documentation.


---


[jira] [Commented] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171774#comment-16171774
 ] 

ASF GitHub Bot commented on FLINK-7357:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4521
  
Thanks for the PR @walterddr. 

We also need to replace group aux functions in the expressions of the 
filter predicate to support something like this:

```
HAVING 
  SUM(a) > 0 AND
  QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1
```

But that's just a minor change. 
I'll fix that, extend the tests to cover the case, and do some refactorings 
before I merge this PR.

Cheers, Fabian


> HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY 
> HOP window
> -
>
> Key: FLINK-7357
> URL: https://issues.apache.org/jira/browse/FLINK-7357
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> The following SQL does not compile:
> {code:title=invalid_having_hop_start_sql}
> SELECT 
>   c AS k, 
>   COUNT(a) AS v, 
>   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS 
> windowStart, 
>   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd 
> FROM 
>   T1 
> GROUP BY 
>   HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), 
>   c 
> HAVING 
>   SUM(b) > 1
> {code}
> Keeping either the HAVING clause or the HOP_START field on its own compiles and 
> runs without issue.
> more details: 
> https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4521: [FLINK-7357] [table] Created extended rules for WindowSta...

2017-09-19 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4521
  
Thanks for the PR @walterddr. 

We also need to replace group aux functions in the expressions of the 
filter predicate to support something like this:

```
HAVING 
  SUM(a) > 0 AND
  QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1
```

But that's just a minor change. 
I'll fix that, extend the tests to cover the case, and do some refactorings 
before I merge this PR.

Cheers, Fabian


---


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction

2017-09-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171770#comment-16171770
 ] 

Aljoscha Krettek commented on FLINK-7635:
-

Sounds good! +1 The important thing is also to have unit tests for the changes. 
A good starting point for that is {{ProcessOperatorTest}} and 
{{SideOutputITCase}}.

> Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
> --
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is a good warm-up task for people to learn how window 
> functions work in general. Otherwise feel free to assign it back to me.
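
For context, a minimal runnable sketch of the existing FLINK-4460 side-output pattern in
{{ProcessFunction}} that this issue proposes to mirror in
{{ProcessWindowFunction}}/{{ProcessAllWindowFunction}}; the values and tag name are made up.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ProcessFunctionSideOutputExample {

    public static void main(String[] args) throws Exception {
        // Side output tag; the anonymous subclass captures the element type.
        final OutputTag<String> oddTag = new OutputTag<String>("odd-values") {};

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Long> evens = env
            .fromElements(1L, 2L, 3L, 4L, 5L)
            .process(new ProcessFunction<Long, Long>() {
                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) {
                    if (value % 2 == 0) {
                        out.collect(value);
                    } else {
                        // This Context#output call is what FLINK-7635 proposes to add
                        // to ProcessWindowFunction/ProcessAllWindowFunction as well.
                        ctx.output(oddTag, "odd value: " + value);
                    }
                }
            });

        DataStream<String> odds = evens.getSideOutput(oddTag);
        odds.print();
        evens.print();

        env.execute("ProcessFunction side output example");
    }
}
```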



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7635) Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction

2017-09-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7635:

Summary: Support sideOutput in ProcessWindowFunciton & 
ProcessAllWindowFunction  (was: support sideOutput in ProcessWindowFunciton & 
ProcessAllWindowFunction)

> Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
> --
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139702816
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
sideOutput(element);
+   } else if (isSkippedElement) {
--- End diff --

I think we also need to check whether it's late. An element could also be 
skipped because no windows were assigned to it.


---


[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171753#comment-16171753
 ] 

ASF GitHub Bot commented on FLINK-7552:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4616
  
@pnowojski and @EronWright I changed the name of the `Context`, I added a 
test in `StreamSinkOperatorTest`, I added methods for querying the current 
processing time/watermark to the context. I changed the `timestamp()` method to 
return a primitive `long` and I added a method `hasTimestamp()`. Also, 
`timestamp()` now throws an exception if no timestamp is available.

What do you think?


> Extend SinkFunction interface with SinkContext
> --
>
> Key: FLINK-7552
> URL: https://issues.apache.org/jira/browse/FLINK-7552
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface 
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param  Input type parameter.
>  */
> @Public
> public interface SinkFunction extends Function, Serializable {
>   /**
>* Function for standard sink behaviour. This function is called for 
> every record.
>*
>* @param value The input record.
>* @throws Exception
>* @deprecated Use {@link #invoke(SinkContext, Object)}.
>*/
>   @Deprecated
>   default void invoke(IN value) throws Exception {
>   }
>   /**
>* Writes the given value to the sink. This function is called for 
> every record.
>*
>* @param context Additional context about the input record.
>* @param value The input record.
>* @throws Exception
>*/
>   default void invoke(SinkContext context, IN value) throws Exception {
>   invoke(value);
>   }
>   /**
>* Context that {@link SinkFunction SinkFunctions } can use for getting 
> additional data about
>* an input record.
>*
>* @param  The type of elements accepted by the sink.
>*/
>   @Public // Interface might be extended in the future with additional 
> methods.
>   interface SinkContext {
>   /**
>* Returns the timestamp of the current input record.
>*/
>   long timestamp();
>   }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow 
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a 
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to 
> timestamps.
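
For illustration, a sketch of a user sink written against the proposed interface quoted
above; it will not compile against a released Flink version until the change is merged,
and the class name is made up.

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class TimestampPrintingSink<IN> implements SinkFunction<IN> {

    @Override
    public void invoke(SinkContext context, IN value) throws Exception {
        // The element timestamp comes from the proposed context instead of a
        // SinkFunction/StreamOperator hybrid as in FlinkKafkaProducer010.
        System.out.println(value + " @ " + context.timestamp());
    }
}
```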



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

2017-09-19 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4616
  
@pnowojski and @EronWright I changed the name of the `Context`, I added a 
test in `StreamSinkOperatorTest`, I added methods for querying the current 
processing time/watermark to the context. I changed the `timestamp()` method to 
return a primitive `long` and I added a method `hasTimestamp()`. Also, 
`timestamp()` now throws an exception if no timestamp is available.

What do you think?


---


[jira] [Created] (FLINK-7643) HadoopFileSystem always reloads GlobalConfiguration

2017-09-19 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-7643:
--

 Summary: HadoopFileSystem always reloads GlobalConfiguration
 Key: FLINK-7643
 URL: https://issues.apache.org/jira/browse/FLINK-7643
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Ufuk Celebi


HadoopFileSystem always reloads GlobalConfiguration, which potentially leads to 
a lot of noise in the logs, because this happens on each checkpoint.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7581) Name netty threads of rest components

2017-09-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7581.
---
Resolution: Fixed

1.4: 06d8e83009628a325e012635339b7e0b2821ddea

> Name netty threads of rest components
> -
>
> Key: FLINK-7581
> URL: https://issues.apache.org/jira/browse/FLINK-7581
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7583) Create singleton instance for the content type header

2017-09-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7583.
---
Resolution: Fixed

1.4: ad4c41532ac22a0accf82c7ea9427aed9b71aaf4

> Create singleton instance for the content type header
> -
>
> Key: FLINK-7583
> URL: https://issues.apache.org/jira/browse/FLINK-7583
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The content type header for all rest requests/responses is always the same, 
> but we currently allocate a separate string for each request/response. We 
> should instead use a static constant.
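
In other words, something along these lines; the class and constant names are
illustrative and not taken from the actual commit.

```java
public final class RestConstants {

    /** Shared content type header value for all REST requests and responses. */
    public static final String REST_CONTENT_TYPE = "application/json; charset=UTF-8";

    private RestConstants() {
        // no instantiation
    }
}
```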



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4665
  
Would be good to rename the metric to `numLateRecordsDropped` to be more 
consistent with existing metrics.

@aljoscha will have to comment on whether the counting is correct or not.

It may also be interesting to count the number of late elements that are 
NOT dropped.
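
For reference, a self-contained sketch of the user-facing counterpart of this pattern,
using a rich user function instead of {{WindowOperator}} itself; the lateness check and
class name are made up, only the metric name follows the suggestion above.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class LateRecordFilter extends RichFlatMapFunction<Long, Long> {

    private final long maxLatenessMillis;
    private transient Counter numLateRecordsDropped;

    public LateRecordFilter(long maxLatenessMillis) {
        this.maxLatenessMillis = maxLatenessMillis;
    }

    @Override
    public void open(Configuration parameters) {
        // Register the counter under the name suggested in the review.
        numLateRecordsDropped = getRuntimeContext()
            .getMetricGroup()
            .counter("numLateRecordsDropped");
    }

    @Override
    public void flatMap(Long eventTimestamp, Collector<Long> out) {
        if (System.currentTimeMillis() - eventTimestamp > maxLatenessMillis) {
            // Drop the record and count it, analogous to the WindowOperator change.
            numLateRecordsDropped.inc();
        } else {
            out.collect(eventTimestamp);
        }
    }
}
```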


---


[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171711#comment-16171711
 ] 

ASF GitHub Bot commented on FLINK-5944:
---

GitHub user mlipkovich opened a pull request:

https://github.com/apache/flink/pull/4683

[FLINK-5944] Support reading of Snappy files

## What is the purpose of the change

Support reading of Snappy compressed text files (both Xerial and Hadoop 
snappy)

## Brief change log

  - *Added InputStreamFactories for Xerial and Hadoop snappy*
  - *Added config parameter to control whether Xerial or Hadoop snappy 
should be used*

## Verifying this change

  - *Manually verified the change by running word count for text files 
compressed using different Snappy versions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mlipkovich/flink FLINK-5944

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4683.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4683


commit c4d4016f1e6b44833d24994c97532b4c5243e4d2
Author: Mikhail Lipkovich 
Date:   2017-09-19T13:34:10Z

[FLINK-5944] Support reading of Snappy files




> Flink should support reading Snappy Files
> -
>
> Key: FLINK-5944
> URL: https://issues.apache.org/jira/browse/FLINK-5944
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ilya Ganelin
>Assignee: Mikhail Lipkovich
>  Labels: features
>
> Snappy is an extremely performant and widely used compression format that 
> offers fast compression and decompression. 
> This can be easily implemented by creating a SnappyInflaterInputStreamFactory 
> and updating the initDefaultInflateInputStreamFactories in FileInputFormat.
> Flink already includes the Snappy dependency in the project. 
> There is a minor gotcha in this. If we wish to use this with Hadoop, then we 
> must provide two separate implementations since Hadoop uses a different 
> version of the snappy format than Snappy Java (which is the xerial/snappy 
> included in Flink). 
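
Following the approach outlined in the description, a minimal factory for the xerial
(framed) Snappy format might look like the sketch below; the class name and file
extension are assumptions, and Hadoop's Snappy framing would need a second, separate
factory as noted. Per the description, such a factory would then be registered in
FileInputFormat's initDefaultInflateInputStreamFactories.

```java
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.xerial.snappy.SnappyFramedInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;

public class XerialSnappyInputStreamFactory implements InflaterInputStreamFactory<SnappyFramedInputStream> {

    private static final XerialSnappyInputStreamFactory INSTANCE = new XerialSnappyInputStreamFactory();

    public static XerialSnappyInputStreamFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public SnappyFramedInputStream create(InputStream in) throws IOException {
        // Wrap the raw stream with xerial's framed Snappy decompressor.
        return new SnappyFramedInputStream(in);
    }

    @Override
    public Collection<String> getCommonFileExtensions() {
        return Collections.singleton("snappy");
    }
}
```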



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4683: [FLINK-5944] Support reading of Snappy files

2017-09-19 Thread mlipkovich
GitHub user mlipkovich opened a pull request:

https://github.com/apache/flink/pull/4683

[FLINK-5944] Support reading of Snappy files

## What is the purpose of the change

Support reading of Snappy compressed text files (both Xerial and Hadoop 
snappy)

## Brief change log

  - *Added InputStreamFactories for Xerial and Hadoop snappy*
  - *Added config parameter to control whether Xerial or Hadoop snappy 
should be used*

## Verifying this change

  - *Manually verified the change by running word count for text files 
compressed using different Snappy versions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mlipkovich/flink FLINK-5944

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4683.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4683


commit c4d4016f1e6b44833d24994c97532b4c5243e4d2
Author: Mikhail Lipkovich 
Date:   2017-09-19T13:34:10Z

[FLINK-5944] Support reading of Snappy files




---


[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171686#comment-16171686
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139691558
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171696#comment-16171696
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687800
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171704#comment-16171704
 ] 

ASF GitHub Bot commented on FLINK-6563:
---

Github user uybhatti commented on a diff in the pull request:

https://github.com/apache/flink/pull/4638#discussion_r139692943
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -62,10 +88,107 @@
DeserializationSchema deserializationSchema,
TypeInformation typeInfo) {
 
-   this.topic = Preconditions.checkNotNull(topic, "Topic");
-   this.properties = Preconditions.checkNotNull(properties, 
"Properties");
-   this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-   this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information");
+   this.topic = Preconditions.checkNotNull(topic, "Topic must not 
be null.");
+   this.properties = Preconditions.checkNotNull(properties, 
"Properties must not be null.");
+   this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must 
not be null.");
+   this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information must not be null.");
+   }
+
+   /**
+* Adds processing time attribute to the table. The attribute is 
appended to each row.
+*
+* @param proctime The name of the added processing time attribute.
+*/
+   public void addProcTimeAttribute(String proctime) {
+   Preconditions.checkNotNull(proctime, "Processing time attribute 
must not be null.");
+   this.procTimeAttribute = proctime;
+   }
+
+   /**
+* Adds an ingestion time attribute to the table. The attribute is 
append at the end of each row.
+*
+* For each row, the ingestion time attribute is initialized with 
the current time when the row
+* is read from Kafka. From there on, it behaves as an event time 
attribute.
+*
+* @param ingestionTime The name of the added ingestion time attribute.
+*/
+   public void addIngestionTimeAttribute(String ingestionTime) {
+   Preconditions.checkNotNull(ingestionTime, "Ingestion time 
attribute must not be null.");
+   if (this.rowTimeAttribute != null) {
+   throw new ValidationException(
+   "You can only specify an ingestion time 
attribute OR a row time attribute.");
+   }
+   this.rowTimeAttribute = ingestionTime;
--- End diff --

It should be `this.ingestionTimeAttribute = ingestionTime;`. 
Otherwise there is no need for the `ingestionTimeAttribute` variable.


> Expose time indicator attributes in the KafkaTableSource
> 
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow up for FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the 
> processing time and the event time of the data stream. This JIRA proposes to 
> expose both in the Kafka table source.
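
For orientation, a sketch of how the new time attribute methods from the diff above might
be used; the concrete Kafka010JsonTableSource subclass, its constructor, and the
topic/field names are assumptions, and addProcTimeAttribute/addIngestionTimeAttribute
only exist in this PR.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Properties;

public class KafkaTimeIndicatorExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        // Physical schema of the Kafka topic (topic and field names are placeholders).
        RowTypeInfo schema = new RowTypeInfo(
            new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
            new String[]{"user", "url"});

        Kafka010JsonTableSource source = new Kafka010JsonTableSource("clicks", props, schema);

        // Methods added by this PR: append a processing time attribute to each row,
        // or alternatively an ingestion time attribute (mutually exclusive per the diff).
        source.addProcTimeAttribute("proctime");
        // source.addIngestionTimeAttribute("ingestionTime");

        tableEnv.registerTableSource("Clicks", source);
    }
}
```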



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171694#comment-16171694
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139691882
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171695#comment-16171695
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139692206
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171692#comment-16171692
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139689544
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171693#comment-16171693
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139684257
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
--- End diff --

on top of the normal load from the pipeline’s data processing work.

(add "the")


> Improve and enhance documentation for incremental checkpoints
> -
>
> Key: FLINK-7449
> URL: https://issues.apache.org/jira/browse/FLINK-7449
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> We should provide more details about incremental checkpoints in the 
> documentation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171698#comment-16171698
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139690107
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
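
[Editorial aside, not part of the quoted diff: a backend-agnostic, hypothetical illustration of the principle described above, not the actual RocksDB mechanism. The full state is rebuilt by replaying per-checkpoint deltas on top of an older base snapshot; a real implementation would also have to track deletions.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeltaReplaySketch {

    /**
     * Rebuild the latest full state from a base snapshot plus the deltas
     * recorded by each subsequent (incremental) checkpoint.
     */
    static Map<String, String> restore(Map<String, String> baseSnapshot,
                                       List<Map<String, String>> deltas) {
        Map<String, String> full = new HashMap<>(baseSnapshot);
        for (Map<String, String> delta : deltas) {
            full.putAll(delta); // later changes overwrite earlier values
        }
        return full;
    }
}
```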
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171687#comment-16171687
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687994
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171689#comment-16171689
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687055
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171684#comment-16171684
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139691118
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 

[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171683#comment-16171683
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139686793
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full 
