[GitHub] flink issue #5614: [FLINK-8827] When FLINK_CONF_DIR contains spaces, execute...

2018-03-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5614
  
+1


---


[jira] [Commented] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387428#comment-16387428
 ] 

ASF GitHub Bot commented on FLINK-8827:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5614
  
+1


> When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
> -
>
> Key: FLINK-8827
> URL: https://issues.apache.org/jira/browse/FLINK-8827
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: Red Hat Enterprise Linux Server release 6.5 (Santiago)
>Reporter: Donghui Xu
>Priority: Major
>
> When the path of FLINK_CONF_DIR including spaces, executing zookeeper related 
> scripts failed with the following error message: Expect binary expression.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5643: can integrate and support on apache kudu ?

2018-03-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5643
  
Pull Requests are not meant to ask question, please head to the [mailing 
lists](http://flink.apache.org/community.html#mailing-lists) or open a 
[JIRA](https://issues.apache.org/jira/projects/FLINK).


---


[jira] [Commented] (FLINK-8687) MaterializedCollectStreamResult#retrievePage should take resultLock

2018-03-05 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387372#comment-16387372
 ] 

mingleizhang commented on FLINK-8687:
-

[~yuzhih...@gmail.com] Shouldn't we lock {{write}} situation ? means, also lock 
on {{CliTableResultView#updatePage}}. If we do not do that, I think it is still 
encounter a multi-threads problem. 

> MaterializedCollectStreamResult#retrievePage should take resultLock
> ---
>
> Key: FLINK-8687
> URL: https://issues.apache.org/jira/browse/FLINK-8687
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Currently MaterializedCollectStreamResult#retrievePage checks page range and 
> calls snapshot.subList() without holding resultLock.
> {{resultLock}} should be taken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5643: can integrate and support on apache kudu ?

2018-03-05 Thread qi20088
GitHub user qi20088 opened a pull request:

https://github.com/apache/flink/pull/5643

can integrate and support on apache kudu ?

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/flink release-1.5

Alternatively you can review and apply these changes as the patch at:


[jira] [Assigned] (FLINK-5621) Flink should provide a mechanism to prevent scheduling tasks on TaskManagers with operational issues

2018-03-05 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-5621:
---

Assignee: vinoyang

> Flink should provide a mechanism to prevent scheduling tasks on TaskManagers 
> with operational issues
> 
>
> Key: FLINK-5621
> URL: https://issues.apache.org/jira/browse/FLINK-5621
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.4
>Reporter: Jamie Grier
>Assignee: vinoyang
>Priority: Critical
>
> There are cases where jobs can get into a state where no progress can be made 
> if there is something pathologically wrong with one of the TaskManager nodes 
> in the cluster.
> An example of this would be a TaskManager on a machine that runs out of disk 
> space.  Flink never considers the TM to be "bad" and will keep using it to 
> attempt to run tasks -- which will continue to fail.
> A suggestion for overcoming this would be to allow an option where a TM will 
> commit suicide if that TM was the source of an exception that caused a job to 
> fail/restart.
> I'm sure there are plenty of other approaches to solving this..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8687) MaterializedCollectStreamResult#retrievePage should take resultLock

2018-03-05 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-8687:

Priority: Major  (was: Minor)

> MaterializedCollectStreamResult#retrievePage should take resultLock
> ---
>
> Key: FLINK-8687
> URL: https://issues.apache.org/jira/browse/FLINK-8687
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Currently MaterializedCollectStreamResult#retrievePage checks page range and 
> calls snapshot.subList() without holding resultLock.
> {{resultLock}} should be taken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2018-03-05 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387335#comment-16387335
 ] 

Sihua Zhou commented on FLINK-8871:
---

[~srichter] Have you already work on this or decide to work on this? If not and 
if you don't mind, I'd like to take this ticket.

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2, 1.5.0, 1.4.1
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.6.0
>
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots do no longer account for the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> as cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints already started, while the abandoned checkpoint 
> thread from a previous checkpoint is still lingering around running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method  as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6160) Retry JobManager/ResourceManager connection in case of timeout

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387316#comment-16387316
 ] 

ASF GitHub Bot commented on FLINK-6160:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5440
  
@tillrohrmann Could you please take a look on this when available ? Thanks 


>  Retry JobManager/ResourceManager connection in case of timeout
> ---
>
> Key: FLINK-6160
> URL: https://issues.apache.org/jira/browse/FLINK-6160
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> In case of a heartbeat timeout, the {{TaskExecutor}} closes the connection to 
> the remote component. Furthermore, it assumes that the component has actually 
> failed and, thus, it will only start trying to connect to the component if it 
> is notified about a new leader address and leader session id. This is 
> brittle, because the heartbeat could also time out without the component 
> having crashed. Thus, we should add an automatic retry to the latest known 
> leader address information in case of a timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5440: [FLINK-6160] [flip-6] Retry JobManager/ResourceManager co...

2018-03-05 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5440
  
@tillrohrmann Could you please take a look on this when available ? Thanks 


---


[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@kl0u @aljoscha   I added the scala example, and I believe the only build 
failure in Travis is irrelevant


---


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387304#comment-16387304
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@kl0u @aljoscha   I added the scala example, and I believe the only build 
failure in Travis is irrelevant


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not so good as you have to check in the processElement method for 
> every element if the key is already stored and set it if it's not already set.
> A possible solution would adding OnTimerContext#getCurrentKey() or a similar 
> method. Maybe having it in the open() method could maybe work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2018-03-05 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345955#comment-16345955
 ] 

Ted Yu edited comment on FLINK-7795 at 3/6/18 4:48 AM:
---

error-prone has JDK 8 dependency .


was (Author: yuzhih...@gmail.com):
error-prone has JDK 8 dependency.

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate into Flink build process.
> Here are the dependencies:
> {code}
> 
>   com.google.errorprone
>   error_prone_annotation
>   ${error-prone.version}
>   provided
> 
> 
>   
>   com.google.auto.service
>   auto-service
>   1.0-rc3
>   true
> 
> 
>   com.google.errorprone
>   error_prone_check_api
>   ${error-prone.version}
>   provided
>   
> 
>   com.google.code.findbugs
>   jsr305
> 
>   
> 
> 
>   com.google.errorprone
>   javac
>   9-dev-r4023-3
>   provided
> 
>   
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387274#comment-16387274
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408402
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin 
with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from 
the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't 
need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

I think either is fine as long as they are consistent.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387275#comment-16387275
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408177
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, 
ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for 
stream-stream non-window Join.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-03-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408177
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, 
ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for 
stream-stream non-window Join.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
+  "left", leftType, tupleTypeInfo)
+val 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-03-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408402
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin 
with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from 
the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't 
need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

I think either is fine as long as they are consistent.


---


[jira] [Assigned] (FLINK-8861) Add support for batch queries in SQL Client

2018-03-05 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-8861:
--

Assignee: Xingcan Cui

> Add support for batch queries in SQL Client
> ---
>
> Key: FLINK-8861
> URL: https://issues.apache.org/jira/browse/FLINK-8861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> Similar to streaming queries, it should be possible to execute batch queries 
> in the SQL Client and collect the results using {{DataSet.collect()}} for 
> debugging purposes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387014#comment-16387014
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
@hequn8128 @fhueske Thanks for the feedback. I have updated the diff to 
directly use DistinctAccumulator for filtering and modified the 
`generateAggregation` API. Please kindly take another look when you have time. 
I have resolved the issue of multiple layer dataview codegen. 

In terms of reusing same `DataView` for multiple distinct aggregations 
against the same field, I tried to incorporate but there are many assumptions 
with single mapping between `AggregateFunction`s and `Accumulator`s that's hard 
to deal with. I am planning to continue and improve on it in a separated JIRA, 
what do you think?


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-03-05 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
@hequn8128 @fhueske Thanks for the feedback. I have updated the diff to 
directly use DistinctAccumulator for filtering and modified the 
`generateAggregation` API. Please kindly take another look when you have time. 
I have resolved the issue of multiple layer dataview codegen. 

In terms of reusing same `DataView` for multiple distinct aggregations 
against the same field, I tried to incorporate but there are many assumptions 
with single mapping between `AggregateFunction`s and `Accumulator`s that's hard 
to deal with. I am planning to continue and improve on it in a separated JIRA, 
what do you think?


