[jira] [Updated] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-21 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6653:
---
Description: 
Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
{{Shard}} instances in checkpoints. This makes bumping AWS library versions 
hard, since any change to the {{Shard}} class by AWS will break checkpoint 
compatibility.

We should either have custom serialization for {{KinesisStreamShard}}, or 
extract and store the information contained in {{Shard}} ourselves. Ideally, it 
would be best to make {{KinesisStreamShard}} and {{SequenceNumber}} 
non-serializable, hence avoiding Java serialization in the checkpoints.
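
For illustration only, a minimal sketch of the second option, assuming the 
usual AWS {{Shard}} getters (a sketch, not a committed design):

{code}
import java.io.DataOutputStream
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard

object ShardCheckpointSketch {
  // Checkpoint only the extracted fields of the AWS Shard as plain strings,
  // instead of Java-serializing the Shard object itself, so that bumping the
  // AWS library version cannot break the checkpoint format.
  def writeShardTo(shard: KinesisStreamShard, out: DataOutputStream): Unit = {
    out.writeUTF(shard.getStreamName)
    out.writeUTF(shard.getShard.getShardId)
    out.writeUTF(shard.getShard.getHashKeyRange.getStartingHashKey)
    out.writeUTF(shard.getShard.getHashKeyRange.getEndingHashKey)
  }
}
{code}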

  was:
Currently, the Kinesis consumer's checkpoints directly serialize AWS's `Shard` 
instances in checkpoints. This makes bumping AWS library versions hard, since 
any change to the `Shard` class by AWS will break checkpoint compatibility.

We should either have custom serialization for `KinesisStreamShard`, or 
extract and store the information contained in `Shard` ourselves. Ideally, it 
would be best to make `KinesisStreamShard` and `SequenceNumber` 
non-serializable, hence avoiding Java serialization in the checkpoints.


> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> extract and store the information contained in {{Shard}} ourselves. Ideally, 
> it would be best to make {{KinesisStreamShard}} and {{SequenceNumber}} 
> non-serializable, hence avoiding Java serialization in the checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-21 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6653:
--

 Summary: Avoid directly serializing AWS's Shard class in Kinesis 
consumer's checkpoints
 Key: FLINK-6653
 URL: https://issues.apache.org/jira/browse/FLINK-6653
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai


Currently, the Kinesis consumer's checkpoints directly serialize AWS's `Shard` 
instances in checkpoints. This makes bumping AWS library versions hard, since 
any change to the `Shard` class by AWS will break checkpoint compatibility.

We should either have custom serialization for `KinesisStreamShard`, or 
extract and store the information contained in `Shard` ourselves. Ideally, it 
would be best to make `KinesisStreamShard` and `SequenceNumber` 
non-serializable, hence avoiding Java serialization in the checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-21 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019140#comment-16019140
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5898:


Let me think a bit about how to proceed with this... Will keep you updated.

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages, in order to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL not do that... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.&lt;init&gt;(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}
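
One conceivable mitigation (a sketch only, not a committed fix; it assumes the 
KPL's {{KinesisProducerConfiguration.setTempDirectory}} setting and an 
illustrative per-subtask path scheme) is to give each producer instance its own 
extraction directory, so concurrent extractions cannot collide:

{code}
import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration}

object KplTempDirSketch {
  // Derive a distinct temp directory per subtask, so two subtasks opening at
  // the same time never extract the native binary into the same path.
  def createProducer(subtaskIndex: Int): KinesisProducer = {
    val config = new KinesisProducerConfiguration()
    config.setTempDirectory(s"/tmp/amazon-kinesis-producer-$subtaskIndex")
    new KinesisProducer(config)
  }
}
{code}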



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-05-21 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019139#comment-16019139
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5898:


[~skidder] thanks a lot for testing that. The broken checkpoint makes sense.

In general I think we should try to avoid directly serializing these 
third-party classes in our checkpoints, to ease compatibility paths for 
savepoints.
I propose to wait for Flink 1.4 with State Migration to allow a smoother path 
for this, instead of yet another hardcoded migration path in the codebase 
(those have been piling up and IMO are really polluting the main code a 
bit).

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages, in order to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL not do that... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.&lt;init&gt;(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6482) Add nested serializers into configuration snapshots of composite serializers

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019136#comment-16019136
 ] 

ASF GitHub Bot commented on FLINK-6482:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3937
  
@rmetzger it is currently under review by @StefanRRichter. The conflicts 
are pretty trivial and can be resolved very quickly; I'm just leaving it as is 
because the review already started last week.
I'm hoping to merge it by the end of Monday (today).


> Add nested serializers into configuration snapshots of composite serializers
> 
>
> Key: FLINK-6482
> URL: https://issues.apache.org/jira/browse/FLINK-6482
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the composite serializers' configuration snapshots only wrap the 
> config snapshots of nested serializers.
> We should also consider adding serialization of the nested serializers into 
> the config snapshot, so that in the case where only some nested serializer 
> cannot be loaded (class missing / implementation changed), we can also 
> provide a path for serializer upgrades.
> This applies for all composite serializers that have nested serializers.
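
As a rough illustration of the proposal (schematic only; types simplified, not 
Flink's actual snapshot format), the config snapshot would carry the nested 
serializers themselves next to their config snapshots:

{code}
import java.io.ObjectOutputStream

object CompositeSnapshotSketch {
  // Today only the nested config snapshots are wrapped; the proposal is to
  // also write the nested serializers, so a restore can fall back to the
  // written serializer when its class is missing or its implementation changed.
  def writeNested(out: ObjectOutputStream, nested: Seq[(AnyRef, AnyRef)]): Unit = {
    out.writeInt(nested.size)
    nested.foreach { case (configSnapshot, serializer) =>
      out.writeObject(configSnapshot) // what is wrapped today
      out.writeObject(serializer)     // the proposed addition
    }
  }
}
{code}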



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

2017-05-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3937
  
@rmetzger it is currently under review by @StefanRRichter. The conflicts 
are pretty trivial and can be resolved very quickly; I'm just leaving it as is 
because the review already started last week.
I'm hoping to merge it by the end of Monday (today).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6495) Migrate Akka configuration options

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019102#comment-16019102
 ] 

ASF GitHub Bot commented on FLINK-6495:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3935
  
@zentol Thank you for your suggestions, and I have fixed the problems you 
mentioned :)


> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3935: [FLINK-6495] Migrate Akka configuration options

2017-05-21 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3935
  
@zentol Thank you for your suggestions, and I have fixed the problems you 
mentioned :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019077#comment-16019077
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117651897
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction that supports joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param filterFunc    the function for the other non-equi conditions,
+  *                      including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
--- End diff --

I understand and it makes sense to me. As it reduces the overhead, I think it 
is ok. @fhueske - do you have another opinion?
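
For context, the pattern under discussion looks roughly like this 
(reconstructed from the snippet above; not the final code):

{code}
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.table.runtime.types.CRow

object TimerStateSketch {
  // Only register a cleanup timer when none is pending (a state value of
  // 0 / null means "no timer"), which avoids one timer registration per
  // input record and hence reduces the overhead.
  def maybeRegisterTimer(
      timerState: ValueState[java.lang.Long],
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      cleanupTime: Long): Unit = {
    val pending = timerState.value()
    if (pending == null || pending == 0L) {
      ctx.timerService().registerProcessingTimeTimer(cleanupTime)
      timerState.update(cleanupTime)
    }
  }
}
{code}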


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.proctime > s.proctime}}, and it must include both 
> streams' proctime attributes; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117651897
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction that supports joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param filterFunc    the function for the other non-equi conditions,
+  *                      including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
--- End diff --

I understand and it makes sense to me. As it reduces the overhead, I think it 
is ok. @fhueske - do you have another opinion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6649) Improve Non-window group aggregate with configurable `earlyFire`.

2017-05-21 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019045#comment-16019045
 ] 

sunjincheng commented on FLINK-6649:


Yes, [~fhueske], I like calling it `Update Rate`; treating the `Update rate` as 
a time interval is also an option in my mind. Regarding the design, we may also 
need to think about global configuration and window configuration, so I'll 
propose a Google doc here later, and I hope you can review the design. :)

Thanks,
SunJincheng

> Improve Non-window group aggregate with configurable `earlyFire`.
> -
>
> Key: FLINK-6649
> URL: https://issues.apache.org/jira/browse/FLINK-6649
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, the non-windowed group aggregate is early-firing at count(1), that 
> is, every row emits an aggregate result. But sometimes users want to configure 
> the count (`early firing with count[N]`) to reduce the downstream pressure. 
> This JIRA will enable the configuration of `earlyFiring` for the non-windowed 
> group aggregate.
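
To make the intent concrete, a sketch of count-based early firing (purely 
hypothetical helper, not a proposed API; the actual design is still to come):

{code}
// Hypothetical sketch: emit an updated aggregate only every N rows instead of
// on every row (the current count(1) behaviour), reducing downstream pressure.
class EarlyFireCounter(n: Long) {
  private var seen = 0L
  def shouldFire(): Boolean = {
    seen += 1
    seen % n == 0
  }
}

// Usage idea inside an aggregate operator: only collect the current
// accumulator result downstream when counter.shouldFire() returns true.
{code}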



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.

2017-05-21 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6650:
---
Description: 
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
// StringSink processed data:
1
3
6
10
15
21
28
36
56
// Last output data:
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
// RetractingSink processed data:
(true,1)
(false,1)
(true,3)
(false,3)
(true,6)
(false,6)
(true,10)
(false,10)
(true,15)
(false,15)
(true,21)
(false,21)
(true,28)
(false,28)
(true,36)
(false,36)
(true,56)
// Last output data:
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}

But after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], 
OVER will not be able to express #Case 1 with early firing.

So I still think the non-windowed group-aggregate should not always be an 
update table; the user can decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.

  was:
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
StringSink processed data:
1
3
6
10
15
21
28
36
56
last output data:
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
RetractingSink processed data:
(true,1)
(false,1)
(true,3)
(false,3)
(true,6)
(false,6)
(true,10)
(false,10)
(true,15)
(false,15)
(true,21)
(false,21)
(true,28)
(false,28)
(true,36)
(false,36)
(true,56)
// last output data:
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}

[jira] [Updated] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.

2017-05-21 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6650:
---
Description: 
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
StringSink processed data:
1
3
6
10
15
21
28
36
56
last output data:
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
RetractingSink processed data:
(true,1)
(false,1)
(true,3)
(false,3)
(true,6)
(false,6)
(true,10)
(false,10)
(true,15)
(false,15)
(true,21)
(false,21)
(true,28)
(false,28)
(true,36)
(false,36)
(true,56)
// last output data:
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}

But after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], 
OVER will not be able to express #Case 1 with early firing.

So I still think the non-windowed group-aggregate should not always be an 
update table; the user can decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.

  was:
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
StringSink processed data:
1
3
6
10
15
21
28
36
56
last output data:
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:

{code}
RetractingSink processed data:
(true,1)
(false,1)
(true,3)
(false,3)
(true,6)
(false,6)
(true,10)
(false,10)
(true,15)
(false,15)
(true,21)
(false,21)
(true,28)
(false,28)
(true,36)
(false,36)
(true,56)
// last output data:
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}

[jira] [Updated] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.

2017-05-21 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6650:
---
Description: 
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
StringSink processed data:
1
3
6
10
15
21
28
36
56
last output data:
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:

{code}
RetractingSink processed data:
(true,1)
(false,1)
(true,3)
(false,3)
(true,6)
(false,6)
(true,10)
(false,10)
(true,15)
(false,15)
(true,21)
(false,21)
(true,28)
(false,28)
(true,36)
(false,36)
(true,56)
// last output data:
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}

But after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], 
OVER will not be able to express #Case 1 with early firing.

So I still think the non-windowed group-aggregate should not always be an 
update table; the user can decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.

  was:
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}

But after the [FLINK-6649 | 

[jira] [Updated] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.

2017-05-21 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6650:
---
Description: 
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
 .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}

But after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], 
OVER will not be able to express #Case 1 with early firing.

So I still think the non-windowed group-aggregate should not always be an 
update table; the user can decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.

  was:
When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is {{DataStreamGroupAggregate#producesUpdates}} as follows:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case1:
TableAPI
{code}
 stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result
{code}
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
In fact, for #Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}

But after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], 
OVER will not be able to express #Case 1 with early firing.

So I still think the non-windowed group-aggregate should not always be an 
update table; the user can decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.


> Fix Non-windowed 

[GitHub] flink issue #3947: [FLINK-6610][web] Allow uploadDir to be null in WebFronte...

2017-05-21 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3947
  
+1 to merge (tested the change locally)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6610) WebServer could not be created,when set the "jobmanager.web.submit.enable" to false

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018946#comment-16018946
 ] 

ASF GitHub Bot commented on FLINK-6610:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3947
  
+1 to merge (tested the change locally)


> WebServer could not be created,when set the "jobmanager.web.submit.enable" to 
> false
> ---
>
> Key: FLINK-6610
> URL: https://issues.apache.org/jira/browse/FLINK-6610
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.3.0
>Reporter: zhihao chen
>Assignee: Chesnay Schepler
>
> The WebServer could not be created when "jobmanager.web.submit.enable" is set 
> to false, because WebFrontendBootstrap checks that uploadDir is not allowed to 
> be null: this.uploadDir = Preconditions.checkNotNull(directory);
> {code}
> 2017-05-17 15:15:46,938 ERROR 
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - WebServer 
> could not be created
> java.lang.NullPointerException
>   at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>   at 
> org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap.&lt;init&gt;(WebFrontendBootstrap.java:73)
>   at 
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.&lt;init&gt;(WebRuntimeMonitor.java:359)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.flink.runtime.webmonitor.WebMonitorUtils.startWebRuntimeMonitor(WebMonitorUtils.java:135)
>   at 
> org.apache.flink.runtime.clusterframework.BootstrapTools.createWebMonitorIfConfigured(BootstrapTools.java:242)
>   at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:352)
>   at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:195)
>   at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:192)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>   at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:192)
>   at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:116)
> {code}
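
A sketch of the direction such a fix could take (illustrative only, with a 
hypothetical helper; not the actual patch in the PR): require the upload 
directory only when web job submission is enabled.

{code}
import java.io.File

object UploadDirSketch {
  // The upload directory may legitimately be absent when job submission via
  // the web frontend is disabled, so only fail when it is actually needed.
  def resolveUploadDir(submitEnabled: Boolean, dir: Option[File]): Option[File] = {
    if (submitEnabled && dir.isEmpty) {
      throw new IllegalStateException(
        "jobmanager.web.submit.enable is true but no upload directory is available")
    }
    dir // may be None when submission is disabled
  }
}
{code}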



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6643) Flink restarts job in HA even if NoRestartStrategy is set

2017-05-21 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-6643:
--
Description: 
While testing Flink 1.3 RC1, I found that the JobManager is trying to recover a 
job that had the {{NoRestartStrategy}} set.

{code}
2017-05-19 15:09:04,038 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover all jobs.
2017-05-19 15:09:04,039 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Retrieving all stored job ids from ZooKeeper under 
flink/application_149487096_0064/jobgraphs.
2017-05-19 15:09:04,041 INFO  org.apache.flink.yarn.YarnJobManager  
- There are 1 jobs to recover. Starting the job recovery.
2017-05-19 15:09:04,043 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,043 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovering job graph f94b1f7a0e9e3dbcb160c687e476ca77 from 
flink/application_149487096_0064/jobgraphs/f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,078 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2017-05-19 15:09:04,142 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(f94b1f7a0e9e3dbcb160c687e476ca77, JobInfo(clients: 
Set((Actor[akka.tcp://flink@permanent-qa-cluster-master.c.astral-sorter-757.internal:40391/user/$a#-155566858],EXECUTION_RESULT_AND_STATE_CHANGES)),
 start: 1495206476885)).
2017-05-19 15:09:04,142 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting recovered job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,143 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting job f94b1f7a0e9e3dbcb160c687e476ca77 
(CarTopSpeedWindowingExample) (Recovery).
2017-05-19 15:09:04,151 INFO  org.apache.flink.yarn.YarnJobManager  
- Using restart strategy NoRestartStrategy for 
f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,163 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
via failover strategy: full graph restart
{code}

  was:
While testing Flink 1.3 RC1, I found that the JobManager is trying to recover a 
job that had the {[NoRestartStrategy}} set.

{code}
2017-05-19 15:09:04,038 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover all jobs.
2017-05-19 15:09:04,039 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Retrieving all stored job ids from ZooKeeper under 
flink/application_149487096_0064/jobgraphs.
2017-05-19 15:09:04,041 INFO  org.apache.flink.yarn.YarnJobManager  
- There are 1 jobs to recover. Starting the job recovery.
2017-05-19 15:09:04,043 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,043 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovering job graph f94b1f7a0e9e3dbcb160c687e476ca77 from 
flink/application_149487096_0064/jobgraphs/f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,078 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2017-05-19 15:09:04,142 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(f94b1f7a0e9e3dbcb160c687e476ca77, JobInfo(clients: 
Set((Actor[akka.tcp://flink@permanent-qa-cluster-master.c.astral-sorter-757.internal:40391/user/$a#-155566858],EXECUTION_RESULT_AND_STATE_CHANGES)),
 start: 1495206476885)).
2017-05-19 15:09:04,142 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting recovered job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,143 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting job f94b1f7a0e9e3dbcb160c687e476ca77 
(CarTopSpeedWindowingExample) (Recovery).
2017-05-19 15:09:04,151 INFO  org.apache.flink.yarn.YarnJobManager  
- Using restart strategy NoRestartStrategy for 
f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,163 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
via failover strategy: full graph restart
{code}


> Flink restarts job in HA even if NoRestartStrategy is set
> -
>
> Key: FLINK-6643
> URL: https://issues.apache.org/jira/browse/FLINK-6643
> Project: Flink
>  Issue Type: Bug
>  

[jira] [Updated] (FLINK-6643) Flink restarts job in HA even if NoRestartStrategy is set

2017-05-21 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-6643:
--
Description: 
While testing Flink 1.3 RC1, I found that the JobManager is trying to recover a 
job that had the {[NoRestartStrategy}} set.

{code}
2017-05-19 15:09:04,038 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover all jobs.
2017-05-19 15:09:04,039 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Retrieving all stored job ids from ZooKeeper under 
flink/application_149487096_0064/jobgraphs.
2017-05-19 15:09:04,041 INFO  org.apache.flink.yarn.YarnJobManager  
- There are 1 jobs to recover. Starting the job recovery.
2017-05-19 15:09:04,043 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,043 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovering job graph f94b1f7a0e9e3dbcb160c687e476ca77 from 
flink/application_149487096_0064/jobgraphs/f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,078 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2017-05-19 15:09:04,142 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(f94b1f7a0e9e3dbcb160c687e476ca77, JobInfo(clients: 
Set((Actor[akka.tcp://flink@permanent-qa-cluster-master.c.astral-sorter-757.internal:40391/user/$a#-155566858],EXECUTION_RESULT_AND_STATE_CHANGES)),
 start: 1495206476885)).
2017-05-19 15:09:04,142 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting recovered job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,143 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting job f94b1f7a0e9e3dbcb160c687e476ca77 
(CarTopSpeedWindowingExample) (Recovery).
2017-05-19 15:09:04,151 INFO  org.apache.flink.yarn.YarnJobManager  
- Using restart strategy NoRestartStrategy for 
f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,163 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
via failover strategy: full graph restart
{code}

  was:
While testing Flink 1.3 RC1, I found that the JobManager is trying to recover a 
job that had the {NoRestartStrategy} set.

{code}
2017-05-19 15:09:04,038 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover all jobs.
2017-05-19 15:09:04,039 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Retrieving all stored job ids from ZooKeeper under 
flink/application_149487096_0064/jobgraphs.
2017-05-19 15:09:04,041 INFO  org.apache.flink.yarn.YarnJobManager  
- There are 1 jobs to recover. Starting the job recovery.
2017-05-19 15:09:04,043 INFO  org.apache.flink.yarn.YarnJobManager  
- Attempting to recover job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,043 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovering job graph f94b1f7a0e9e3dbcb160c687e476ca77 from 
flink/application_149487096_0064/jobgraphs/f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,078 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2017-05-19 15:09:04,142 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(f94b1f7a0e9e3dbcb160c687e476ca77, JobInfo(clients: 
Set((Actor[akka.tcp://flink@permanent-qa-cluster-master.c.astral-sorter-757.internal:40391/user/$a#-155566858],EXECUTION_RESULT_AND_STATE_CHANGES)),
 start: 1495206476885)).
2017-05-19 15:09:04,142 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting recovered job f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,143 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting job f94b1f7a0e9e3dbcb160c687e476ca77 
(CarTopSpeedWindowingExample) (Recovery).
2017-05-19 15:09:04,151 INFO  org.apache.flink.yarn.YarnJobManager  
- Using restart strategy NoRestartStrategy for 
f94b1f7a0e9e3dbcb160c687e476ca77.
2017-05-19 15:09:04,163 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
via failover strategy: full graph restart
{code}


> Flink restarts job in HA even if NoRestartStrategy is set
> -
>
> Key: FLINK-6643
> URL: https://issues.apache.org/jira/browse/FLINK-6643
> Project: Flink
>  Issue Type: Bug
>

[GitHub] flink issue #3949: [FLINK-6629] Use HAServices to find connecting address fo...

2017-05-21 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3949
  
+1 to merge this change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6629) ClusterClient cannot submit jobs to HA cluster if address not set in configuration

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018936#comment-16018936
 ] 

ASF GitHub Bot commented on FLINK-6629:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3949
  
+1 to merge this change


> ClusterClient cannot submit jobs to HA cluster if address not set in 
> configuration
> --
>
> Key: FLINK-6629
> URL: https://issues.apache.org/jira/browse/FLINK-6629
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.3.0, 1.2.1, 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.3.0, 1.4.0
>
>
> In the general case, the {{ClusterClient}} fails to submit jobs to an HA 
> cluster. The problem is the {{LazyActorSystemLoader}}, which creates an 
> {{ActorSystem}} upon first call. The {{ActorSystem}} is created by reading 
> the JobManager's address from the {{Configuration}} in order to find the 
> connecting address via {{ConnectionUtils.findConnectingAddress}}. The address 
> in the configuration is, however, only valid in the non-HA case. In the HA 
> case, we have to obtain the leader's address from ZooKeeper. Therefore, if 
> the address is not explicitly set in the {{flink-conf.yaml}}, the 
> {{ClusterClient}} will either fail with a {{RuntimeException}} (if no address 
> has been specified at all) or use an invalid address and retrieve the wrong 
> connecting address.
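A hedged sketch of the intended resolution order; {{ZooKeeperLeaderLookup}} and 
{{resolve}} are illustrative names standing in for Flink's leader retrieval 
service, not actual Flink API:

{code}
import java.util.Optional;

public class JobManagerAddressResolution {

    /** Hypothetical stand-in for Flink's ZooKeeper leader retrieval service. */
    interface ZooKeeperLeaderLookup {
        String currentLeaderAddress() throws Exception;
    }

    static String resolve(boolean highAvailability,
                          Optional<String> configuredAddress,
                          ZooKeeperLeaderLookup zooKeeper) throws Exception {
        if (highAvailability) {
            // in HA mode the leader's address must come from ZooKeeper,
            // not from jobmanager.rpc.address in flink-conf.yaml
            return zooKeeper.currentLeaderAddress();
        }
        // non-HA mode: the configured address is authoritative
        return configuredAddress.orElseThrow(
                () -> new RuntimeException("JobManager address is not configured"));
    }
}
{code}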



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6482) Add nested serializers into configuration snapshots of composite serializers

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018935#comment-16018935
 ] 

ASF GitHub Bot commented on FLINK-6482:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3937
  
Hey,
what's the status of this PR?
I see the JIRA is marked as a blocker. When do you think this PR will be ready 
to be merged?


> Add nested serializers into configuration snapshots of composite serializers
> 
>
> Key: FLINK-6482
> URL: https://issues.apache.org/jira/browse/FLINK-6482
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the composite serializers' configuration snapshots only wrap the 
> config snapshots of nested serializers.
> We should also consider adding serialization of the nested serializers 
> themselves into the config snapshot, so that in the case where some nested 
> serializer cannot be loaded (class missing / implementation changed), we can 
> still provide a path for serializer upgrades.
> This applies to all composite serializers that have nested serializers.
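A minimal sketch of the idea, assuming Java serialization for the nested 
serializer; the helper below is illustrative, not the actual config-snapshot 
format:

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NestedSerializerSnapshotSketch {

    // Writes the nested serializer itself (not only its config snapshot), so a
    // restore can fall back to these bytes if the serializer class is missing
    // or its implementation changed.
    static byte[] writeNestedSerializer(Serializable nestedSerializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(nestedSerializer);
        }
        return bytes.toByteArray();
    }
}
{code}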



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

2017-05-21 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3937
  
Hey,
what's the status of this PR?
I see the JIRA is marked as a blocker. When do you think this PR will be ready 
to be merged?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3952) Bump Netty to 4.1

2017-05-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018878#comment-16018878
 ] 

Stephan Ewen commented on FLINK-3952:
-

True, only Hadoop's netty is shaded.

We should be able to shade Flink's Netty as well. Upgrading it to 4.1 seems 
harder.

It is always an issue if libraries break their backwards compatibility within a 
major version :-(

> Bump Netty to 4.1
> -
>
> Key: FLINK-3952
> URL: https://issues.apache.org/jira/browse/FLINK-3952
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, Network
>Reporter: rektide de la fey
>  Labels: netty
>
> Netty 4.1 is about to have its final release. This release has [a number of 
> significant 
> enhancements|http://netty.io/wiki/new-and-noteworthy-in-4.1.html], and in 
> particular I find HTTP/2 codecs to be incredibly desirable to have. 
> Additionally, hopefully the [Hadoop patches for Netty 
> 4.1|https://issues.apache.org/jira/browse/HADOOP-11716] get some tests and 
> get merged, and I believe if/when that happens it'll be important for Flink 
> to also be using the new Netty minor version.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6603) Enable checkstyle on test sources

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018825#comment-16018825
 ] 

ASF GitHub Bot commented on FLINK-6603:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3941
  
@greghogan Awesome, looks good! Thanks a lot for taking this into account!


> Enable checkstyle on test sources
> -
>
> Key: FLINK-6603
> URL: https://issues.apache.org/jira/browse/FLINK-6603
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.4.0
>
>
> With the addition of strict checkstyle to select modules (currently limited 
> to {{flink-streaming-java}}) we can enable the checkstyle flag 
> {{includeTestSourceDirectory}} to perform the same unused-import, 
> whitespace, and other checks on test sources.
> We should first resolve the import grouping discussed in FLINK-6107. Also, 
> several tests exceed the 2500-line limit.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3941: [FLINK-6603] [streaming] Enable checkstyle on test source...

2017-05-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3941
  
@greghogan Awesome, looks good! Thanks a lot for taking this into account!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6446) Various improvements to the Web Frontend

2017-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018822#comment-16018822
 ] 

ASF GitHub Bot commented on FLINK-6446:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3946#discussion_r117632701
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade 
---
@@ -28,10 +28,10 @@ split
 a(ui-sref=".subtasks({nodeid: nodeid})") Subtasks
 
   li(ui-sref-active='active')
-a(ui-sref=".taskmanagers({nodeid: nodeid})") TaskManagers
+a(ui-sref=".taskmanagers({nodeid: nodeid})") Subtasks by 
TaskManager
--- End diff --

@greghogan Hmmm, what should we call them? I have been asked many times what 
the "TaskManagers" tab means, especially because with single-slot TaskManagers 
(which are super common) it is identical to the "Subtasks" view.

I am just looking for a name that resolves that confusion.


> Various improvements to the Web Frontend
> 
>
> Key: FLINK-6446
> URL: https://issues.apache.org/jira/browse/FLINK-6446
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Stephan Ewen
>
> This is the umbrella issue for various improvements to the web frontend,



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3946: [FLINK-6446] Fix some small issues in the web UI

2017-05-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3946#discussion_r117632701
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade 
---
@@ -28,10 +28,10 @@ split
 a(ui-sref=".subtasks({nodeid: nodeid})") Subtasks
 
   li(ui-sref-active='active')
-a(ui-sref=".taskmanagers({nodeid: nodeid})") TaskManagers
+a(ui-sref=".taskmanagers({nodeid: nodeid})") Subtasks by 
TaskManager
--- End diff --

@greghogan Hmmm, what should we call them? I have been asked many times what 
the "TaskManagers" tab means, especially because with single-slot TaskManagers 
(which are super common) it is identical to the "Subtasks" view.

I am just looking for a name that resolves that confusion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5341) Add a metric exposing how many times a job has been restarted

2017-05-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5341.
---
Resolution: Fixed

The "fullRestarts" metric was added in 8ed85fe49b7595546a8f968e0faa1fa7d4da47ec.

> Add a metric exposing how many times a job has been restarted
> -
>
> Key: FLINK-5341
> URL: https://issues.apache.org/jira/browse/FLINK-5341
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Dan Bress
>Priority: Minor
>
> I would like the JobManager to expose a metric showing how many times each 
> job has been restarted. This way I can grab this number and measure whether 
> or not my job is healthy.
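A minimal sketch of how such a value can be exposed through Flink's metrics 
API; the {{MetricGroup}} wiring is illustrative, and the metric eventually 
added by this issue is the JobManager-side "fullRestarts" gauge mentioned 
above:

{code}
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class RestartCountMetric {
    private final AtomicLong restarts = new AtomicLong();

    // registers a gauge that reports the current restart count
    public void register(MetricGroup group) {
        group.gauge("restarts", (Gauge<Long>) restarts::get);
    }

    public void onRestart() {
        restarts.incrementAndGet();
    }
}
{code}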



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-4909) JDBC Input format cast exception management

2017-05-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4909.
---
Resolution: Won't Fix

The JDBC InputFormat no longer does any casts or type checks.

> JDBC Input format cast exception management
> ---
>
> Key: FLINK-4909
> URL: https://issues.apache.org/jira/browse/FLINK-4909
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.1.1
>Reporter: Stefano Bortoli
>Priority: Minor
>
> When configuring the types for the JDBC InputFormat, one can easily set the 
> wrong type (e.g. INT vs. DECIMAL, or LONG vs. BIG_DECIMAL).
> It would be nice for the resulting ClassCastException to also include the 
> name of the field causing the exception; in a sense, improve the exception 
> message.
> For example: java.lang.ClassCastException: java.math.BigDecimal cannot be 
> cast to java.lang.String *for field MY_BIGDECIMAL_FIELD*
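A hedged sketch of the proposed improvement; the helper and field names are 
illustrative:

{code}
public class FieldAwareCast {

    // wraps the per-field cast so the exception names the offending column
    static <T> T castField(Object value, Class<T> expected, String fieldName) {
        try {
            return expected.cast(value);
        } catch (ClassCastException e) {
            throw new ClassCastException(value.getClass().getName()
                    + " cannot be cast to " + expected.getName()
                    + " for field " + fieldName);
        }
    }

    public static void main(String[] args) {
        // reproduces the improved message from the example above
        castField(new java.math.BigDecimal("1.0"), String.class, "MY_BIGDECIMAL_FIELD");
    }
}
{code}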



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug

2017-05-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4061.
---
Resolution: Cannot Reproduce

>  about flink jdbc connect oracle db exists a critical bug
> ---
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.1.0
> Environment: ubuntu, jdk1.8.0, Start a Local Flink Cluster
>Reporter: dengchangfu
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to 
> test the feature. The code is simple, but after I submit the app an 
> exception happens.
> The exception info looks like this:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> My code looks like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
>
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster. Just type
>  *    mvn clean package
>  * in the project's root directory. You will find the jar in
>  *    target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  */
> public class Job {
>
>     public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.FLOAT_TYPE_INFO
>     };
>
>     public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
>
>     public static void main(String[] args) {
>         // set up the execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
>                 .setDrivername("oracle.jdbc.driver.OracleDriver")
>                 .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
>                 .setUsername("crmii")
>                 .setPassword("crmii")
>                 .setQuery("select CLIENT_ID,OCCUR_BALANCE from HS_ASSET.FUNDJOUR@OTC")
>                 .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
>                 .setRowTypeInfo(rowTypeInfo);
>
>         DataSet<Row> source = env.createInput(inputBuilder.finish());
>         source.output(JDBCOutputFormat.buildJDBCOutputFormat()
>                 .setDrivername("oracle.jdbc.driver.OracleDriver")
>                 .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
>                 .setUsername("crmii")
>                 .setPassword("crmii")
>                 .setQuery("insert into dengabc (client_id,salary) values(?,?)")
>                 .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
>                 .finish());
>         //source.print();
>         //source.first(20).print();
>         //dbData.print();
>
>         /*
>          * Here, you can start creating your execution plan for Flink.
>          *
>          * Start with getting some data from the environment, like
>          *    env.readTextFile(textPath);
>          * then, transform the resulting DataSet using operations like
>          *    .filter()
>          *    .flatMap()
>          *    .join()
>          *    .coGroup()
>          * and many more.
>          * Have a look at the programming guide for the Java API:
>          *    http://flink.apache.org/docs/latest/apis/batch/index.html
>          * and the examples:
>          *    http://flink.apache.org/docs/latest/apis/batch/examples.html
>          */
>
>         // execute program
>         try {
>             env.execute("Flink Java API Skeleton");
>         } catch (Exception e) {
>             e.getMessage();
>         }
>     }
> }
> my pom.xml like this:
> 
> <project xmlns="http://maven.apache.org/POM/4.0.0" ...>

[jira] [Commented] (FLINK-6620) Add KeyGroupCheckpointedOperator interface that works for checkpointing key-groups

2017-05-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018778#comment-16018778
 ] 

Aljoscha Krettek commented on FLINK-6620:
-

[~lzljs3620320] Unfortunately I was very busy last week and will be next week 
as well (ApacheCon and Strata London), so I'm slow to respond to anything. 
Thanks for opening this issue!

[~srichter] What do you think about this? We've had an interface like this for 
a while in Beam to support writing custom things along with the timers in 
key-grouped state.

> Add KeyGroupCheckpointedOperator interface that works for checkpointing 
> key-groups
> --
>
> Key: FLINK-6620
> URL: https://issues.apache.org/jira/browse/FLINK-6620
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jingsong Lee
>Priority: Minor
>
> [~aljoscha] We have discussed it on: 
> https://issues.apache.org/jira/browse/BEAM-1393
> {code}
> /**
>  * This interface is used to checkpoint key-group state.
>  */
> public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
>   /**
>    * Snapshots the state for a given {@code keyGroupIndex}.
>    *
>    * AbstractStreamOperator would call this hook in
>    * AbstractStreamOperator.snapshotState() while iterating over the key groups.
>    *
>    * @param keyGroupIndex the id of the key-group to be put in the snapshot.
>    * @param out the stream to write to.
>    */
>   void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
> }
> {code}
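A hedged usage sketch of the proposed hook, written standalone since the 
restore counterpart from {{KeyGroupRestoringOperator}} is not shown above; the 
counter state is illustrative:

{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class KeyGroupSnapshotSketch {
    // one slot of state per key-group, so groups can be redistributed
    // independently when the parallelism changes
    private final long[] perKeyGroupCounters = new long[128];

    public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
        out.writeLong(perKeyGroupCounters[keyGroupIndex]);
    }

    public static void main(String[] args) throws Exception {
        KeyGroupSnapshotSketch operator = new KeyGroupSnapshotSketch();
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            // mirrors how AbstractStreamOperator.snapshotState() would iterate
            for (int keyGroup = 0; keyGroup < 128; keyGroup++) {
                operator.snapshotKeyGroupState(keyGroup, out);
            }
        }
    }
}
{code}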



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6640) Ensure registration of shared state happens before externalizing a checkpoint

2017-05-21 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6640.
-
Resolution: Fixed

fixed in 040356391f

> Ensure registration of shared state happens before externalizing a checkpoint
> -
>
> Key: FLINK-6640
> URL: https://issues.apache.org/jira/browse/FLINK-6640
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> Currently, a checkpoint is externalized before its shared state is 
> registered. As a consequence, placeholder state handles become part of an 
> externalized checkpoint, which they should not be.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6652) Problem with DelimitedInputFormat

2017-05-21 Thread Moritz Schubotz (JIRA)
Moritz Schubotz created FLINK-6652:
--

 Summary: Problem with DelimitedInputFormat
 Key: FLINK-6652
 URL: https://issues.apache.org/jira/browse/FLINK-6652
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.2.1
Reporter: Moritz Schubotz
 Fix For: 1.2.0


After upgrading from Flink 1.2.0 to 1.2.1, I got the following error:
{code}
07:54:52,395 ERROR org.apache.flink.api.common.io.DelimitedInputFormat - Unexpected problen while getting the file statistics for file 'mytestfile': -1
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:572)
    at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:423)
    at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:48)
    at org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
{code}
I have created a test repo that isolates the issue:
https://github.com/physikerwelt/flinkReadTest
and reproduced the bug on Travis:
https://travis-ci.org/physikerwelt/flinkReadTest
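For reference, a minimal sketch of the read pattern that triggers the failure, 
assuming a local file named "mytestfile" ({{readTextFile}} uses a 
{{DelimitedInputFormat}} internally, and {{getStatistics()}} runs when the 
optimizer estimates the source size):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> lines = env.readTextFile("mytestfile");
        lines.first(10).print();
    }
}
{code}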



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)