---


[jira] [Created] (FLINK-8874) rewrite Flink docs/dev/stream/operators/process_function.md to recommend using KeyedProcessFunction

2018-03-05 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8874:
---

 Summary: rewrite Flink 
docs/dev/stream/operators/process_function.md to recommend using 
KeyedProcessFunction
 Key: FLINK-8874
 URL: https://issues.apache.org/jira/browse/FLINK-8874
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Documentation
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.6.0


We need to completely rewrite Flink 
docs/dev/stream/operators/process_function.md to recommend using 
KeyedProcessFunction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386676#comment-16386676
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5634
  
I think the ideal would be that idleness would occur only for tail reads, 
i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`.In other 
words, an intermittent connection issue would ideally not trigger idleness.


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-05 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5634
  
I think the ideal would be that idleness would occur only for tail reads, 
i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`.In other 
words, an intermittent connection issue would ideally not trigger idleness.


---


[jira] [Created] (FLINK-8873) move unit tests of KeyedStream.scala from DataStreamTest.scala to KeyedStreamTest.scala

2018-03-05 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8873:
---

 Summary: move unit tests of KeyedStream.scala from 
DataStreamTest.scala to KeyedStreamTest.scala
 Key: FLINK-8873
 URL: https://issues.apache.org/jira/browse/FLINK-8873
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Tests
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.6.0


move unit tests of KeyedStream.scala from DataStreamTest.scala to 
KeyedStreamTest.scala, in order to have clearer separation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386643#comment-16386643
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303424
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
+
+   private final DataStream left;
+   private final DataStream right;
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+
+   private boolean lowerBoundInclusive;
+   private boolean upperBoundInclusive;
+
+   public TimeBounded(
+   DataStream left,
+   DataStream right,
+   long lowerBound,
+   long upperBound,
+   boolean lowerBoundInclusive,
+   boolean upperBoundInclusive,
+   KeySelector keySelector1,
+   KeySelector keySelector2) {
+
+   this.left = Preconditions.checkNotNull(left);
+   this.right = Preconditions.checkNotNull(right);
+
+   this.lowerBound = lowerBound;
+   this.upperBound = upperBound;
+
+   this.lowerBoundInclusive = lowerBoundInclusive;
+   this.upperBoundInclusive = upperBoundInclusive;
+
+   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
+   this.keySelector2 = 

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386642#comment-16386642
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172306197
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link TimeBoundedJoinFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * The basic idea of this implementation is as follows: Whenever we 
receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add 
it to the left buffer.
+ * We then check the right buffer to see whether there are any elements 
that can be joined. If
+ * there are, they are joined and passed to a user-defined {@link 
TimeBoundedJoinFunction}.
+ * The same happens the other way around when receiving an element on the 
right side.
+ *
+ * In some cases the watermark needs to be delayed. This for example 
can happen if
+ * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier 
than elements from t2 and
+ * therefore get added to the left buffer. When an element now arrives on 
the right side, the
+ * watermark might have already progressed. The right element now gets 
joined with an
+ * older element from the left side, where the timestamp of the left 
element is lower than the
+ * current watermark, which would make this element late. This can be 
avoided by holding back the
+ * watermarks.
+ *
+ * The left and right buffers are cleared from unused values 
periodically
+ * (triggered by watermarks) in order not 

[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386641#comment-16386641
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302583
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
--- End diff --

hmm... this might be not very relevant, but I'd prefer a single config 
class that holds all function's names, rather than having them scattered all 
over the code base. 


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386640#comment-16386640
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303671
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
--- End diff --

bound**s**


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386639#comment-16386639
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --

should use `IllegalStateException`. or even better, shall we create a Flink 
specific exception?


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172306197
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link TimeBoundedJoinFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * The basic idea of this implementation is as follows: Whenever we 
receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add 
it to the left buffer.
+ * We then check the right buffer to see whether there are any elements 
that can be joined. If
+ * there are, they are joined and passed to a user-defined {@link 
TimeBoundedJoinFunction}.
+ * The same happens the other way around when receiving an element on the 
right side.
+ *
+ * In some cases the watermark needs to be delayed. This for example 
can happen if
+ * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier 
than elements from t2 and
+ * therefore get added to the left buffer. When an element now arrives on 
the right side, the
+ * watermark might have already progressed. The right element now gets 
joined with an
+ * older element from the left side, where the timestamp of the left 
element is lower than the
+ * current watermark, which would make this element late. This can be 
avoided by holding back the
+ * watermarks.
+ *
+ * The left and right buffers are cleared from unused values 
periodically
+ * (triggered by watermarks) in order not to grow infinitely.
+ *
+ *
+ * @param  The type of the elements in the left stream
+ * @param  The type of the elements in the right stream
+ * @param  The output type created by the user-defined function
+ */

[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303424
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
+
+   private final DataStream left;
+   private final DataStream right;
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+
+   private boolean lowerBoundInclusive;
+   private boolean upperBoundInclusive;
+
+   public TimeBounded(
+   DataStream left,
+   DataStream right,
+   long lowerBound,
+   long upperBound,
+   boolean lowerBoundInclusive,
+   boolean upperBoundInclusive,
+   KeySelector keySelector1,
+   KeySelector keySelector2) {
+
+   this.left = Preconditions.checkNotNull(left);
+   this.right = Preconditions.checkNotNull(right);
+
+   this.lowerBound = lowerBound;
+   this.upperBound = upperBound;
+
+   this.lowerBoundInclusive = lowerBoundInclusive;
+   this.upperBoundInclusive = upperBoundInclusive;
+
+   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
+   this.keySelector2 = 
Preconditions.checkNotNull(keySelector2);
+   }
+
+   /**
+* Configure whether the upper bound should be considered 
exclusive or inclusive.
+*/
+   public TimeBounded 

[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --

should use `IllegalStateException`. or even better, shall we create a Flink 
specific exception?


---


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302583
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
--- End diff --

hmm... this might be not very relevant, but I'd prefer a single config 
class that holds all function's names, rather than having them scattered all 
over the code base. 


---


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303671
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
--- End diff --

bound**s**


---


[jira] [Updated] (FLINK-8829) Flink in EMR(YARN) is down due to Akka communication issue

2018-03-05 Thread Aleksandr Filichkin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Filichkin updated FLINK-8829:
---
Description: 
Hi,

We have running Flink 1.3.2 app in Amazon EMR with YARN. Every week our Flink 
job is down due to:

_2018-02-16 19:00:04,595 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]
 has failed, address is now gated for [5000] ms. Reason: [Association failed 
with 
[akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]]
 Caused by: [Connection refused: 
ip-10-97-34-209.tr-fr-nonprod.aws-int.com/10.97.34.209:42177] 2018-02-16 
19:00:05,593 WARN akka.remote.RemoteWatcher - Detected unreachable: 
[akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]
 2018-02-16 19:00:05,596 INFO 
org.apache.flink.runtime.client.JobSubmissionClientActor - Lost connection to 
JobManager 
akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177/user/jobmanager|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177/user/jobmanager].
 Triggering connection timeout._

Do you have any ideas how to troubleshoot it?

 

  was:
Hi,

We have running Flink 1.3.2 app in Amazon EMR with YARN. Every week our Flink 
job is down due to:

_2018-02-16 19:00:04,595 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177]]
 has failed, address is now gated for [5000] ms. Reason: [Association failed 
with 
[akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177]]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177]]]
 Caused by: [Connection refused: 
ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.209:42177] 
2018-02-16 19:00:05,593 WARN akka.remote.RemoteWatcher - Detected unreachable: 
[akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177]]
 2018-02-16 19:00:05,596 INFO 
org.apache.flink.runtime.client.JobSubmissionClientActor - Lost connection to 
JobManager 
akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177/user/jobmanager|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177/user/jobmanager].
 Triggering connection timeout._

Do you have any ideas how to troubleshoot it?

 


> Flink in EMR(YARN) is down due to Akka communication issue
> --
>
> Key: FLINK-8829
> URL: https://issues.apache.org/jira/browse/FLINK-8829
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.2
>Reporter: Aleksandr Filichkin
>Priority: Major
>
> Hi,
> We have running Flink 1.3.2 app in Amazon EMR with YARN. Every week our Flink 
> job is down due to:
> _2018-02-16 19:00:04,595 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system 
> [akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]
>  has failed, address is now gated for [5000] ms. Reason: [Association failed 
> with 
> [akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]]
>  Caused by: [Connection refused: 
> ip-10-97-34-209.tr-fr-nonprod.aws-int.com/10.97.34.209:42177] 2018-02-16 
> 19:00:05,593 WARN akka.remote.RemoteWatcher - Detected unreachable: 
> [akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177]]
>  2018-02-16 19:00:05,596 INFO 
> org.apache.flink.runtime.client.JobSubmissionClientActor - Lost connection to 
> JobManager 
> akka.tcp://[fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.com:42177/user/jobmanager|mailto:fl...@ip-10-97-34-209.tr-fr-nonprod.aws-int.thomsonreuters.com:42177/user/jobmanager].
>  Triggering connection timeout._
> Do you have any ideas how to troubleshoot it?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5642: [FLINK-8091] [flink-dist] Support running historys...

2018-03-05 Thread fnk
GitHub user fnk opened a pull request:

https://github.com/apache/flink/pull/5642

[FLINK-8091] [flink-dist] Support running historyserver in foreground

## Brief change log

*Allow historyserver to run in foreground*
  - The scripts "flink-console.sh" and "historyserver.sh" were adjusted to 
handle the "start-foreground" flag
  - The documentation for historyserver was adjusted accordingly


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

We use this change in our production setup with flink 1.3 but as far as i 
can see the related scripts did not change between releases.

"mvn clean verify" succeeded.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/noris-network/flink FLINK-8091

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5642


commit 1fdc24a86b59843020e7ccecb82e9f7daf9f34f7
Author: Andreas Fink 
Date:   2018-03-05T17:26:57Z

[FLINK-8091] Support running historyserver in foreground




---


[jira] [Commented] (FLINK-8091) Support running historyserver in foreground

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386414#comment-16386414
 ] 

ASF GitHub Bot commented on FLINK-8091:
---

GitHub user fnk opened a pull request:

https://github.com/apache/flink/pull/5642

[FLINK-8091] [flink-dist] Support running historyserver in foreground

## Brief change log

*Allow historyserver to run in foreground*
  - The scripts "flink-console.sh" and "historyserver.sh" were adjusted to 
handle the "start-foreground" flag
  - The documentation for historyserver was adjusted accordingly


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

We use this change in our production setup with flink 1.3 but as far as i 
can see the related scripts did not change between releases.

"mvn clean verify" succeeded.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/noris-network/flink FLINK-8091

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5642


commit 1fdc24a86b59843020e7ccecb82e9f7daf9f34f7
Author: Andreas Fink 
Date:   2018-03-05T17:26:57Z

[FLINK-8091] Support running historyserver in foreground




> Support running historyserver in foreground
> ---
>
> Key: FLINK-8091
> URL: https://issues.apache.org/jira/browse/FLINK-8091
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker, History Server
>Affects Versions: 1.3.2
>Reporter: Joshua Griffith
>Priority: Minor
>
> The historyserver runs as a daemon. To use it with Docker it would be 
> convenient if it supported running in the foreground.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386404#comment-16386404
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5636


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5636: [FLINK-8703][tests] Port CancelingTestBase to Mini...

2018-03-05 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5636


---


[jira] [Commented] (FLINK-8818) Harden YarnFileStageTest upload test for eventual consistent read-after-write

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386398#comment-16386398
 ] 

ASF GitHub Bot commented on FLINK-8818:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5601
  
The main issue was actually described in FLINK-8801:

According to 
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:

> Amazon S3 provides read-after-write consistency for PUTS of new objects 
in your S3 bucket in all regions with one caveat. The caveat is that if you 
make a HEAD or GET request to the key name (to find if the object exists) 
before creating the object, Amazon S3 provides eventual consistency for 
read-after-write.


Some S3 file system implementations may actually execute such a request for 
the about-to-write object and thus the read-after-write is only eventually 
consistent.


> Harden YarnFileStageTest upload test for eventual consistent read-after-write
> -
>
> Key: FLINK-8818
> URL: https://issues.apache.org/jira/browse/FLINK-8818
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystem, Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5601: [FLINK-8818][yarn/s3][tests] harden YarnFileStageTest upl...

2018-03-05 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5601
  
The main issue was actually described in FLINK-8801:

According to 
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:

> Amazon S3 provides read-after-write consistency for PUTS of new objects 
in your S3 bucket in all regions with one caveat. The caveat is that if you 
make a HEAD or GET request to the key name (to find if the object exists) 
before creating the object, Amazon S3 provides eventual consistency for 
read-after-write.


Some S3 file system implementations may actually execute such a request for 
the about-to-write object and thus the read-after-write is only eventually 
consistent.


---


[jira] [Closed] (FLINK-8757) Add MiniClusterResource.getClusterClient()

2018-03-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8757.
---
   Resolution: Duplicate
Fix Version/s: (was: 1.5.0)

> Add MiniClusterResource.getClusterClient()
> --
>
> Key: FLINK-8757
> URL: https://issues.apache.org/jira/browse/FLINK-8757
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
>
> Some of the tests that we need to port as part of FLINK-8700 need a way to 
> submit jobs asynchronously to the testing cluster. For this, we need to be 
> able to retrieve a {{ClusterClient}} and expose a method for asynchronous job 
> submission.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2018-03-05 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-8871:
-

 Summary: Checkpoint cancellation is not propagated to stop 
checkpointing threads on the task manager
 Key: FLINK-8871
 URL: https://issues.apache.org/jira/browse/FLINK-8871
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.1, 1.3.2, 1.5.0
Reporter: Stefan Richter
 Fix For: 1.6.0


Flink currently lacks any form of feedback mechanism from the job manager / 
checkpoint coordinator to the tasks when it comes to failing a checkpoint. This 
means that running snapshots on the tasks are also not stopped even if their 
owning checkpoint is already cancelled. Two examples for cases where this 
applies are checkpoint timeouts and local checkpoint failures on a task 
together with a configuration that does not fail tasks on checkpoint failure. 
Notice that those running snapshots do no longer account for the maximum number 
of parallel checkpoints, because their owning checkpoint is considered as 
cancelled.

Not stopping the task's snapshot thread can lead to a problematic situation 
where the next checkpoints already started, while the abandoned checkpoint 
thread from a previous checkpoint is still lingering around running. This 
scenario can potentially cascade: many parallel checkpoints will slow down 
checkpointing and make timeouts even more likely.

 

A possible solution is introducing a {{cancelCheckpoint}} method  as 
counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
which is invoked by the checkpoint coordinator as part of cancelling the 
checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8872) Yarn detached mode via -yd does not detach

2018-03-05 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8872:
--

 Summary: Yarn detached mode via -yd does not detach
 Key: FLINK-8872
 URL: https://issues.apache.org/jira/browse/FLINK-8872
 Project: Flink
  Issue Type: Bug
  Components: Client, YARN
Affects Versions: 1.5.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.5.0


Running yarn per-job cluster in detached mode currently does not work and waits 
for the job to finish.

Example:
{code}
./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c 
org.apache.flink.streaming.examples.wordcount.WordCount 
./examples/streaming/WordCount.jar --input
{code}

Output in case of an infinite program would then end with something like this:
{code}
2018-03-05 13:41:23,311 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for the 
cluster to be allocated
2018-03-05 13:41:23,313 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
cluster, current state ACCEPTED
2018-03-05 13:41:28,342 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN 
application has been deployed successfully.
2018-03-05 13:41:28,343 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink YARN 
client has been started in detached mode. In order to stop Flink on YARN, use 
the following command or a YARN web interface to stop it:
yarn application -kill application_1519984124671_0006
Please also note that the temporary files of the YARN session in the home 
directoy will not be removed.
Starting execution of program
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8849) Wrong link from concepts/runtime to doc on chaining

2018-03-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8849.
---
   Resolution: Fixed
 Assignee: Ken Krugler
Fix Version/s: 1.4.2
   1.5.0

master: 38785a0072b58b0238615a3bdf8f6da579f98154

1.5: e19b9fd9debcc56747b1ba364cbe1d5837274d86

1.4: 5aa481d5aa2a22755959fdc48e3449535f2b44e9

> Wrong link from concepts/runtime to doc on chaining
> ---
>
> Key: FLINK-8849
> URL: https://issues.apache.org/jira/browse/FLINK-8849
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Ken Krugler
>Assignee: Ken Krugler
>Priority: Minor
> Fix For: 1.5.0, 1.4.2
>
>
> On 
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html 
> there's a link to "chaining docs" that currently points at:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#task-chaining-and-resource-groups
> but it should link to:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#task-chaining-and-resource-groups



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386365#comment-16386365
 ] 

ASF GitHub Bot commented on FLINK-8601:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5641
  
For the sake of easy to discussion later..


> Introduce PartitionedBloomFilter for Approximate calculation and other 
> situations of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> h3. Backgroud
> Bloom filter is useful in many situation, for example:
>  * 1. Approximate calculation: deduplication (eg: UV calculation)
>  * 2. Performance optimization: eg, [runtime filter 
> join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>By using BF, we can greatly reduce the number of queries for state 
> data in a stream join, and these filtered queries will eventually fail to 
> find any results, which is a poor performance for rocksdb-based state due to 
> traversing ```sst``` on the disk. 
> However, based on the current status provided by flink, it is hard to use the 
> bloom filter for the following reasons:
>  * 1. Serialization problem: Bloom filter status can be large (for example: 
> 100M), if implement it based on the RocksDB state, the state data will need 
> to be serialized each time it is queried and updated, and the performance 
> will be very poor.
>  * 2. Data skewed: Data in different key group can be skewed, and the 
> information of data skewed can not be accurately predicted before the program 
> is running. Therefore, it is impossible to determine how much resources bloom 
> filter should allocate. One way to do this is to allocate space needed for 
> the most skewed case, but this can lead to very serious waste of resources.
> h3. Requirement
> Therefore, I introduce the PartitionedBloomFilter for flink, which at least 
> need to meet the following features:
>  * 1. Support for changing Parallelism
>  * 2. Only serialize when necessary: when performing checkpoint
>  * 3. Can deal with data skew problem: users only need to specify a 
> PartitionedBloomFilter with the desired input, fpp, system will allocate 
> resource dynamic.
>  * 4. Do not conflict with other state: user can use KeyedState and 
> OperateState when using this bloom filter.
>  * 5. Support relax ttl (ie: the data survival time at least greater than the 
> specified time)
> Design doc:  [design 
> doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5641: [FLINK-8601][WIP] Introduce PartitionedBloomFilter for Ap...

2018-03-05 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5641
  
For the sake of easy to discussion later..


---


[jira] [Closed] (FLINK-8857) HBase connector read example throws exception at the end.

2018-03-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8857.
---
   Resolution: Fixed
Fix Version/s: 1.4.3
   1.5.0

master: 72bba50aa3382dbfe904cdb36797bc716f76a129

1.5: 302781dfe5d9e0a6b9246732ac3227db2ced64f6

1.4: 87af955b3a0d9e173c1901c5998cf5876cff91ea

> HBase connector read example throws exception at the end.
> -
>
> Key: FLINK-8857
> URL: https://issues.apache.org/jira/browse/FLINK-8857
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0, 1.3.2, 1.5.0, 1.4.1
>Reporter: Xu Zhang
>Assignee: Xu Zhang
>Priority: Trivial
>  Labels: easy-fix, starter
> Fix For: 1.5.0, 1.4.3
>
>
> Running test case example of
> {code:java}
> flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java{code}
> Although the result has been printed out successfully, but at the end, driver 
> will throw the following exception.
> {code:java}
> 
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
> at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
> at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
> at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
> at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> at com.hulu.ap.flink.scoring.pipeline.ScoringJob.main(ScoringJob.java:82)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5641: [FLINK-8601][WIP] Introduce PartitionedBloomFilter...

2018-03-05 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5641

[FLINK-8601][WIP] Introduce PartitionedBloomFilter for Approximate 
calculation and other situations of performance optimization

This PR introduce PartitionedBloomFilter which support rescaling and can 
deal with data skew problem
 properly.

## Brief change log

- introduce PartitionedBloomFilter for Approximate calculation and other 
situations of performance optimization.

## Verifying this change

This change can be verified by the unit tests in below files:
- PartitionedBloomFilterTest.java
- LinkedBloomFilterTest.java
- LinkedBloomFilterNodeTest.java
- PartitionedBloomFilterManagerTest.java

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
 doc: [google 
doc](https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink bloomfilter_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5641


commit 5429abe0031a93596b12dada6e9696f3179eb4e8
Author: summerleafs 
Date:   2018-02-06T16:47:25Z

introduce bloom filter state.

commit 2d1f66c10fbf74272be76283b909b290ae55d4fd
Author: summerleafs 
Date:   2018-02-07T14:52:22Z

add unit tests for bloom filter state.

commit 433370a12814f7bd80127d4508e1dd0812a9d3fe
Author: summerleafs 
Date:   2018-02-07T18:12:13Z

add general type support.

commit 5e05ee84353516fe7ff6eb7dd3a01dfdb3337bc5
Author: summerleafs 
Date:   2018-02-09T15:10:11Z

this is a tmp commit.

commit 6e4ff0cebed853c598e0647e9f8aa56b5b59d0cc
Author: summerleafs 
Date:   2018-02-10T14:30:13Z

this is a tmp commit.

commit aa672e6e1e89b185722fde44a9b4044b87010c99
Author: summerleafs 
Date:   2018-02-10T15:32:01Z

this is a tmp commit.

commit 3b04502ba277cad2a7b0bc381fb192d18b56f17d
Author: summerleafs 
Date:   2018-02-11T11:34:54Z

fix build.

commit 775d6aaf354de35c7ddff242f8e006e13e9a0e76
Author: summerleafs 
Date:   2018-02-12T03:52:43Z

add annotation for classes.

commit b7f04303aa1ec1fbe9696bb58b13838b6a74a7ae
Author: summerleafs 
Date:   2018-02-12T03:53:19Z

a temp commit.

commit 28222bf5fc352a26082f2aee19be70ca5f9aa9d9
Author: sihuazhou 
Date:   2018-03-05T16:48:15Z

fix build.




---


[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386361#comment-16386361
 ] 

ASF GitHub Bot commented on FLINK-8601:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5641

[FLINK-8601][WIP] Introduce PartitionedBloomFilter for Approximate 
calculation and other situations of performance optimization

This PR introduce PartitionedBloomFilter which support rescaling and can 
deal with data skew problem
 properly.

## Brief change log

- introduce PartitionedBloomFilter for Approximate calculation and other 
situations of performance optimization.

## Verifying this change

This change can be verified by the unit tests in below files:
- PartitionedBloomFilterTest.java
- LinkedBloomFilterTest.java
- LinkedBloomFilterNodeTest.java
- PartitionedBloomFilterManagerTest.java

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
 doc: [google 
doc](https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink bloomfilter_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5641


commit 5429abe0031a93596b12dada6e9696f3179eb4e8
Author: summerleafs 
Date:   2018-02-06T16:47:25Z

introduce bloom filter state.

commit 2d1f66c10fbf74272be76283b909b290ae55d4fd
Author: summerleafs 
Date:   2018-02-07T14:52:22Z

add unit tests for bloom filter state.

commit 433370a12814f7bd80127d4508e1dd0812a9d3fe
Author: summerleafs 
Date:   2018-02-07T18:12:13Z

add general type support.

commit 5e05ee84353516fe7ff6eb7dd3a01dfdb3337bc5
Author: summerleafs 
Date:   2018-02-09T15:10:11Z

this is a tmp commit.

commit 6e4ff0cebed853c598e0647e9f8aa56b5b59d0cc
Author: summerleafs 
Date:   2018-02-10T14:30:13Z

this is a tmp commit.

commit aa672e6e1e89b185722fde44a9b4044b87010c99
Author: summerleafs 
Date:   2018-02-10T15:32:01Z

this is a tmp commit.

commit 3b04502ba277cad2a7b0bc381fb192d18b56f17d
Author: summerleafs 
Date:   2018-02-11T11:34:54Z

fix build.

commit 775d6aaf354de35c7ddff242f8e006e13e9a0e76
Author: summerleafs 
Date:   2018-02-12T03:52:43Z

add annotation for classes.

commit b7f04303aa1ec1fbe9696bb58b13838b6a74a7ae
Author: summerleafs 
Date:   2018-02-12T03:53:19Z

a temp commit.

commit 28222bf5fc352a26082f2aee19be70ca5f9aa9d9
Author: sihuazhou 
Date:   2018-03-05T16:48:15Z

fix build.




> Introduce PartitionedBloomFilter for Approximate calculation and other 
> situations of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> h3. Backgroud
> Bloom filter is useful in many situation, for example:
>  * 1. Approximate calculation: deduplication (eg: UV calculation)
>  * 2. Performance optimization: eg, [runtime filter 
> join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>By using BF, we can greatly reduce the number of queries for state 
> data in a stream join, and these filtered queries will eventually fail to 
> find any results, which is a poor performance for rocksdb-based state due to 
> traversing ```sst``` on the disk. 
> However, based on the current status provided by flink, it is hard to use the 
> bloom filter for the following reasons:
>  * 1. Serialization problem: Bloom filter status can be large (for example: 
> 100M), if implement it based on 

[jira] [Resolved] (FLINK-8337) GatherSumApplyITCase.testConnectedComponentsWithObjectReuseDisabled instable

2018-03-05 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-8337.

   Resolution: Fixed
 Assignee: Nico Kruber
Fix Version/s: (was: 1.4.3)

Should be fixed with FLINK-8517.

> GatherSumApplyITCase.testConnectedComponentsWithObjectReuseDisabled instable
> 
>
> Key: FLINK-8337
> URL: https://issues.apache.org/jira/browse/FLINK-8337
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The {{GatherSumApplyITCase.testConnectedComponentsWithObjectReuseDisabled}} 
> fails on Travis. It looks as if a sub partition has not been registered at 
> the task event dispatcher.
> https://travis-ci.org/apache/flink/jobs/323930301



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8849) Wrong link from concepts/runtime to doc on chaining

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386350#comment-16386350
 ] 

ASF GitHub Bot commented on FLINK-8849:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5630


> Wrong link from concepts/runtime to doc on chaining
> ---
>
> Key: FLINK-8849
> URL: https://issues.apache.org/jira/browse/FLINK-8849
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Ken Krugler
>Priority: Minor
>
> On 
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html 
> there's a link to "chaining docs" that currently points at:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#task-chaining-and-resource-groups
> but it should link to:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#task-chaining-and-resource-groups



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8857) HBase connector read example throws exception at the end.

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386351#comment-16386351
 ] 

ASF GitHub Bot commented on FLINK-8857:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5633


> HBase connector read example throws exception at the end.
> -
>
> Key: FLINK-8857
> URL: https://issues.apache.org/jira/browse/FLINK-8857
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0, 1.3.2, 1.5.0, 1.4.1
>Reporter: Xu Zhang
>Assignee: Xu Zhang
>Priority: Trivial
>  Labels: easy-fix, starter
>
> Running test case example of
> {code:java}
> flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java{code}
> Although the result has been printed out successfully, but at the end, driver 
> will throw the following exception.
> {code:java}
> 
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
> at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
> at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
> at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
> at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> at com.hulu.ap.flink.scoring.pipeline.ScoringJob.main(ScoringJob.java:82)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7226) REST responses contain invalid content-encoding header

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386349#comment-16386349
 ] 

ASF GitHub Bot commented on FLINK-7226:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5590


> REST responses contain invalid content-encoding header
> --
>
> Key: FLINK-7226
> URL: https://issues.apache.org/jira/browse/FLINK-7226
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.4, 1.3.1, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.4.0, 1.3.2
>
>
> FLINK-5705 made changes to the {{RuntimeMonitorHandler}} to set the 
> {{content-encoding}} header to {{UTF-8}}. This however isn't a valid value 
> for this header, and should instead be included in the {{content-type}} 
> header.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5590: [hotfix][REST] Fix CONTENT_TYPE header

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5590


---


[GitHub] flink pull request #5592: [hotfix] fix javadoc link of ClusterClient#trigger...

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5592


---


[GitHub] flink pull request #5607: [hotfix][docs] Drop the incorrect parallel remark ...

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5607


---


[GitHub] flink pull request #5633: [FLINK-8857] [Hbase] Avoid HBase connector read ex...

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5633


---


[GitHub] flink pull request #5630: [FLINK-8849][Documentation] Fix link to chaining d...

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5630


---


[GitHub] flink pull request #5627: [doc] Remove missed CheckpointedRestoring

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5627


---


[jira] [Commented] (FLINK-8818) Harden YarnFileStageTest upload test for eventual consistent read-after-write

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386341#comment-16386341
 ] 

ASF GitHub Bot commented on FLINK-8818:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5601
  
S3 is actually strongly consistent when reading newly created objects, just 
not in listing or renaming objects (files).

The test seems to actually use reads of full paths, so wondering why there 
is a failure in the first place.

If there is an issue that the Yarn upload code relies on eventually 
consistent operations, then fixing the test by retries may disguise the actual 
issue. If there is no eventually consistent operation, then this should not be 
necessary in the first place. I fear this change may be down a tricky path...

Can you explain/double check why the failure happened and why the retry is 
necessary to stabilize the test, but the actual Yarn code is not affected by 
this?


> Harden YarnFileStageTest upload test for eventual consistent read-after-write
> -
>
> Key: FLINK-8818
> URL: https://issues.apache.org/jira/browse/FLINK-8818
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystem, Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5601: [FLINK-8818][yarn/s3][tests] harden YarnFileStageTest upl...

2018-03-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5601
  
S3 is actually strongly consistent when reading newly created objects, just 
not in listing or renaming objects (files).

The test seems to actually use reads of full paths, so wondering why there 
is a failure in the first place.

If there is an issue that the Yarn upload code relies on eventually 
consistent operations, then fixing the test by retries may disguise the actual 
issue. If there is no eventually consistent operation, then this should not be 
necessary in the first place. I fear this change may be down a tricky path...

Can you explain/double check why the failure happened and why the retry is 
necessary to stabilize the test, but the actual Yarn code is not affected by 
this?


---


[jira] [Closed] (FLINK-8870) End-to-end tests wrongly pass if md5sum is not installed

2018-03-05 Thread Florian Schmidt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt closed FLINK-8870.
--
Resolution: Invalid

Behaves as expected, I just misinterpreted the output

> End-to-end tests wrongly pass if md5sum is not installed
> 
>
> Key: FLINK-8870
> URL: https://issues.apache.org/jira/browse/FLINK-8870
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.1
>Reporter: Florian Schmidt
>Priority: Major
> Fix For: 1.5.0
>
>
> Actual: The end-to-end tests don't fail if md5sum is not installed
> {code:bash}
> Job with JobID 95f6a482cc9800f1daeb907f4940a116 has finished.
> Job Runtime: 1067 ms
> /Users/florianschmidt/dev/flink/flink-end-to-end-tests/test-scripts/common.sh:
>  line 120: md5sum: command not found
> pass WordCount
> Stopping taskexecutor daemon (pid: 33007) on host Florians-MBP.fritz.box.
> Stopping standalonesession daemon (pid: 32715) on host Florians-MBP.fritz.box.
> All tests PASS
> {code}
> Expected: The tests should fail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8807) ZookeeperCompleted checkpoint store can get stuck in infinite loop

2018-03-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386260#comment-16386260
 ] 

Aljoscha Krettek commented on FLINK-8807:
-

Fixed on release-1.4 in
64adf4bcfa387047036534059cfe361977439cca

> ZookeeperCompleted checkpoint store can get stuck in infinite loop
> --
>
> Key: FLINK-8807
> URL: https://issues.apache.org/jira/browse/FLINK-8807
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This code: 
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L201
>  can be stuck forever if at least one checkpoint is not readable because 
> {{CompletedCheckpoint}} does not have a proper {{equals()}}/{{hashCode()}} 
> anymore.
> We have to fix this and also add a unit test that verifies the loop still 
> works if we make one snapshot unreadable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8870) End-to-end tests wrongly pass if md5sum is not installed

2018-03-05 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-8870:
--

 Summary: End-to-end tests wrongly pass if md5sum is not installed
 Key: FLINK-8870
 URL: https://issues.apache.org/jira/browse/FLINK-8870
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.1
Reporter: Florian Schmidt
 Fix For: 1.5.0


Actual: The end-to-end tests don't fail if md5sum is not installed

{code:bash}
Job with JobID 95f6a482cc9800f1daeb907f4940a116 has finished.
Job Runtime: 1067 ms
/Users/florianschmidt/dev/flink/flink-end-to-end-tests/test-scripts/common.sh: 
line 120: md5sum: command not found
pass WordCount
Stopping taskexecutor daemon (pid: 33007) on host Florians-MBP.fritz.box.
Stopping standalonesession daemon (pid: 32715) on host Florians-MBP.fritz.box.
All tests PASS
{code}

Expected: The tests should fail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8807) ZookeeperCompleted checkpoint store can get stuck in infinite loop

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386252#comment-16386252
 ] 

ASF GitHub Bot commented on FLINK-8807:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5623
  
Thanks for the comments! I'll incorporate them and merge.


> ZookeeperCompleted checkpoint store can get stuck in infinite loop
> --
>
> Key: FLINK-8807
> URL: https://issues.apache.org/jira/browse/FLINK-8807
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This code: 
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L201
>  can be stuck forever if at least one checkpoint is not readable because 
> {{CompletedCheckpoint}} does not have a proper {{equals()}}/{{hashCode()}} 
> anymore.
> We have to fix this and also add a unit test that verifies the loop still 
> works if we make one snapshot unreadable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8807) ZookeeperCompleted checkpoint store can get stuck in infinite loop

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386253#comment-16386253
 ] 

ASF GitHub Bot commented on FLINK-8807:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5623


> ZookeeperCompleted checkpoint store can get stuck in infinite loop
> --
>
> Key: FLINK-8807
> URL: https://issues.apache.org/jira/browse/FLINK-8807
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This code: 
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L201
>  can be stuck forever if at least one checkpoint is not readable because 
> {{CompletedCheckpoint}} does not have a proper {{equals()}}/{{hashCode()}} 
> anymore.
> We have to fix this and also add a unit test that verifies the loop still 
> works if we make one snapshot unreadable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5623: [FLINK-8807] Fix ZookeeperCompleted checkpoint store can ...

2018-03-05 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5623
  
Thanks for the comments! I'll incorporate them and merge.


---


[GitHub] flink pull request #5623: [FLINK-8807] Fix ZookeeperCompleted checkpoint sto...

2018-03-05 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5623


---


[jira] [Created] (FLINK-8869) Kafka restore from checkpoint without react to the new add partition to kafka server

2018-03-05 Thread aitozi (JIRA)
aitozi created FLINK-8869:
-

 Summary: Kafka restore from checkpoint without react to the new 
add partition to kafka server
 Key: FLINK-8869
 URL: https://issues.apache.org/jira/browse/FLINK-8869
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.3.2
Reporter: aitozi
Assignee: aitozi


When job restore from a savepoint and the kafka server has added serval 
partition , it doesnt consume data or produce data from/to the new partition



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8807) ZookeeperCompleted checkpoint store can get stuck in infinite loop

2018-03-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386197#comment-16386197
 ] 

Aljoscha Krettek commented on FLINK-8807:
-

Fixed on release-1.5 in
f8681a9d7732f6f36483e26ec68624809f6cd4b1

Fixed on master in
4226bf22aab5b4359998422fe53755db19785515

> ZookeeperCompleted checkpoint store can get stuck in infinite loop
> --
>
> Key: FLINK-8807
> URL: https://issues.apache.org/jira/browse/FLINK-8807
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This code: 
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L201
>  can be stuck forever if at least one checkpoint is not readable because 
> {{CompletedCheckpoint}} does not have a proper {{equals()}}/{{hashCode()}} 
> anymore.
> We have to fix this and also add a unit test that verifies the loop still 
> works if we make one snapshot unreadable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8868) Support Table Function as Table

2018-03-05 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8868:
-

 Summary: Support Table Function as Table
 Key: FLINK-8868
 URL: https://issues.apache.org/jira/browse/FLINK-8868
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


support SQL like:  SELECT * FROM TABLE(tf("a"))



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8839) Table source factory discovery is broken in SQL Client

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386152#comment-16386152
 ] 

ASF GitHub Bot commented on FLINK-8839:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5640

[FLINK-8839] [sql-client] Fix table source factory discovery

## What is the purpose of the change

This PR fixes the table source factory discovery by adding dependencies to 
the classloader. It also implements an `ExecutionContext` that can be reused 
during the same session.


## Brief change log

- New `ExecutionContext` abstraction
- Possibility to pass a classloader to the Java service provider


## Verifying this change

- See `DependencyTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8839

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5640.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5640


commit 8c7b1427f94082dc023073125b32eceda556d8cd
Author: Timo Walther 
Date:   2018-03-05T12:46:41Z

[FLINK-8839] [sql-client] Fix table source factory discovery




> Table source factory discovery is broken in SQL Client
> --
>
> Key: FLINK-8839
> URL: https://issues.apache.org/jira/browse/FLINK-8839
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Table source factories cannot not be discovered if they were added using a 
> jar file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5640: [FLINK-8839] [sql-client] Fix table source factory...

2018-03-05 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5640

[FLINK-8839] [sql-client] Fix table source factory discovery

## What is the purpose of the change

This PR fixes the table source factory discovery by adding dependencies to 
the classloader. It also implements an `ExecutionContext` that can be reused 
during the same session.


## Brief change log

- New `ExecutionContext` abstraction
- Possibility to pass a classloader to the Java service provider


## Verifying this change

- See `DependencyTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8839

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5640.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5640


commit 8c7b1427f94082dc023073125b32eceda556d8cd
Author: Timo Walther 
Date:   2018-03-05T12:46:41Z

[FLINK-8839] [sql-client] Fix table source factory discovery




---


[jira] [Created] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-03-05 Thread Shashank Agarwal (JIRA)
Shashank Agarwal created FLINK-8867:
---

 Summary: Rocksdb checkpointing failing with fs.default-scheme: 
hdfs:// config
 Key: FLINK-8867
 URL: https://issues.apache.org/jira/browse/FLINK-8867
 Project: Flink
  Issue Type: Bug
  Components: Configuration, State Backends, Checkpointing, YARN
Affects Versions: 1.4.1, 1.4.2
Reporter: Shashank Agarwal
 Fix For: 1.5.0, 1.4.3


In our setup, when we put an entry in our Flink_conf file for default schema.

{code}
fs.default-scheme: hdfs://mydomain.com:8020/flink
{code}

Than application with rocksdb state backend fails with the following exception. 
When we remove this config it works fine. It's working fine with other state 
backends.

{code}
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
for operator order ip stream (1/1).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator 
order ip stream (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalStateException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed 
keyed state future.
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
... 5 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalStateException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at 
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
... 7 more
Caused by: java.lang.IllegalStateException
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
... 5 more
[CIRCULAR REFERENCE:java.lang.IllegalStateException]
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-03-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8866:
---

 Summary: Create unified interfaces to configure and instatiate 
TableSinks
 Key: FLINK-8866
 URL: https://issues.apache.org/jira/browse/FLINK-8866
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Reporter: Timo Walther
Assignee: Timo Walther


Similar to the efforts done in FLINK-8240. We need unified ways to configure 
and instantiate TableSinks. Among other applications, this is necessary in 
order to declare table sinks in an environment file of the SQL client. Such 
that the sink can be used for {{INSERT INTO}} statements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8858:

Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-7594

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-03-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386138#comment-16386138
 ] 

Aljoscha Krettek commented on FLINK-7756:
-

[~shashank734] Yes, because that should also fail with a better error message.

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1, 1.4.2
>
> Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>
> When i try to use RocksDBStateBackend on my staging cluster (which is using 
> HDFS as file system) it crashes. But When i use FsStateBackend on staging 
> (which is using HDFS as file system) it is working fine.
> On local with local file system it's working fine in both cases.
> Please check attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at 

[jira] [Updated] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8858:

Summary: Add support for INSERT INTO in SQL Client  (was: SQL Client to 
submit long running query in file)

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8860) SlotManager spamming log files

2018-03-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8860:

Priority: Blocker  (was: Critical)

> SlotManager spamming log files
> --
>
> Key: FLINK-8860
> URL: https://issues.apache.org/jira/browse/FLINK-8860
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> {{SlotManager}} is spamming the log files a lot with
> {code}
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance b16c4e516995d1e672c0933bb380770c.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance de58fbf1c069620a4275c8b529deb20b.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 86ab5a7e1d57bb2883fc0d1f2aebb304.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 90f9ab2bd433db41b3ab567fd246fb3c.
> 2018-03-05 10:45:12,393 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance ec99fcc5a801272402af9afe08a1001d.
> 2018-03-05 10:45:12,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 4c1c4b5ce52195dc90196c10c26d9ef8.
> 2018-03-05 10:45:12,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 2541d0f1398fc307aaf86bf7750535f1.
> 2018-03-05 10:45:12,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 91d94a9fdfce3cbcef32ac1c6e7b3fbf.
> 2018-03-05 10:45:22,392 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 91d94a9fdfce3cbcef32ac1c6e7b3fbf.
> 2018-03-05 10:45:22,394 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
> slot report from instance 90f9ab2bd433db41b3ab567fd246fb3c.
> {code}
> This message is printed once per {{TaskManager}} heartbeat.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8865) Add CLI query code completion in SQL Client

2018-03-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8865:
---

 Summary: Add CLI query code completion in SQL Client
 Key: FLINK-8865
 URL: https://issues.apache.org/jira/browse/FLINK-8865
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Timo Walther


This issue is a subtask of part two "Full Embedded SQL Client" of the 
implementation plan mentioned in 
[FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].

Calcite already offers a code completion functionality. It would be great if we 
could expose this feature also through the SQL CLI Client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8864) Add CLI query history in SQL Client

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8864:

Summary: Add CLI query history in SQL Client  (was: Add CLI query history)

> Add CLI query history in SQL Client
> ---
>
> Key: FLINK-8864
> URL: https://issues.apache.org/jira/browse/FLINK-8864
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It would be great to have the possibility of persisting the CLI's query 
> history. Such that queries can be reused when the CLI Client is started 
> again. Also a search feature as it is offered by terminals would be good.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8862) Support HBase snapshot read

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386135#comment-16386135
 ] 

ASF GitHub Bot commented on FLINK-8862:
---

GitHub user neoremind opened a pull request:

https://github.com/apache/flink/pull/5639

[FLINK-8862] [HBase] Support HBase snapshot read

## What is the purpose of the change

*Flink-hbase connector only supports reading/scanning HBase over region 
server scanner, there is also 
[snapshot](http://hbase.apache.org/book.html#ops.snapshots) scanning solution, 
just like Hadoop provides 2 ways to scan HBase, one is 
[TableInputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html),
 the other is 
[TableSnapshotInputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.html),
 so it would be great if flink supports both solutions to ensure more wider 
usage scope and provide alternatives for users.*


## Brief change log

  - *Create `TableInputSplitStrategy` interface and its implementations as 
abstraction logic for `AbstractTableInputFormat`*
  - *Update `HBaseRowInputFormat` and `TableInputFormat`*
  - *Add `HBaseSnapshotRowInputFormat` and `TableSnapshotInputFormat`*
  - *Extract 2 interfaces including `HBaseTableScannerAware` and 
`ResultToTupleMapper`*
  - *Add `HBaseSnapshotReadExample`*


## Verifying this change

This change is already covered by existing tests as follows, and new test 
cases has been added as well.

`org.apache.flink.addons.hbase.HBaseConnectorITCase`

This change added tests and can be verified as follows:

  - *Manually create one snapshot for a specific HBase table, and use 
TableSnapshotInputFormat to do full scan.*
  - *Running existing HBaseReadExample to do full scan.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)
  - For document, please visit [JIRA 
ticket](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-8862?filter=allopenissues),
 a detailed design doc and class diagram have been attached.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/neoremind/flink snapshot

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5639


commit 0b36b434f987a971b6463ce3441c483380cfa9dd
Author: neoremind 
Date:   2018-03-05T14:14:09Z

Support HBase snapshot read




> Support HBase snapshot read
> ---
>
> Key: FLINK-8862
> URL: https://issues.apache.org/jira/browse/FLINK-8862
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Xu Zhang
>Priority: Major
> Attachments: FLINK-8862-Design-Class-Diagram.png, 
> FLINK-8862-DesignDoc.pdf
>
>
> Flink-hbase connector only supports reading/scanning HBase over region server 
> scanner, there is also snapshot scanning solution, just like Hadoop provides 
> 2 ways to scan HBase, one is TableInputFormat, the other is 
> TableSnapshotInputFormat, so it would be great if flink supports both 
> solutions to ensure more wider usage scope and provide alternatives for users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5639: [FLINK-8862] [HBase] Support HBase snapshot read

2018-03-05 Thread neoremind
GitHub user neoremind opened a pull request:

https://github.com/apache/flink/pull/5639

[FLINK-8862] [HBase] Support HBase snapshot read

## What is the purpose of the change

*Flink-hbase connector only supports reading/scanning HBase over region 
server scanner, there is also 
[snapshot](http://hbase.apache.org/book.html#ops.snapshots) scanning solution, 
just like Hadoop provides 2 ways to scan HBase, one is 
[TableInputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html),
 the other is 
[TableSnapshotInputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.html),
 so it would be great if flink supports both solutions to ensure more wider 
usage scope and provide alternatives for users.*


## Brief change log

  - *Create `TableInputSplitStrategy` interface and its implementations as 
abstraction logic for `AbstractTableInputFormat`*
  - *Update `HBaseRowInputFormat` and `TableInputFormat`*
  - *Add `HBaseSnapshotRowInputFormat` and `TableSnapshotInputFormat`*
  - *Extract 2 interfaces including `HBaseTableScannerAware` and 
`ResultToTupleMapper`*
  - *Add `HBaseSnapshotReadExample`*


## Verifying this change

This change is already covered by existing tests as follows, and new test 
cases has been added as well.

`org.apache.flink.addons.hbase.HBaseConnectorITCase`

This change added tests and can be verified as follows:

  - *Manually create one snapshot for a specific HBase table, and use 
TableSnapshotInputFormat to do full scan.*
  - *Running existing HBaseReadExample to do full scan.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)
  - For document, please visit [JIRA 
ticket](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-8862?filter=allopenissues),
 a detailed design doc and class diagram have been attached.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/neoremind/flink snapshot

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5639


commit 0b36b434f987a971b6463ce3441c483380cfa9dd
Author: neoremind 
Date:   2018-03-05T14:14:09Z

Support HBase snapshot read




---


[jira] [Created] (FLINK-8864) Add CLI query history

2018-03-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8864:
---

 Summary: Add CLI query history
 Key: FLINK-8864
 URL: https://issues.apache.org/jira/browse/FLINK-8864
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Timo Walther


This issue is a subtask of part two "Full Embedded SQL Client" of the 
implementation plan mentioned in 
[FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].

It would be great to have the possibility of persisting the CLI's query 
history. Such that queries can be reused when the CLI Client is started again. 
Also a search feature as it is offered by terminals would be good.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8863:
---

 Summary: Add user-defined function support in SQL Client
 Key: FLINK-8863
 URL: https://issues.apache.org/jira/browse/FLINK-8863
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Timo Walther


This issue is a subtask of part two "Full Embedded SQL Client" of the 
implementation plan mentioned in 
[FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].

It should be possible to declare user-defined functions in the SQL client. For 
now, we limit the registration to classes that implement {{ScalarFunction}}, 
{{TableFunction}}, {{AggregateFunction}}. Functions that are implemented in SQL 
are not part of this issue.

I would suggest to introduce a {{functions}} top-level property. The 
declaration could look similar to:

{code}
functions:
  - name: testFunction
from: class   <-- optional, default: class
class: org.my.MyScalarFunction
constructor:  <-- optional, needed for 
certain types of functions
  - 42.0
  - class: org.my.Class  <-- possibility to create objects 
via properties
constructor: 
  - 1
  - true
  - false
  - "whatever"
  - type: INT
value: 1
{code}





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8862) Support HBase snapshot read

2018-03-05 Thread Xu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Zhang updated FLINK-8862:

Attachment: FLINK-8862-Design-Class-Diagram.png

> Support HBase snapshot read
> ---
>
> Key: FLINK-8862
> URL: https://issues.apache.org/jira/browse/FLINK-8862
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Xu Zhang
>Priority: Major
> Attachments: FLINK-8862-Design-Class-Diagram.png, 
> FLINK-8862-DesignDoc.pdf
>
>
> Flink-hbase connector only supports reading/scanning HBase over region server 
> scanner, there is also snapshot scanning solution, just like Hadoop provides 
> 2 ways to scan HBase, one is TableInputFormat, the other is 
> TableSnapshotInputFormat, so it would be great if flink supports both 
> solutions to ensure more wider usage scope and provide alternatives for users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386129#comment-16386129
 ] 

ASF GitHub Bot commented on FLINK-8517:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5621


> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---
>
> Key: FLINK-8517
> URL: https://issues.apache.org/jira/browse/FLINK-8517
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, TaskManager, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>   at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> https://api.travis-ci.org/v3/job/60156/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5621: [FLINK-8517] fix missing synchronization in TaskEv...

2018-03-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5621


---


[jira] [Updated] (FLINK-8862) Support HBase snapshot read

2018-03-05 Thread Xu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Zhang updated FLINK-8862:

Attachment: FLINK-8862-DesignDoc.pdf

> Support HBase snapshot read
> ---
>
> Key: FLINK-8862
> URL: https://issues.apache.org/jira/browse/FLINK-8862
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Xu Zhang
>Priority: Major
> Attachments: FLINK-8862-DesignDoc.pdf
>
>
> Flink-hbase connector only supports reading/scanning HBase over region server 
> scanner, there is also snapshot scanning solution, just like Hadoop provides 
> 2 ways to scan HBase, one is TableInputFormat, the other is 
> TableSnapshotInputFormat, so it would be great if flink supports both 
> solutions to ensure more wider usage scope and provide alternatives for users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8862) Support HBase snapshot read

2018-03-05 Thread Xu Zhang (JIRA)
Xu Zhang created FLINK-8862:
---

 Summary: Support HBase snapshot read
 Key: FLINK-8862
 URL: https://issues.apache.org/jira/browse/FLINK-8862
 Project: Flink
  Issue Type: Improvement
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.2.0
Reporter: Xu Zhang


Flink-hbase connector only supports reading/scanning HBase over region server 
scanner, there is also snapshot scanning solution, just like Hadoop provides 2 
ways to scan HBase, one is TableInputFormat, the other is 
TableSnapshotInputFormat, so it would be great if flink supports both solutions 
to ensure more wider usage scope and provide alternatives for users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8861) Add support for batch queries in SQL Client

2018-03-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8861:
---

 Summary: Add support for batch queries in SQL Client
 Key: FLINK-8861
 URL: https://issues.apache.org/jira/browse/FLINK-8861
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Timo Walther


This issue is a subtask of part two "Full Embedded SQL Client" of the 
implementation plan mentioned in 
[FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].

Similar to streaming queries, it should be possible to execute batch queries in 
the SQL Client and collect the results using {{DataSet.collect()}} for 
debugging purposes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8853) SQL Client cannot emit query results that contain a rowtime attribute

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8853:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7594

> SQL Client cannot emit query results that contain a rowtime attribute
> -
>
> Key: FLINK-8853
> URL: https://issues.apache.org/jira/browse/FLINK-8853
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Emitting a query result that contains a rowtime attribute fails with the 
> following exception:
> {code:java}
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>     at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27)
>     at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
>     at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
>     at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
>     at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at 
> org.apache.flink.streaming.experimental.CollectSink.invoke(CollectSink.java:66)
>     ... 44 more{code}
> The problem is cause by the {{ResultStore}} which configures the 
> {{CollectionSink}} with the field types obtained from the {{TableSchema}}. 
> The type of the rowtime field is a {{TimeIndicatorType}} which is serialized 
> as Long. However, in the query result it is represented as Timestamp. Hence, 
> the type must be replaced by a {{SqlTimeTypeInfo}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8852) SQL Client does not work with new FLIP-6 mode

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8852:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7594

> SQL Client does not work with new FLIP-6 mode
> -
>
> Key: FLINK-8852
> URL: https://issues.apache.org/jira/browse/FLINK-8852
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The SQL client does not submit queries to local Flink cluster that runs in 
> FLIP-6 mode. It doesn't throw an exception either.
> Job submission works if the legacy Flink cluster mode is used (`mode: old`)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8850) SQL Client does not support Event-time

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8850:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7594

> SQL Client does not support Event-time
> --
>
> Key: FLINK-8850
> URL: https://issues.apache.org/jira/browse/FLINK-8850
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The SQL client fails with an exception if a table includes a rowtime 
> attribute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8686) Improve basic embedded SQL client

2018-03-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8686:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-7594

> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This issue describes follow-up issues that should be fixes in order to make 
> the SQL client more stable:
>  - Add more tests for executor
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - "The input is invalid please check it again." => add allowed range
>  - Load dependencies recursively
>  - Cache table & environments in executor
>  - Clean up results in result store
>  - Improve error message for unsupported batch queries
>  - Add more logging instead swallowing exceptions
>  - List properties in error message about missing TS factory sorted by name
>  - Add command to show loaded TS factories and their required propeties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}}
>  - Add switch to show full stacktraces of exceptions
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386109#comment-16386109
 ] 

ASF GitHub Bot commented on FLINK-8274:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5613
  
@Xpray and @hequn8128 are you fine with merging this PR for now and then 
open follow-up issues for more splitting (unboxing, expression, class)?


> Fix Java 64K method compiling limitation for CommonCalc
> ---
>
> Key: FLINK-8274
> URL: https://issues.apache.org/jira/browse/FLINK-8274
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
>
> For complex SQL Queries, the generated code for {code}DataStreamCalc{code}, 
> {code}DataSetCalc{code} may exceed Java's method length limitation 64kb.
>  
> This issue will split long method to several sub method calls.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5613: [FLINK-8274] [table] Split generated methods for preventi...

2018-03-05 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5613
  
@Xpray and @hequn8128 are you fine with merging this PR for now and then 
open follow-up issues for more splitting (unboxing, expression, class)?


---


[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386093#comment-16386093
 ] 

ASF GitHub Bot commented on FLINK-6924:
---

GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5638

[FLINK-6924][table]ADD LOG(X) supported in TableAPI

## What is the purpose of the change
  * Add LOG(X) function in TableAPI.
## Brief change log
 * Add LOG(X) function in TableAPI.
 * Modify LOG(X) unit tests from "testSqlApi" to "testAllApis".
## Verifying this change
 * This can be tested by unit testing.
## Does this pull request potentially affect one of the following parts:
 * No
## Documentation
 * 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5638.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5638


commit 376cee8a6bc25afdf8df50b65c3fa8f35a5e4b7c
Author: Liao Jiayi 
Date:   2018-03-04T09:15:10Z

add log table function

commit 9ab7a07c0b614cb7af0fee7e69f6d58bf5004b28
Author: Liao Jiayi 
Date:   2018-03-04T09:15:23Z

Merge branch 'master' of github.com:apache/flink into log




> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: zjuwangg
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-05 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5638

[FLINK-6924][table]ADD LOG(X) supported in TableAPI

## What is the purpose of the change
  * Add LOG(X) function in TableAPI.
## Brief change log
 * Add LOG(X) function in TableAPI.
 * Modify LOG(X) unit tests from "testSqlApi" to "testAllApis".
## Verifying this change
 * This can be tested by unit testing.
## Does this pull request potentially affect one of the following parts:
 * No
## Documentation
 * 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5638.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5638


commit 376cee8a6bc25afdf8df50b65c3fa8f35a5e4b7c
Author: Liao Jiayi 
Date:   2018-03-04T09:15:10Z

add log table function

commit 9ab7a07c0b614cb7af0fee7e69f6d58bf5004b28
Author: Liao Jiayi 
Date:   2018-03-04T09:15:23Z

Merge branch 'master' of github.com:apache/flink into log




---


  1   2   >