[jira] [Closed] (FLINK-7820) deprecate docs of FoldingState and FoldingStateDescriptor

2017-10-27 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-7820.
---
Resolution: Fixed

This is done as part of FLINK-5967

> deprecate docs of FoldingState and FoldingStateDescriptor
> -
>
> Key: FLINK-7820
> URL: https://issues.apache.org/jira/browse/FLINK-7820
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> {{FoldingState}} and {{FoldingStateDescriptor}} have been deprecated. We should 
> remove the docs related to these two classes.





[jira] [Commented] (FLINK-6615) tmp directory not cleaned up on shutdown

2017-10-27 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222978#comment-16222978
 ] 

Bowen Li commented on FLINK-6615:
-

I would recommend closing this ticket as 'Not a Problem' if there's no 
objection.

> tmp directory not cleaned up on shutdown
> 
>
> Key: FLINK-6615
> URL: https://issues.apache.org/jira/browse/FLINK-6615
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Bowen Li
>
> Steps to reproduce:
> 1) Stop task manager gracefully (kill -6 )
> 2) In the logs:
> {code}
> 2017-05-17 13:35:50,147 INFO  org.apache.zookeeper.ClientCnxn 
>   - EventThread shut down [main-EventThread]
> 2017-05-17 13:35:50,200 ERROR 
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - IOManager 
> failed to properly clean up temp file directory: 
> /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 
> [flink-akka.actor.default-dispatcher-2]
> java.nio.file.DirectoryNotEmptyException: 
> /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47
>   at 
> sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
>   at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>   at java.nio.file.Files.delete(Files.java:1126)
>   at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:154)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:241)
>   at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> {code}
> Expected:
> * on shutdown, delete the non-empty directory anyway. 
> Notes:
> * after the process terminated, I checked the 
> "/mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47" directory and 
> didn't find anything there. So it looks like a timing issue.
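
For illustration, a minimal sketch of the expected behaviour in plain JDK NIO (not Flink's actual {{FileUtils}} code; all names are hypothetical): delete the tree depth-first and retry a bounded number of times if a concurrent writer makes the directory non-empty again, which is the suspected timing issue above.

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class RobustDirectoryDelete {

    /** Deletes a directory tree, retrying if files are created concurrently. */
    public static void deleteDirectoryWithRetries(Path dir, int maxAttempts) throws IOException {
        for (int attempt = 1; ; attempt++) {
            try {
                // Depth-first walk: children are deleted before their parent.
                Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        Files.deleteIfExists(file);
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult postVisitDirectory(Path d, IOException exc) throws IOException {
                        Files.deleteIfExists(d);
                        return FileVisitResult.CONTINUE;
                    }
                });
                return; // the whole tree is gone
            } catch (DirectoryNotEmptyException | NoSuchFileException e) {
                // A concurrent writer or deleter raced us; retry a bounded number of times.
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
{code}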





[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2017-10-27 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222972#comment-16222972
 ] 

Elias Levy commented on FLINK-7935:
---

Possibly. It depends on whether you could add multiple metrics or metric 
groups that differ in their variables but are formatted the same. 
E.g., the TaskManagerJobMetricGroup creates and tracks a distinct TaskMetricGroup 
for each task in the portion of a job the task manager is executing. The 
metrics for each task are tracked separately, but I can format the scope so all 
of them are reported under the same name ("taskmanager.job.task") but with 
different variables/DD tags.



> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g., by default 
> the system scope for operator metrics is 
> {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. 
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears that internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user-defined metrics: being able to define 
> variables/scopes when adding a metric group or metric with the user API, so 
> that in DD we have a single metric with a tag taking many different values, 
> rather than hundreds of metrics just to track the one value we want to measure 
> across different event types.
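
For illustration, a sketch of what such a user API could look like (hypothetical at the time of this ticket; a key/value {{addGroup(key, value)}} variant along these lines was added in later Flink versions): the key becomes a scope variable that reporters such as DataDog can emit as a tag instead of baking it into the metric name.

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class TaggedMetricsMapper extends RichMapFunction<String, String> {

    private transient Counter purchaseEvents;

    @Override
    public void open(Configuration parameters) {
        purchaseEvents = getRuntimeContext()
            .getMetricGroup()
            // Key/value group: "eventType" would surface as a variable/DD tag,
            // so DD sees one metric name with many tag values.
            .addGroup("eventType", "purchase")
            .counter("events");
    }

    @Override
    public String map(String value) {
        purchaseEvents.inc();
        return value;
    }
}
{code}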





[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-10-27 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222891#comment-16222891
 ] 

Bowen Li commented on FLINK-6951:
-

The workaround is: rather than using httpclient-4.2.5 and httpcore-4.2.5 as 
described 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#flink-for-hadoop-27],
 I used httpclient-4.3.6 and httpcore-4.3.3.

I'm not sure why it's working now. Maybe because we upgraded the KPL and KCL, and 
they don't use the old httpcomponents APIs anymore? [~aljoscha] [~tzulitai]

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for the Flink kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building the Flink kinesis connector.





[jira] [Created] (FLINK-7938) support addAll() in ListState

2017-10-27 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7938:
---

 Summary: support addAll() in ListState
 Key: FLINK-7938
 URL: https://issues.apache.org/jira/browse/FLINK-7938
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.5.0


Support {{addAll()}} in {{ListState}}, so Flink can add elements to 
{{ListState}} in batch more efficiently. This should give us much better 
performance, especially for {{ListState}} backed by RocksDB.
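
For illustration, a sketch of the intended usage under the assumption that the new method takes a {{java.util.List}} (the class and names below are illustrative, not the actual implementation):

{code:java}
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class BatchedListStateFunction extends RichFlatMapFunction<List<Long>, Long> {

    private transient ListState<Long> listState;

    @Override
    public void open(Configuration parameters) {
        listState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("myList", Long.class));
    }

    @Override
    public void flatMap(List<Long> batch, Collector<Long> out) throws Exception {
        // One batched write instead of N individual add() calls; for a
        // RocksDB-backed ListState this can collapse into a single merge.
        listState.addAll(batch);
    }
}
{code}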





[jira] [Updated] (FLINK-7475) support update() in ListState

2017-10-27 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7475:

Summary: support update() in ListState  (was: ListState support update)

> support update() in ListState
> -
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Reporter: yf
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, 
> I have to do two steps: 
> listState.clear() 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why can't I update the state with: 
> listState.update(myList) ?
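
A sketch contrasting the workaround with the proposed call ({{update()}} is hypothetical at this point in time; the surrounding class and names are illustrative):

{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class ReplaceListStateFunction extends RichMapFunction<String, String> {

    private transient ListState<String> listState;

    @Override
    public void open(Configuration parameters) {
        listState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("myList", String.class));
    }

    @Override
    public String map(String value) throws Exception {
        List<String> myList = Arrays.asList("a", "b", value);

        // Today's two-step workaround: clear, then re-add element by element.
        listState.clear();
        for (String e : myList) {
            listState.add(e);
        }

        // Proposed single call: replace the whole list at once.
        listState.update(myList);

        return value;
    }
}
{code}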





[jira] [Commented] (FLINK-7929) Add unit/integration tests for states backed by RocksDB

2017-10-27 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222868#comment-16222868
 ] 

Bowen Li commented on FLINK-7929:
-

[~aljoscha] Thanks for pointing that out! I'm closing this ticket.

> Add unit/integration tests for states backed by RocksDB
> ---
>
> Key: FLINK-7929
> URL: https://issues.apache.org/jira/browse/FLINK-7929
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> While exploring how to implement FLINK-7475, I didn't find any existing unit 
> tests (or maybe there are some, but I didn't find them...) that I can easily run 
> to test whether {{RocksDB(Value/List/Map/...)State}} works.
> We should add unit/integration tests for {{RocksDB(Value/List/Map/...)State}}.





[jira] [Closed] (FLINK-7929) Add unit/integration tests for states backed by RocksDB

2017-10-27 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-7929.
---
Resolution: Invalid

> Add unit/integration tests for states backed by RocksDB
> ---
>
> Key: FLINK-7929
> URL: https://issues.apache.org/jira/browse/FLINK-7929
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> While exploring how to implement FLINK-7475, I didn't find any existing unit 
> tests (or maybe there are some, but I didn't find them...) that I can easily run 
> to test whether {{RocksDB(Value/List/Map/...)State}} works.
> We should add unit/integration tests for {{RocksDB(Value/List/Map/...)State}}.





[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222855#comment-16222855
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147442488
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -205,12 +207,16 @@ private CassandraSink(SingleOutputStreamOperator sink) {
     * @param  input type
     * @return CassandraSinkBuilder, to further configure the sink
     */
-   public static  CassandraSinkBuilder addSink(DataStream input) {
+   public static  CassandraSinkBuilder addSink(DataStream input) {
       TypeInformation typeInfo = input.getType();
       if (typeInfo instanceof TupleTypeInfo) {
-         DataStream tupleInput = (DataStream) input;
+         DataStream tupleInput = (DataStream) input;
          return (CassandraSinkBuilder) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
       }
+      if (typeInfo instanceof RowTypeInfo) {
--- End diff --

You can check here for the concrete class with 
`Row.class.equals(typeInfo.getTypeClass())` to also support `GenericType`.
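
In other words, a self-contained sketch of the suggested check (class and method names here are illustrative):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

class RowTypeCheck {
    /** True for RowTypeInfo as well as a GenericTypeInfo wrapping Row,
     *  unlike an {@code instanceof RowTypeInfo} check. */
    static boolean isRowStream(TypeInformation<?> typeInfo) {
        return Row.class.equals(typeInfo.getTypeClass());
    }
}
{code}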


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row streams. 
> The solution should be similar to CassandraTupleSink.





[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222852#comment-16222852
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147456818
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -375,6 +381,34 @@ protected void sanityCheck() {
    }

    /**
+    * Builder for a {@link CassandraRowSink}.
+    */
+   public static class CassandraRowSinkBuilder extends CassandraSinkBuilder {
+      public CassandraRowSinkBuilder(DataStream input, TypeInformation typeInfo, TypeSerializer serializer) {
+         super(input, typeInfo, serializer);
+      }
+
+      @Override
+      protected void sanityCheck() {
+         super.sanityCheck();
+         if (query == null || query.length() == 0) {
+            throw new IllegalArgumentException("Query must not be null or empty.");
+         }
+      }
+
+      @Override
+      protected CassandraSink createSink() throws Exception {
+         return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink"));
+      }
+
+      @Override
+      protected CassandraSink createWriteAheadSink() throws Exception {
+         throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
--- End diff --

I had a look at `CassandraTupleWriteAheadSink`. It would be straightforward 
to copy and adapt it for `Row`.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row streams. 
> The solution should be similar to CassandraTupleSink.





[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-10-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147517745
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link AppendStreamTableSink}.
--- End diff --

`cassandra` -> `Cassandra`


---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-10-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147442488
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -205,12 +207,16 @@ private CassandraSink(SingleOutputStreamOperator sink) {
     * @param  input type
     * @return CassandraSinkBuilder, to further configure the sink
     */
-   public static  CassandraSinkBuilder addSink(DataStream input) {
+   public static  CassandraSinkBuilder addSink(DataStream input) {
       TypeInformation typeInfo = input.getType();
       if (typeInfo instanceof TupleTypeInfo) {
-         DataStream tupleInput = (DataStream) input;
+         DataStream tupleInput = (DataStream) input;
          return (CassandraSinkBuilder) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
       }
+      if (typeInfo instanceof RowTypeInfo) {
--- End diff --

You can check here for the concrete class with 
`Row.class.equals(typeInfo.getTypeClass())` to also support `GenericType`.


---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222853#comment-16222853
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147507230
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link AppendStreamTableSink}.
+ */
+public class CassandraTableSink implements AppendStreamTableSink {
--- End diff --

rename to `CassandraAppendTableSink`


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row streams. 
> The solution should be similar to CassandraTupleSink.





[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222854#comment-16222854
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147517745
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link AppendStreamTableSink}.
--- End diff --

`cassandra` -> `Cassandra`


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row streams. 
> The solution should be similar to CassandraTupleSink.





[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-10-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147507230
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link AppendStreamTableSink}.
+ */
+public class CassandraTableSink implements AppendStreamTableSink {
--- End diff --

rename to `CassandraAppendTableSink`


---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-10-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147456818
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -375,6 +381,34 @@ protected void sanityCheck() {
    }

    /**
+    * Builder for a {@link CassandraRowSink}.
+    */
+   public static class CassandraRowSinkBuilder extends CassandraSinkBuilder {
+      public CassandraRowSinkBuilder(DataStream input, TypeInformation typeInfo, TypeSerializer serializer) {
+         super(input, typeInfo, serializer);
+      }
+
+      @Override
+      protected void sanityCheck() {
+         super.sanityCheck();
+         if (query == null || query.length() == 0) {
+            throw new IllegalArgumentException("Query must not be null or empty.");
+         }
+      }
+
+      @Override
+      protected CassandraSink createSink() throws Exception {
+         return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink"));
+      }
+
+      @Override
+      protected CassandraSink createWriteAheadSink() throws Exception {
+         throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
--- End diff --

I had a look at `CassandraTupleWriteAheadSink`. It would be straightforward 
to copy and adapt it for `Row`.


---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-10-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147517530
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
    }

    @Test
+   public void testCassandraTableSink() throws Exception {
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+      env.setParallelism(1);
+
+      DataStreamSource source = env.fromCollection(rowCollection);
+      CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
+         @Override
+         protected Cluster buildCluster(Cluster.Builder builder) {
+            return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
+         }
+      }, injectTableName(INSERT_DATA_QUERY), new Properties());
+      CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+
+      newCassandrTableSink.emitDataStream(source);
+
+      env.execute();
+      ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+      Assert.assertEquals(20, rs.all().size());
+   }
+
+   @Test
+   public void testCassandraTableSinkE2E() throws Exception {
--- End diff --

I think `testCassandraTableSink` can be removed in favor of this test.


---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222856#comment-16222856
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r147517530
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
    }

    @Test
+   public void testCassandraTableSink() throws Exception {
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+      env.setParallelism(1);
+
+      DataStreamSource source = env.fromCollection(rowCollection);
+      CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
+         @Override
+         protected Cluster buildCluster(Cluster.Builder builder) {
+            return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
+         }
+      }, injectTableName(INSERT_DATA_QUERY), new Properties());
+      CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+
+      newCassandrTableSink.emitDataStream(source);
+
+      env.execute();
+      ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+      Assert.assertEquals(20, rs.all().size());
+   }
+
+   @Test
+   public void testCassandraTableSinkE2E() throws Exception {
--- End diff --

I think `testCassandraTableSink` can be removed in favor of this test.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row streams. 
> The solution should be similar to CassandraTupleSink.





[jira] [Commented] (FLINK-5967) Add RuntimeContext#getAggregatingState() and document AggregatingState

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222821#comment-16222821
 ] 

ASF GitHub Bot commented on FLINK-5967:
---

Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4899


> Add RuntimeContext#getAggregatingState() and document AggregatingState
> --
>
> Key: FLINK-5967
> URL: https://issues.apache.org/jira/browse/FLINK-5967
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>






[GitHub] flink pull request #4899: [FLINK-5967][DataStream API][Doc] Add RuntimeConte...

2017-10-27 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4899


---


[GitHub] flink pull request #4904: [hotfix] reorder the methods so they conform to th...

2017-10-27 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4904


---


[jira] [Updated] (FLINK-7873) Introduce HybridStreamHandle to optimize the recovery mechanism and try to read the checkpoint data locally

2017-10-27 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-7873:
--
Summary: Introduce HybridStreamHandle to optimize the recovery mechanism 
and try to read the checkpoint data locally  (was: Introduce 
HybridStreamStateHandle for quick recovery from checkpoint.)

> Introduce HybridStreamHandle to optimize the recovery mechanism and try to 
> read the checkpoint data locally
> ---
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is big 
> (e.g. 1 TB). What's worse, if the job performs recovery again and again, it can 
> eat up all network bandwidth and badly hurt the cluster. So I propose 
> that we cache the checkpoint data locally and read checkpoint data from 
> the local cache whenever we can, reading the data from remote storage only if we 
> fail locally. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Key issues:
> 1. Cache the checkpoint data on local disk and manage its creation and deletion.
> 2. Introduce a HybridStreamStateHandler which tries to create a local input 
> stream first and, if that fails, creates a remote input stream. Its prototype 
> looks like below:
> {code:java}
> class HybridStreamHandle {
>    private StreamStateHandle localHandle;
>    private StreamStateHandle remoteHandle;
>    ..
>    public FSDataInputStream openInputStream() throws IOException {
>        FSDataInputStream inputStream = localHandle.openInputStream();
>        return inputStream != null ? inputStream : remoteHandle.openInputStream();
>    }
>    .
> }
> {code}
> Solution:
> There are two kinds of solutions I can think of.
> Solution 1:
> The backend does the caching, and the HybridStreamHandle points to both 
> local and remote data. The HybridStreamHandle is managed by the 
> CheckpointCoordinator like any other StreamHandle, so the CheckpointCoordinator 
> will perform dispose on it. When the HybridStreamHandle performs dispose, it 
> calls localHandle.dispose() and remoteHandle.dispose(). In this way, we have to 
> record the TaskManager's info (like its location) in the localHandle and add an 
> entry in the TaskManager to handle the localHandle dispose message; we also 
> have to consider the HA situation.
> Solution 2:
> The backend does the caching and manages the cached data itself. It simply 
> uses a TTL-like method to manage handle disposal: we dispose of a handle if it 
> hasn't been touched for X time. We touch the handles when we recover 
> from a checkpoint or when we perform a checkpoint, and once we touch a handle 
> we reset its TTL. In this way, all the work is done by the backend, 
> transparently to both the JobManager and the TaskManager. The only problem is 
> that we may dispose of a handle that is still useful, but even in this case we 
> can read from the remote data, and users can avoid this by setting a proper 
> TTL value according to the checkpoint interval and other factors.
> To avoid over-complicating the problem, I prefer 
> solution 2. Can someone give me some advice? I would appreciate it very much~
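
For illustration, a minimal sketch of solution 2's TTL bookkeeping (all names are hypothetical, not actual Flink classes): handles are touched on checkpoints and on recovery, and local copies untouched past the TTL are disposed of, with later reads falling back to the remote copy.

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LocalCheckpointCache {

    private final Map<String, Long> lastTouched = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public LocalCheckpointCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Called whenever a handle is used: on recovery reads and on checkpoints. */
    public void touch(String handleId) {
        lastTouched.put(handleId, System.currentTimeMillis());
    }

    /** Invoked periodically; disposes local copies not touched within the TTL. */
    public void expire() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() > ttlMillis) {
                deleteLocalCopy(entry.getKey()); // later reads fall back to the remote handle
                it.remove();
            }
        }
    }

    private void deleteLocalCopy(String handleId) {
        // Delete the cached file for this handle; omitted in this sketch.
    }
}
{code}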





[jira] [Resolved] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7844.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 5231c9300c26895118b3277bc833536e92dcc6d1

> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Till Rohrmann
> Fix For: 1.4.0
>
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using the "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180, dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, even though the individual 
> recovery happens correctly and the workload is re-dispatched to a newly 
> available task manager instance, the checkpoint takes 10 minutes to eventually 
> time out, leaving all other task managers unable to commit checkpoints. In a 
> worst-case scenario, if the job got restarted for other reasons 
> (e.g. job manager termination), that would cause more messages to be 
> re-processed/duplicated compared to a job without fine-grained recovery 
> enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint 
> initiated by the old task manager and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes that a Task Manager is 
> gone and the workload is redispatched, it still needs to wait for the checkpoint 
> initiated by the old task manager?
> Checkpoint screenshot in attachments.
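
On question 1: yes, the timeout is configurable per job via the checkpoint config; the observed 10 minutes matches the default. A minimal sketch:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // trigger a checkpoint every minute
        // Fail checkpoints after 2 minutes instead of the 10-minute default.
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
    }
}
{code}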





[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222726#comment-16222726
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4844


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Fix For: 1.4.0
>
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using the "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180, dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, even though the individual 
> recovery happens correctly and the workload is re-dispatched to a newly 
> available task manager instance, the checkpoint takes 10 minutes to eventually 
> time out, leaving all other task managers unable to commit checkpoints. In a 
> worst-case scenario, if the job got restarted for other reasons 
> (e.g. job manager termination), that would cause more messages to be 
> re-processed/duplicated compared to a job without fine-grained recovery 
> enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint 
> initiated by the old task manager and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes that a Task Manager is 
> gone and the workload is redispatched, it still needs to wait for the checkpoint 
> initiated by the old task manager?
> Checkpoint screenshot in attachments.





[jira] [Updated] (FLINK-7873) Introduce HybridStreamStateHandle for quick recovery from checkpoint.

2017-10-27 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-7873:
--
Description: 
The current recovery strategy always reads checkpoint data from a remote 
FileStream (HDFS). This costs a lot of bandwidth when the state is big 
(e.g. 1 TB). What's worse, if the job performs recovery again and again, it can 
eat up all network bandwidth and badly hurt the cluster. So I propose that 
we cache the checkpoint data locally and read checkpoint data from the local 
cache whenever we can, reading the data from remote storage only if we fail 
locally. The advantage is that if an execution is assigned to the same 
TaskManager as before, it saves a lot of bandwidth and recovers faster.


Key issues:
1. Cache the checkpoint data on local disk and manage its creation and deletion.
2. Introduce a HybridStreamStateHandler which tries to create a local input 
stream first and, if that fails, creates a remote input stream. Its prototype 
looks like below:
{code:java}
class HybridStreamHandle {
    private StreamStateHandle localHandle;
    private StreamStateHandle remoteHandle;
    ..
    public FSDataInputStream openInputStream() throws IOException {
        FSDataInputStream inputStream = localHandle.openInputStream();
        return inputStream != null ? inputStream : remoteHandle.openInputStream();
    }
    .
}
{code}

Solution:
There are two kinds of solutions I can think of.

Solution 1:
The backend does the caching, and the HybridStreamHandle points to both 
local and remote data. The HybridStreamHandle is managed by the 
CheckpointCoordinator like any other StreamHandle, so the CheckpointCoordinator 
will perform dispose on it. When the HybridStreamHandle performs dispose, it 
calls localHandle.dispose() and remoteHandle.dispose(). In this way, we have to 
record the TaskManager's info (like its location) in the localHandle and add an 
entry in the TaskManager to handle the localHandle dispose message; we also 
have to consider the HA situation.

Solution 2:
The backend does the caching and manages the cached data itself. It simply 
uses a TTL-like method to manage handle disposal: we dispose of a handle if it 
hasn't been touched for X time. We touch the handles when we recover from a 
checkpoint or when we perform a checkpoint, and once we touch a handle we reset 
its TTL. In this way, all the work is done by the backend, transparently to 
both the JobManager and the TaskManager. The only problem is that we may 
dispose of a handle that is still useful, but even in this case we can read 
from the remote data, and users can avoid this by setting a proper TTL value 
according to the checkpoint interval and other factors.

To avoid over-complicating the problem, I prefer 
solution 2. Can someone give me some advice? I would appreciate it very much~

  was:
Current recover strategy will always read checkpoint data from remote 
FileStream (HDFS). This will cost a lot of bandwidth when the state is so big 
(e.g. 1T). What's worse, if this job performs recover again and again, it can 
eat up all network bandwidth and do a huge hurt to cluster. So, I proposed that 
we can cache the checkpoint data locally, and read checkpoint data from local 
cache as well as we can, we read the data from remote only if we fail locally. 
The advantage is that if a execution is assigned to the same TaskManager as 
before, it can save a lot of bandwidth, and obtain a faster recover.


Key issues:
1. Cache the checkpoint data on local disk and manage it's create and delete.
2. introduce a HybridStreamStateHandler which try to create a local input 
stream first, if failed, it then create a remote input stream, it prototype 
looks like below:
{code:java}
class HybridStreamHandle {
   private StreamStateHandle localHandle;
   private StreamStateHandle remoteHandle;
   ..
   public FSDataInputStream openInputStream() throws IOException {
FSDataInputStream inputStream = localHandle.openInputStream();
return inputStream != null ? inputStream : 
remoteHandle.openInputStream();
}
   .
}
{code}

Solution:
There are two kind solutions I can think of.

solution1:
Backend do the cached job, and the HybridStreamHandle point to both 
local and remote data, HybridStreamHandle is managed by CheckpointCoordinator 
as well as other StreamHandle, so CheckpointCoordinator will perform dispose on 
it. when HybridStreamHandle performs dispose it call localHandle.dispose() and 
remoteHandle.dispose(). In this way, we have to record TaskManager's info (like 
location) in localHandle and add an entry in TaskManager to handle localHandle 
dispose message, we also have to consider the HA situation.

solution2:
Backend do the cached job and manage the cached data itself. It simple 
use a TTL-like method to manage handle's dispose, we dispose a handle if it 
wasn't be touched for a {code}X{code} time. We 

[jira] [Assigned] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7844:


Assignee: Till Rohrmann  (was: Zhenzhong Xu)

> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Till Rohrmann
> Fix For: 1.4.0
>
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using the "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180, dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, even though the individual 
> recovery happens correctly and the workload is re-dispatched to a newly 
> available task manager instance, the checkpoint takes 10 minutes to eventually 
> time out, leaving all other task managers unable to commit checkpoints. In a 
> worst-case scenario, if the job got restarted for other reasons 
> (e.g. job manager termination), that would cause more messages to be 
> re-processed/duplicated compared to a job without fine-grained recovery 
> enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint 
> initiated by the old task manager and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes that a Task Manager is 
> gone and the workload is redispatched, it still needs to wait for the checkpoint 
> initiated by the old task manager?
> Checkpoint screenshot in attachments.





[GitHub] flink pull request #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending ch...

2017-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4844


---


[jira] [Updated] (FLINK-7873) Introduce HybridStreamStateHandle for quick recovery from checkpoint.

2017-10-27 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-7873:
--
Description: 
The current recovery strategy always reads checkpoint data from a remote 
FileStream (HDFS). This costs a lot of bandwidth when the state is big 
(e.g. 1 TB). What's worse, if the job performs recovery again and again, it can 
eat up all network bandwidth and badly hurt the cluster. So I propose that 
we cache the checkpoint data locally and read checkpoint data from the local 
cache whenever we can, reading the data from remote storage only if we fail 
locally. The advantage is that if an execution is assigned to the same 
TaskManager as before, it saves a lot of bandwidth and recovers faster.


Key issues:
1. Cache the checkpoint data on local disk and manage its creation and deletion.
2. Introduce a HybridStreamStateHandler which tries to create a local input 
stream first and, if that fails, creates a remote input stream. Its prototype 
looks like below:
{code:java}
class HybridStreamHandle {
    private StreamStateHandle localHandle;
    private StreamStateHandle remoteHandle;
    ..
    public FSDataInputStream openInputStream() throws IOException {
        FSDataInputStream inputStream = localHandle.openInputStream();
        return inputStream != null ? inputStream : remoteHandle.openInputStream();
    }
    .
}
{code}

Solution:
There are two kinds of solutions I can think of.

Solution 1:
The backend does the caching, and the HybridStreamHandle points to both 
local and remote data. The HybridStreamHandle is managed by the 
CheckpointCoordinator like any other StreamHandle, so the CheckpointCoordinator 
will perform dispose on it. When the HybridStreamHandle performs dispose, it 
calls localHandle.dispose() and remoteHandle.dispose(). In this way, we have to 
record the TaskManager's info (like its location) in the localHandle and add an 
entry in the TaskManager to handle the localHandle dispose message; we also 
have to consider the HA situation.

Solution 2:
The backend does the caching and manages the cached data itself. It simply 
uses a TTL-like method to manage handle disposal: we dispose of a handle if it 
hasn't been touched for X time. We touch the handles when we recover from a 
checkpoint or when we perform a checkpoint, and once we touch a handle we reset 
its TTL. In this way, all the work is done by the backend, transparently to 
both the JobManager and the TaskManager. The only problem is that we may 
dispose of a handle that is still useful, but even in this case we can read 
from the remote data, and users can avoid this by setting a proper TTL value 
according to the checkpoint interval and other factors.

To avoid over-complicating the problem, I prefer 
solution 2. Can someone give me some advice? I would appreciate it very much~

  was:
Current recovery strategy will always read checkpoint data from remote 
FileStream (HDFS). This will cost a lot of network when the state is so big 
(e.g. 1T), this cost can be saved by reading the checkpoint data from local 
disk. So i introduce a HybridStreamStateHandler which try to create a local 
input stream first, if failed, it then create a remote input stream, it 
prototype looks like below:
{code:java}
class HybridStreamHandle {
   private FileStateHandle localHandle;
   private FileStateHandle remoteHandle;
   ..
   public FSDataInputStream openInputStream() throws IOException {
FSDataInputStream inputStream = localHandle.openInputStream();
return inputStream != null ? inputStream : 
remoteHandle.openInputStream();
}
.
}
{code}


> Introduce HybridStreamStateHandle for quick recovery from checkpoint.
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is big 
> (e.g. 1 TB). What's worse, if the job performs recovery again and again, it can 
> eat up all network bandwidth and badly hurt the cluster. So I propose 
> that we cache the checkpoint data locally and read checkpoint data from 
> the local cache whenever we can, reading the data from remote storage only if we 
> fail locally. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it saves a lot of bandwidth and recovers 
> faster.
> Key issues:
> 1. Cache the checkpoint data on local disk and manage its creation and deletion.
> 2. Introduce a HybridStreamStateHandler which tries to create a local input 
> stream first, if 

[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222717#comment-16222717
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4916

[FLINK-7153] Re-introduce preferred locations for scheduling

## What is the purpose of the change

This PR makes the `TaskManagerLocation` accessible for asynchronous 
scheduling.

Due to changes for Flink 1.3 where we introduced asynchronous scheduling, 
it was not always guaranteed that the scheduler knew about the scheduling 
locations of producer tasks. Especially the eager scheduling mode was affected 
since the slot allocation happened concurrently.

In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to 
each `Execution`. In eager scheduling mode, a slot will only be requested for a 
task if all its inputs have a slot assigned (i.e. their 
`TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't 
wait for the completion of all inputs, but take those inputs which are already 
known.

In order to distinguish whether we want to wait for all or take all 
available task manager locations, we add a `LocationPreferenceConstraint` which 
has the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs 
to have a location assigned, and `ANY` means that we take what's currently 
known.
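
A compact sketch of the two waiting modes (a hypothetical helper, not the PR's actual code):

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

enum LocationPreferenceConstraint { ALL, ANY }

class LocationPreferences {

    /** With ALL, block until every input location is known; with ANY, take
     *  only the locations that are already available. */
    static <T> List<T> resolve(
            Collection<CompletableFuture<T>> inputLocations,
            LocationPreferenceConstraint constraint) {
        List<T> result = new ArrayList<>();
        for (CompletableFuture<T> future : inputLocations) {
            if (constraint == LocationPreferenceConstraint.ALL) {
                result.add(future.join()); // wait for the producer's slot assignment
            } else if (future.isDone() && !future.isCompletedExceptionally()) {
                result.add(future.getNow(null)); // take what's currently known
            }
        }
        return result;
    }
}
{code}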

In order to not deploy slots prematurely in eager mode, the slot assignment 
has been factored out into its own step. Before, one had to call 
`Execution#deployToSlot(SimpleSlot)` which assigned the given slot and started 
the deployment. Now, one has to call `Execution#tryAssignResource` before one 
can call `Execution#deploy`.

Moreover, this PR ensures that the `FailoverRegions` are topologically sorted, 
which is important for non-queued scheduling.

FYI @StephanEwen 

## Brief change log

- Introduce `LocationPreferenceConstraint` to distinguish the waiting 
behaviour for the preferred locations
- Split slot assignment and deployment into two separate steps
- Moved preferred location calculation into the Execution to reduce code 
duplication between the `Scheduler` and the `SlotPool`
- Changed preferred location calculation to be blocking if 
`LocationPreferenceConstraint#ALL` and not all input locations are known

## Verifying this change

This change added tests and can be verified as follows:

- Added `ExecutionTest` to check the correct assigned slot release in case 
of cancellation and to check the correct preferred location calculation
- Added 
`ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations`
 to check that eager scheduling waits for all inputs to be assigned before 
scheduling consumer tasks
- Moreover, the scheduler is being tested by existing tests such as 
`SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases 
for lazy scheduling (batch case)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4916


commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till 
Date:   2017-10-16T12:04:13Z

[FLINK-7153] Re-introduce preferred locations for scheduling

commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann 
Date:   2017-10-27T07:47:03Z

[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs, where we do lazy scheduling, not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to ANY, which
means that only those locations which are known at scheduling time are taken into
consideration.

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-10-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4916

[FLINK-7153] Re-introduce preferred locations for scheduling

## What is the purpose of the change

This PR makes the `TaskManagerLocation` accessible for asynchronous 
scheduling.

Due to changes for Flink 1.3 where we introduced asynchronous scheduling, 
it was not always guaranteed that the scheduler knew about the scheduling 
locations of producer tasks. Especially the eager scheduling mode was affected 
since the slot allocation happened concurrently.

In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to 
each `Execution`. In eager scheduling mode, a slot will only be requested for a 
task if all its inputs have a slot assigned (i.e. their 
`TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't 
wait for the completion of all inputs, but take those inputs which are already 
known.

In order to distinguish whether we want to wait for all task manager locations 
or only take those currently available, we add a `LocationPreferenceConstraint` which 
has the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs 
to have a location assigned, and `ANY` means that we take what's currently 
known.

In order to not deploy slots prematurely in eager mode, the slot assignment 
has been factored out into its own step. Before, one had to call 
`Execution#deployToSlot(SimpleSlot)` which assigned the given slot and started 
the deployment. Now, one has to call `Execution#tryAssignResource` before one 
can call `Execution#deploy`.
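
To make the `ALL`/`ANY` distinction concrete, here is a minimal, self-contained 
sketch of the waiting semantics (plain strings stand in for `TaskManagerLocation`; 
the helper class is invented for illustration and is not the actual `Execution` code):

{code}
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

enum LocationPreferenceConstraint { ALL, ANY }

class PreferredLocations {

	// ALL: wait until every input location future has completed.
	// ANY: take only the locations that are already known right now.
	static CompletableFuture<Collection<String>> calculate(
			Collection<CompletableFuture<String>> inputLocations,
			LocationPreferenceConstraint constraint) {
		if (constraint == LocationPreferenceConstraint.ALL) {
			return CompletableFuture
				.allOf(inputLocations.toArray(new CompletableFuture<?>[0]))
				.thenApply(ignored -> inputLocations.stream()
					.map(CompletableFuture::join)
					.collect(Collectors.toList()));
		} else {
			return CompletableFuture.completedFuture(inputLocations.stream()
				.filter(f -> f.isDone() && !f.isCompletedExceptionally())
				.map(CompletableFuture::join)
				.collect(Collectors.toList()));
		}
	}

	public static void main(String[] args) {
		CompletableFuture<String> known = CompletableFuture.completedFuture("tm-1");
		CompletableFuture<String> pending = new CompletableFuture<>();
		// ANY returns immediately with [tm-1]; ALL would wait until 'pending' completes.
		System.out.println(calculate(Arrays.asList(known, pending),
			LocationPreferenceConstraint.ANY).join());
	}
}
{code}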

Moreover, this PR ensures that the `FailoverRegions` are topologically sorted, 
which is important for non-queued scheduling.

FYI @StephanEwen 

## Brief change log

- Introduce `LocationPreferenceConstraint` to distinguish the waiting 
behaviour for the preferred locations
- Split slot assignment and deployment into two separate steps
- Moved preferred location calculation into the Execution to reduce code 
duplication between the `Scheduler` and the `SlotPool`
- Changed the preferred location calculation to block if 
`LocationPreferenceConstraint#ALL` is set and not all input locations are known

## Verifying this change

This change added tests and can be verified as follows:

- Added `ExecutionTest` to check the correct release of the assigned slot in 
case of cancellation and the correct preferred location calculation
- Added 
`ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations`
 to check that eager scheduling waits for all inputs to be assigned before 
scheduling consumer tasks
- Moreover, the scheduler is being tested by existing tests such as 
`SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases 
for lazy scheduling (batch case)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4916


commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till 
Date:   2017-10-16T12:04:13Z

[FLINK-7153] Re-introduce preferred locations for scheduling

commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann 
Date:   2017-10-27T07:47:03Z

[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs, where we do lazy scheduling, not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to any, which
means that only those locations are taken into consideration which are known at
scheduling time.

commit c821e67529deaaed96f183fc22bc0a9fe246fa23
Author: Till Rohrmann 
Date:   2017-10-26T16:22:43Z

[hotfix] Make failover regions topologically sorted

commit 

[jira] [Closed] (FLINK-7924) Fix incorrect names of checkpoint options

2017-10-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7924.
---

> Fix incorrect names of checkpoint options
> -
>
> Key: FLINK-7924
> URL: https://issues.apache.org/jira/browse/FLINK-7924
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when 
> actually the checkpoints may always be incremental and only savepoints have to
> be full and self-contained.
> Initially, we planned to add options for multiple checkpoint types, like
> checkpoints that were forced to be full, and checkpoints that were incremental.
> That is not necessary at this point.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7924) Fix incorrect names of checkpoint options

2017-10-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7924.
-
Resolution: Fixed

Fixed in fe3b276818eec1d4a70632a45343d70dc2be53f3

> Fix incorrect names of checkpoint options
> -
>
> Key: FLINK-7924
> URL: https://issues.apache.org/jira/browse/FLINK-7924
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when 
> actually the checkpoints may always be incremental and only savepoints have to
> be full and self-contained.
> Initially, we planned to add options for multiple checkpoint types, like
> checkpoints that were forced to be full, and checkpoints that were incremental.
> That is not necessary at this point.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222619#comment-16222619
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4777
  
Looks good - still the open question whether we add dependency convergence 
by default, and deactivate it in the not-yet-done modules. That gives the completed 
modules a "done" feeling and kind of doubles as a "todo" for the remaining 
modules.


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depends on one another - to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4777
  
Looks good - still the open question whether we add dependency convergence 
by default, and deactivate it in the not-yet-done modules. That gives the completed 
modules a "done" feeling and kind of doubles as a "todo" for the remaining 
modules.


---


[GitHub] flink issue #4914: [hotfix] [docs] Fix typos in types serialization document...

2017-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4914
  
Good fix, merging...


---


[GitHub] flink issue #4913: [hotfix] [javadoc] Fix typo in Javadoc of ManagedSnapshot...

2017-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4913
  
Thanks, good catch!
Merging...


---


[GitHub] flink issue #4912: [hotfix] [docs] Fix broken downloads page url

2017-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4912
  
Thanks, merging...


---


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222551#comment-16222551
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4915
  
 


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

2017-10-27 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4915
  
👍 


---


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222550#comment-16222550
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147442314
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -563,7 +553,7 @@ public void close() throws Exception {
			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
}
try {
-   producersPool.close();
+   producersPool.ifPresent(pool -> pool.close());
--- End diff --

I am not adamant about it but using `Optional` in private fields is not 
without controversy: https://stackoverflow.com/a/26328555

Also, `ifPresent(pool -> pool.close())` only works because `close` does not 
declare any checked exceptions. If it did, the code would not compile.


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-10-27 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147442314
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -563,7 +553,7 @@ public void close() throws Exception {
			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
}
try {
-   producersPool.close();
+   producersPool.ifPresent(pool -> pool.close());
--- End diff --

I am not adamant about it but using `Optional` in private fields is not 
without controversy: https://stackoverflow.com/a/26328555

Also, `ifPresent(pool -> pool.close())` only works because `close` does not 
declare any checked exceptions. If it did, the code would not compile.
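
For reference, a minimal, self-contained illustration of this point (the class 
names are invented for the example):

{code}
import java.util.Optional;

class IfPresentCheckedException {

	static class Pool {
		void close() { }                  // declares no checked exceptions
	}

	static class CheckedPool {
		void close() throws Exception { } // declares a checked exception
	}

	public static void main(String[] args) throws Exception {
		Optional<Pool> producersPool = Optional.of(new Pool());
		// Compiles: java.util.function.Consumer#accept declares no checked
		// exceptions, and Pool#close throws none.
		producersPool.ifPresent(pool -> pool.close());

		Optional<CheckedPool> checkedPool = Optional.of(new CheckedPool());
		// Would NOT compile if uncommented: the lambda throws a checked
		// exception that Consumer#accept cannot propagate.
		// checkedPool.ifPresent(pool -> pool.close());

		// Workaround without a lambda:
		if (checkedPool.isPresent()) {
			checkedPool.get().close();
		}
	}
}
{code}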


---


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222522#comment-16222522
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147436687
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
	 */
	@Override
	public void open(Configuration configuration) throws Exception {
-		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
--- End diff --

What is the benefit of moving this into `initializeState()`?


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-10-27 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147436687
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
	 */
	@Override
	public void open(Configuration configuration) throws Exception {
-		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
--- End diff --

What is the benefit of moving this into `initializeState()`?


---


[GitHub] flink issue #4894: [FLINK-7548] [table] Improve rowtime support of TableSour...

2017-10-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Added documentation for the new TableSource interfaces.


---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222519#comment-16222519
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Added documentation for the new TableSource interfaces.


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a 
> rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and some built-in 
> strategies needs further discussion.
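
A hypothetical sketch of how the proposed interface could look (all names are 
placeholders; the actual design was still under discussion at this point):

{code}
/** Hypothetical sketch of the proposed interface; names are placeholders. */
interface DefinedWatermark {

	/** Name of an existing field to be used as the rowtime attribute. */
	String getRowtimeAttribute();

	/** Strategy that generates watermarks from the rowtime attribute. */
	WatermarkGenerator getWatermarkGenerator();
}

/** Placeholder for the (equally hypothetical) generator abstraction. */
interface WatermarkGenerator {

	/** Returns the watermark for a given rowtime, e.g. rowtime - maxOutOfOrderness. */
	long getWatermark(long rowtime);
}
{code}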



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7937) Add pagination to Flink History view

2017-10-27 Thread Andrew Roberts (JIRA)
Andrew Roberts created FLINK-7937:
-

 Summary: Add pagination to Flink History view
 Key: FLINK-7937
 URL: https://issues.apache.org/jira/browse/FLINK-7937
 Project: Flink
  Issue Type: Improvement
  Components: History Server, JobManager
Reporter: Andrew Roberts


We have enough historical jobs that the browser chokes when trying to render 
them all on one page. The history server should have pagination added, so that 
it only tries to render a small subset at a time.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222494#comment-16222494
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4915
  
@GJL @aljoscha could you take a look?


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

2017-10-27 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4915
  
@GJL @aljoscha could you take a look?


---


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-27 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222450#comment-16222450
 ] 

Piotr Nowojski commented on FLINK-7838:
---

The main problem in this issue is this bug in Kafka:
https://issues.apache.org/jira/browse/KAFKA-6132

[~aljoscha] I'm not sure what we should do now in that case. I tried bumping 
our connector to work with Kafka 1.0.0-rc3 and this dead lock error was gone, 
however the upgrade broke the consumer side of our connector.

> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222448#comment-16222448
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4915

[FLINK-7838] Bunch of hotfixes and fix missing synchronization in 
FlinkKafkaProducer011

## What is the purpose of the change

Most important is the commit adding missing synchronization, which might 
have been the cause of some deadlocks on Travis. The others are just non-critical 
hotfixes.

## Brief change log

Please check individual commit messages.

## Verifying this change

This change is already covered by existing Kafka 0.11 connector tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4915


commit 4e0492595f497a49c63b8ffddcc66e720e4e4433
Author: Piotr Nowojski 
Date:   2017-10-24T15:35:56Z

[hotfix][kafka] Bump Kafka 0.11 dependency

This might include some bugfixes

commit e38b3461bc97a175bf67f1072b2e8a2a891c1f1a
Author: Piotr Nowojski 
Date:   2017-10-24T15:57:05Z

[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer

commit 04127b9c44807f5379e07d801847d993c39e94b1
Author: Piotr Nowojski 
Date:   2017-10-26T08:02:15Z

[hotfix][kafka] Fix FlinkKafkaProducer011 logger

commit 8b47ac214c4022563be8128e84bc02d5de98819c
Author: Piotr Nowojski 
Date:   2017-10-27T13:11:24Z

[hotfix][kafka-tests] Fix test names so that they are not ignored by mvn 
build

commit a6c4c8bbdbfc5c238557e151fa8598e71a562411
Author: Piotr Nowojski 
Date:   2017-10-25T16:08:46Z

[hotfix][kafka] Move checkpointing enable checking to initializeState

initializeState is called before open, and since both of those functions
rely on the chosen semantic, the checkpointing-enabled check should
happen in initializeState.

commit 055e5d125df895fd010e1171d1d39f37177518a2
Author: Piotr Nowojski 
Date:   2017-10-27T13:14:58Z

[hotfix][kafka] Remove unused field in FlinkKafkaProducer011

commit 6cf55ed8977135af01099452962962199e253348
Author: Piotr Nowojski 
Date:   2017-10-27T13:47:26Z

[hotfix][kafka] Do not return producers to a pool in abort for non-EXACTLY_ONCE modes

Previously, on abort(...) producers were returned to the pool. This was a
minor bug, probably without any negative side effects; however, this patch
fixes it and adds additional sanity checks to guard against similar bugs
in the future.




> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-10-27 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4915

[FLINK-7838] Bunch of hotfixes and fix missing synchronization in 
FlinkKafkaProducer011

## What is the purpose of the change

Most important is the commit adding missing synchronization, which might 
have been the cause of some deadlocks on Travis. The others are just non-critical 
hotfixes.

## Brief change log

Please check individual commit messages.

## Verifying this change

This change is already covered by existing Kafka 0.11 connector tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4915


commit 4e0492595f497a49c63b8ffddcc66e720e4e4433
Author: Piotr Nowojski 
Date:   2017-10-24T15:35:56Z

[hotfix][kafka] Bump Kafka 0.11 dependency

This might include some bugfixes

commit e38b3461bc97a175bf67f1072b2e8a2a891c1f1a
Author: Piotr Nowojski 
Date:   2017-10-24T15:57:05Z

[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer

commit 04127b9c44807f5379e07d801847d993c39e94b1
Author: Piotr Nowojski 
Date:   2017-10-26T08:02:15Z

[hotfix][kafka] Fix FlinkKafkaProducer011 logger

commit 8b47ac214c4022563be8128e84bc02d5de98819c
Author: Piotr Nowojski 
Date:   2017-10-27T13:11:24Z

[hotfix][kafka-tests] Fix test names so that they are not ignored by mvn 
build

commit a6c4c8bbdbfc5c238557e151fa8598e71a562411
Author: Piotr Nowojski 
Date:   2017-10-25T16:08:46Z

[hotfix][kafka] Move checkpointing enable checking to initializeState

initializeState is called before open, and since both of those functions
rely on the chosen semantic, the checkpointing-enabled check should
happen in initializeState.

commit 055e5d125df895fd010e1171d1d39f37177518a2
Author: Piotr Nowojski 
Date:   2017-10-27T13:14:58Z

[hotfix][kafka] Remove unused field in FlinkKafkaProducer011

commit 6cf55ed8977135af01099452962962199e253348
Author: Piotr Nowojski 
Date:   2017-10-27T13:47:26Z

[hotfix][kafka] Do not return producers to a pool in abort for non-EXACTLY_ONCE modes

Previously, on abort(...) producers were returned to the pool. This was a
minor bug, probably without any negative side effects; however, this patch
fixes it and adds additional sanity checks to guard against similar bugs
in the future.




---


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222429#comment-16222429
 ] 

ASF GitHub Bot commented on FLINK-7737:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4876
  
Yes, I know what you mean. However, including `outStream` in `hashCode` and 
`equals` wouldn't add any quirks.


> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems, where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}
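
For reference, a self-contained sketch of the proposed variant, assuming the 
Hadoop 2.x Syncable API and dropping the reflection-based fallback used in the 
original code:

{code}
import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

class SyncingStreamWriter {

	/** Flushes and then syncs the stream so data survives node failures. */
	static void hflushOrSync(FSDataOutputStream os) throws IOException {
		os.hflush();  // push buffered data to the datanodes
		if (os instanceof HdfsDataOutputStream) {
			// HDFS: additionally persist the updated file length
			((HdfsDataOutputStream) os).hsync(
				EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
		} else {
			// other Hadoop-compatible file systems: plain hsync
			os.hsync();
		}
	}
}
{code}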



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4876: [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWr...

2017-10-27 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4876
  
Yes, I know what you mean. However, including `outStream` in `hashCode` and 
`equals` wouldn't add any quirks.


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222427#comment-16222427
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4872
  
This PR does not strictly depend on the `MetricFetcher` changes. The reason 
they are included is simply that I directly wanted to adapt the `MetricFetcher`, 
which I had touched with #4852. It just turned out later that I would be fixing 
FLINK-7100 with this PR as well.

I actually think that #4852 can also be merged into the release branch. 
However, if you insist, then I can try to disentangle this PR from the 
`MetricFetcher` changes.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This also has 
> implications for how the REST handlers retrieve the TaskManager metrics, 
> namely by its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.
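
A toy model of the proposal, just to make the lifetime argument concrete (names 
and types are illustrative, not the actual Flink classes):

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model: metric groups keyed by the stable ResourceID instead of the
 * registration-scoped InstanceID. Illustrative only. */
class MetricGroupRegistry {

	private final Map<String, Object> groupsByResourceId = new ConcurrentHashMap<>();

	void register(String resourceId, Object taskManagerMetricGroup) {
		// After a JobManager reconnect the ResourceID stays the same, so the
		// group (and its metrics) remains addressable under the same key.
		groupsByResourceId.put(resourceId, taskManagerMetricGroup);
	}

	Object lookup(String resourceId) {
		return groupsByResourceId.get(resourceId);
	}
}
{code}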



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4872
  
This PR does not strictly depend on the `MetricFetcher` changes. The reason 
they are included is simply that I directly wanted to adapt the `MetricFetcher`, 
which I had touched with #4852. It just turned out later that I would be fixing 
FLINK-7100 with this PR as well.

I actually think that #4852 can also be merged into the release branch. 
However, if you insist, then I can try to disentangle this PR from the 
`MetricFetcher` changes.


---


[jira] [Assigned] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7051:


Assignee: Fabian Hueske  (was: Haohui Mai)

> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7051:


Assignee: Haohui Mai  (was: Fabian Hueske)

> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>Priority: Critical
> Fix For: 1.4.0
>
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222399#comment-16222399
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r147416486
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ---
@@ -239,7 +239,15 @@ public void shutdown() {
 
		if (queryService != null) {
			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
-			stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+
+			try {
+				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+			} catch (IllegalStateException ignored) {
+				// this can happen if the underlying actor system has been stopped before shutting
+				// the metric registry down
+				// TODO: Pull the MetricQueryService actor out of the MetricRegistry
+				LOG.debug("Cannot gracefully stop the metric query service actor.");
--- End diff --

Giving it a second look, I deliberately did not include the exception, 
because it can only happen if the `ActorSystem` has been shut down before. I'll 
change the debug log message instead.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This also has 
> implications for how the REST handlers retrieve the TaskManager metrics, 
> namely by its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r147416486
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ---
@@ -239,7 +239,15 @@ public void shutdown() {
 
		if (queryService != null) {
			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
-			stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+
+			try {
+				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+			} catch (IllegalStateException ignored) {
+				// this can happen if the underlying actor system has been stopped before shutting
+				// the metric registry down
+				// TODO: Pull the MetricQueryService actor out of the MetricRegistry
+				LOG.debug("Cannot gracefully stop the metric query service actor.");
--- End diff --

Giving it a second look, I deliberately did not include the exception, 
because it can only happen if the `ActorSystem` has been shut down before. I'll 
change the debug log message instead.


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222395#comment-16222395
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r147415854
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ---
@@ -239,7 +239,15 @@ public void shutdown() {
 
		if (queryService != null) {
			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
-			stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+
+			try {
+				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+			} catch (IllegalStateException ignored) {
+				// this can happen if the underlying actor system has been stopped before shutting
+				// the metric registry down
+				// TODO: Pull the MetricQueryService actor out of the MetricRegistry
+				LOG.debug("Cannot gracefully stop the metric query service actor.");
--- End diff --

good point


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This also has 
> implications for how the REST handlers retrieve the TaskManager metrics, 
> namely by its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r147415854
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ---
@@ -239,7 +239,15 @@ public void shutdown() {
 
		if (queryService != null) {
			stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
-			stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+
+			try {
+				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+			} catch (IllegalStateException ignored) {
+				// this can happen if the underlying actor system has been stopped before shutting
+				// the metric registry down
+				// TODO: Pull the MetricQueryService actor out of the MetricRegistry
+				LOG.debug("Cannot gracefully stop the metric query service actor.");
--- End diff --

good point


---


[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222394#comment-16222394
 ] 

ASF GitHub Bot commented on FLINK-7876:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r147415810
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 ---
@@ -113,7 +113,7 @@ protected void startClusterComponents(Configuration configuration, RpcService rp
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
--- End diff --

You're right. I'll go over the different places again and try to fix it.


> Register TaskManagerMetricGroup under ResourceID instead of InstanceID
> --
>
> Key: FLINK-7876
> URL: https://issues.apache.org/jira/browse/FLINK-7876
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under 
> its {{InstanceID}} and thereby binding its metrics effectively to the 
> lifetime of its registration with the {{JobManager}}. This also has 
> implications for how the REST handlers retrieve the TaskManager metrics, 
> namely by its {{InstanceID}}.
> I would actually propose to register the {{TaskManagerMetricGroup}} under the 
> {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole 
> lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be 
> able to query metrics independent of the connection status to the 
> {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4872#discussion_r147415810
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 ---
@@ -113,7 +113,7 @@ protected void startClusterComponents(Configuration configuration, RpcService rp
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
--- End diff --

You're right. I'll go over the different places again and try to fix it.


---


[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222388#comment-16222388
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4844
  
Thanks for the review @StephanEwen. I will address your comments and then 
merge this PR.


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180 that dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, even though the 
> individual recovery happens correctly, the workload was re-dispatched to a 
> newly available task manager instance. However, the checkpoint would take 10 
> minutes to eventually time out, leaving all other task managers unable to 
> commit checkpoints. In a worst-case scenario, if the job got restarted for 
> other reasons (i.e. job manager termination), that would cause more messages 
> to be re-processed/duplicated compared to the job without fine-grained 
> recovery enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint that 
> was initiated by the old task manager and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason that, when the Job Manager realizes that the Task 
> Manager is gone and the workload is redispatched, it still needs to wait for 
> the checkpoint initiated by the old task manager?
> Checkpoint screenshot in attachments.
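
Regarding question 1: the timeout is configurable per job via 
{{CheckpointConfig}}; its default of 10 minutes matches the observation above. 
A minimal sketch:

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {

	public static void main(String[] args) {
		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.getExecutionEnvironment();

		// trigger a checkpoint every 60 seconds
		env.enableCheckpointing(60_000L);
		// declare a checkpoint failed if it does not complete within
		// 2 minutes (the default timeout is 10 minutes)
		env.getCheckpointConfig().setCheckpointTimeout(120_000L);
	}
}
{code}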



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222387#comment-16222387
 ] 

ASF GitHub Bot commented on FLINK-7844:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4844#discussion_r147415027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1270,6 +1272,42 @@ public void run() {
}
 
/**
+* Discards the given pending checkpoint because of the given cause.
+*
+* @param pendingCheckpoint to discard
+* @param cause for discarding the checkpoint
+*/
+	private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) {
+   Thread.holdsLock(lock);
--- End diff --

Yes definitely.


> Fine Grained Recovery triggers checkpoint timeout failure
> -
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
> Attachments: screenshot-1.png
>
>
> Context: 
> We are using "individual" failover (fine-grained) recovery strategy for our 
> embarrassingly parallel router use case. The topic has over 2000 partitions, 
> and parallelism is set to ~180 that dispatched to over 20 task managers with 
> around 180 slots.
> Observations:
> We've noticed that after one task manager termination, even though the 
> individual recovery happens correctly, the workload was re-dispatched to a 
> newly available task manager instance. However, the checkpoint would take 10 
> minutes to eventually time out, leaving all other task managers unable to 
> commit checkpoints. In a worst-case scenario, if the job got restarted for 
> other reasons (i.e. job manager termination), that would cause more messages 
> to be re-processed/duplicated compared to the job without fine-grained 
> recovery enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint that 
> was initiated by the old task manager and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason that, when the Job Manager realizes that the Task 
> Manager is gone and the workload is redispatched, it still needs to wait for 
> the checkpoint initiated by the old task manager?
> Checkpoint screenshot in attachments.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending checkpoin...

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4844
  
Thanks for the review @StephanEwen. I will address your comments and then 
merge this PR.


---


[GitHub] flink pull request #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending ch...

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4844#discussion_r147415027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1270,6 +1272,42 @@ public void run() {
}
 
/**
+* Discards the given pending checkpoint because of the given cause.
+*
+* @param pendingCheckpoint to discard
+* @param cause for discarding the checkpoint
+*/
+	private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) {
+   Thread.holdsLock(lock);
--- End diff --

Yes definitely.


---


[jira] [Commented] (FLINK-7705) Port JobDetailsHandler to new REST endpoint

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222384#comment-16222384
 ] 

ASF GitHub Bot commented on FLINK-7705:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4884
  
Hi @zjureel, I'll merge this PR once we have forked off the 1.4 release branch.


> Port JobDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7705
> URL: https://issues.apache.org/jira/browse/FLINK-7705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port existing {{JobDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4884: [FLINK-7705] Add JobDetailsHandler

2017-10-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4884
  
Hi @zjureel, I'll merge this PR once we have forked off the 1.4 release branch.


---


[jira] [Commented] (FLINK-7800) Enable window joins without equi-join predicates

2017-10-27 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222325#comment-16222325
 ] 

Xingcan Cui commented on FLINK-7800:


Hi [~fhueske], the problem is not as easy as I expected. The key point is that 
once we remove the equi-predicate check in {{FlinkLogicalJoin}}, there will be 
different query plans in the optimization phase. For instance, given the 
following test expression:
{code}
val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 
'g)
{code}
two kinds of plans become candidates: with or without equi-predicates 
in {{LogicalJoin}}. Worse still, the plans without equi-predicates may have a 
lower cost (mainly in terms of IO for the DataSet join) and thus be selected as 
the result.

To solve this, we must devise a mechanism that ensures the plans with 
equi-predicates are always preferred, regardless of their costs. Maybe that 
can be implemented by adding a "punishment factor" to plans without 
equi-predicates (or an "enhancement factor" vice versa), but I am concerned 
that this may break the existing cost model. Do you have some ideas about 
that?
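
A hypothetical sketch of the "punishment factor" idea, written as a cost 
override that would live in a subclass of Calcite's {{Join}}; the actual Flink 
rel nodes and cost model may differ:

{code}
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

// Fragment meant for a subclass of org.apache.calcite.rel.core.Join.
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
	double rowCount = mq.getRowCount(this);
	RelOptCost baseCost = planner.getCostFactory().makeCost(rowCount, rowCount, 0);

	JoinInfo joinInfo = JoinInfo.of(getLeft(), getRight(), getCondition());
	if (joinInfo.pairs().isEmpty()) {
		// no equi-join predicates: punish the plan so that equi-join
		// alternatives win whenever one exists
		return baseCost.multiplyBy(100.0);
	}
	return baseCost;
}
{code}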

Best, Xingcan

> Enable window joins without equi-join predicates
> 
>
> Key: FLINK-7800
> URL: https://issues.apache.org/jira/browse/FLINK-7800
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, windowed joins can only be translated if they have at least one 
> equi-join predicate. This limitation exists due to the lack of a good cross 
> join strategy for the DataSet API.
> Due to the window, windowed joins do not have to be executed as cross joins. 
> Hence, the equi-join limitation does not need to be enforced (even though 
> non-equi joins are executed with a parallelism of 1 right now).
> We can resolve this issue by adding a boolean flag to the 
> {{FlinkLogicalJoinConverter}} rule to permit non-equi joins and add such a 
> rule to the logical optimization set of the DataStream API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4914: [hotfix] [docs] Fix typos in types serialization d...

2017-10-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4914

[hotfix] [docs] Fix typos in types serialization documentation

This fixes typos in the types serialization documentation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink hotfix-docs-types-serilization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4914.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4914


commit 7e7e5defc7e52ede0c3331a5d30e9228b1f87eaa
Author: gyao 
Date:   2017-10-27T12:55:39Z

[hotfix] [docs] Fix typos in types serialization documentation




---


[GitHub] flink pull request #4913: [hotfix] [javadoc] Fix typo in Javadoc of ManagedS...

2017-10-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4913

[hotfix] [javadoc] Fix typo in Javadoc of 
ManagedSnapshotContext#getCheckpointId()

This fixes a typo in the Javadoc of 
ManagedSnapshotContext#getCheckpointId().

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink 
hotfix-javadoc-ManagedSnapshotContext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4913.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4913


commit 26aca12920d741aafff01fec3d1ebd9c50177fd2
Author: gyao 
Date:   2017-10-27T12:48:21Z

[hotfix] [javadoc] Fix typo in Javadoc of 
ManagedSnapshotContext#getCheckpointId()




---


[GitHub] flink pull request #4912: [hotfix] [docs] Fix broken downloads page url

2017-10-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4912

[hotfix] [docs] Fix broken downloads page url

This fixes a broken *Downloads Page* url on the AWS page of the 
documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink hotfix-docs-aws-downloads-page

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4912.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4912


commit 7369dd69e0fa416b86ce430826b65edf36f67815
Author: gyao 
Date:   2017-10-27T12:44:32Z

[hotfix] [docs] Fix broken downloads page url




---


[jira] [Closed] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414

2017-10-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6173.

   Resolution: Fixed
Fix Version/s: 1.3.3

Fixed for 1.3.3 with c3289c9d982292975caded4c45926e9653ce5b63
Fixed for 1.4.0 with 6f83b4131662353ade632af3cc1c479793b33866

> flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
> 
>
> Key: FLINK-6173
> URL: https://issues.apache.org/jira/browse/FLINK-6173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, flink-table packs in com.fasterxml.jackson.* and renames it 
> to org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> If a project depends on flink-table and uses fasterxml as follows (the 
> explain function uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
> val expr = input.toTable(tEnv)
> val result = expr
>   .groupBy('word)
>   .select('word, 'frequency.sum as 'frequency)
>   .filter('frequency === 2)
> println(tEnv.explain(result))
> result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> I found that after FLINK-5414, flink-table no longer packs in 
> com.fasterxml.jackson.*, and the project would throw a class-not-found exception.
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at 
> org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at 
> org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}
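
To check which variant of the class is actually resolvable at runtime, a small
probe like the following can help. It is an illustrative snippet only (the
class names are taken from the stack trace above; this is not part of the
reported project):

{code}
// Classpath probe: prints which jackson ObjectMapper variants resolve.
public class ShadingProbe {
	public static void main(String[] args) {
		String[] candidates = {
			"com.fasterxml.jackson.databind.ObjectMapper",
			"org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper"
		};
		for (String name : candidates) {
			try {
				Class.forName(name);
				System.out.println("present: " + name);
			} catch (ClassNotFoundException e) {
				System.out.println("missing: " + name);
			}
		}
	}
}
{code}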



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1609#comment-1609
 ] 

ASF GitHub Bot commented on FLINK-7737:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4876
  
But do you understand what I mean? The semantics of code in the main scope 
should not be contorted just to make assertions in tests shorter to write.

Equals/hashCode is usually not implemented on I/O classes, like the output 
stream, because equality is not well defined for them.


> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4876: [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWr...

2017-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4876
  
But do you understand what I mean? The semantics of code in the main scope 
should not be contorted just to make assertions in tests shorter to write.

Equals/hashCode is usually not implemented on I/O classes, like the output 
stream, because equality is not well defined for them.
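
One way to keep such test assertions short without adding equals()/hashCode()
to the production stream is a small recording test double. The sketch below
uses hypothetical names throughout (it is not Flink's actual test code):

{code}
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import org.junit.Test;

public class SyncBehaviourTest {

	/** Hypothetical stand-in for the production writer logic under test. */
	static void hflushOrSync(RecordingStream os) {
		os.hflush();
		os.hsync();
	}

	/** Test double that records which sync-style calls were made. */
	static final class RecordingStream extends ByteArrayOutputStream {
		boolean flushed;
		boolean synced;

		void hflush() { flushed = true; }
		void hsync()  { synced = true; }
	}

	@Test
	public void hflushOrSyncIssuesBothCalls() {
		RecordingStream stream = new RecordingStream();
		hflushOrSync(stream);
		assertTrue("hflush should have been invoked", stream.flushed);
		assertTrue("hsync should have been invoked", stream.synced);
	}
}
{code}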


---


[jira] [Assigned] (FLINK-7871) SlotPool should release its unused slot to RM

2017-10-27 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-7871:
---

Assignee: shuai.xu

> SlotPool should release its unused slot to RM
> -
>
> Key: FLINK-7871
> URL: https://issues.apache.org/jira/browse/FLINK-7871
> Project: Flink
>  Issue Type: Bug
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> As described in design wiki 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, 
> _*The SlotPool releases slots that are unused to the ResourceManager. Slots 
> count as unused if they are not used when the job is fully running (fully 
> recovered).*_
> but currently, the slot pool keeps slots once they are offered to it until the job 
> finishes.
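
A minimal sketch of the intended behaviour, with all names hypothetical (this
is not Flink's actual SlotPool API): once the job is fully running, every slot
that is not assigned to a task is handed back to the resource manager.

{code}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SlotPoolSketch {
	private final Set<String> allocatedSlots = new HashSet<>();
	private final Set<String> slotsInUse = new HashSet<>();

	void offerSlot(String slotId) {
		allocatedSlots.add(slotId);
	}

	void markUsed(String slotId) {
		slotsInUse.add(slotId);
	}

	/** Called once the job is fully running (fully recovered). */
	List<String> releaseUnusedSlots() {
		List<String> released = new ArrayList<>();
		for (String slot : allocatedSlots) {
			if (!slotsInUse.contains(slot)) {
				released.add(slot); // would be returned to the ResourceManager
			}
		}
		allocatedSlots.removeAll(released);
		return released;
	}
}
{code}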



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222145#comment-16222145
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147379207
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
--- End diff --

yup we could do that too.


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222146#comment-16222146
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147379229
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
+
+   @Before
+   public void setupReporter() {
+   registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+   PrometheusReporter reporter = (PrometheusReporter) 
registry.getReporters().get(0);
+   port = reporter.getPort();
+
+   TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
+   TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
+   taskMetricGroup1 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, 
ATTEMPT_NUMBER);
+   taskMetricGroup2 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, 
ATTEMPT_NUMBER);
+   }
+
+   @After
+   public void shutdownRegistry() {
+   if (registry != null) {
+   registry.shutdown();
--- End diff --

the registry is closing the reporter.


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-27 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147379229
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
+
+   @Before
+   public void setupReporter() {
+   registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+   PrometheusReporter reporter = (PrometheusReporter) 
registry.getReporters().get(0);
+   port = reporter.getPort();
+
+   TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
+   TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
+   taskMetricGroup1 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, 
ATTEMPT_NUMBER);
+   taskMetricGroup2 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, 
ATTEMPT_NUMBER);
+   }
+
+   @After
+   public void shutdownRegistry() {
+   if (registry != null) {
+   registry.shutdown();
--- End diff --

the registry is closing the reporter.


---


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-27 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147379207
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
--- End diff --

yup we could do that too.


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222143#comment-16222143
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
I ran the failed test on my machine and it passed; it seems my changes 
do not affect it.


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports defining how much CPU and MEM an operator uses, 
> but the resources in a cluster are varied. For example, an application for 
> image processing may need GPU, while others may need FPGA. 
> CPU and MEM alone are not enough, and the number of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-10-27 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
I ran the failed test on my machine and it passed; it seems my changes 
do not affect it.


---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222013#comment-16222013
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Thanks for the feedback @twalthr.
I pushed an update.


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, currently the TableSource only supports defining a 
> rowtime field, but not extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods, {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, and which built-in 
> strategies to provide, needs further discussion.
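
A rough sketch of the proposed interface, using the method names from the
description above; the generator type is a placeholder assumption, since its
shape is explicitly still under discussion:

{code}
// Sketch only: method names come from the ticket; WatermarkGenerator is a
// placeholder for the abstraction that is still to be designed.
public interface DefinedWatermark {

	/** Name of an existing field to use as the rowtime attribute. */
	String getRowtimeAttribute();

	/** Strategy that derives watermarks from the rowtime field. */
	WatermarkGenerator getWatermarkGenerator();
}

/** Hypothetical generator abstraction. */
interface WatermarkGenerator {
	long currentWatermark(long rowtimeOfLastRow);
}
{code}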



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4894: [FLINK-7548] [table] Improve rowtime support of TableSour...

2017-10-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Thanks for the feedback @twalthr.
I pushed an update.


---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221986#comment-16221986
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147358782
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -84,6 +86,13 @@
return typeInfo;
}
 
+   @Override
+   public TableSchema getTableSchema() {
+   return new TableSchema(
+   ((RowTypeInfo) typeInfo).getFieldNames(),
--- End diff --

What do you think about adding a `fromTypeInfo` method to the companion 
object that creates a `TableSchema`?


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, currently the TableSource only supports defining a 
> rowtime field, but not extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods, {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, and which built-in 
> strategies to provide, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147358782
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -84,6 +86,13 @@
return typeInfo;
}
 
+   @Override
+   public TableSchema getTableSchema() {
+   return new TableSchema(
+   ((RowTypeInfo) typeInfo).getFieldNames(),
--- End diff --

What do you think about adding a `fromTypeInfo` method to the companion 
object that creates a `TableSchema`?
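
For reference, a rough Java analogue of that suggestion (the actual companion
object would be Scala; the constructor and getters here are assumed from the
diff above, not confirmed signatures):

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public final class TableSchemas {

	private TableSchemas() {}

	/** Derives a TableSchema from a row-typed TypeInformation. */
	public static TableSchema fromTypeInfo(TypeInformation<Row> typeInfo) {
		RowTypeInfo rowType = (RowTypeInfo) typeInfo;
		return new TableSchema(rowType.getFieldNames(), rowType.getFieldTypes());
	}
}
{code}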


---


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357330
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 ---
@@ -70,9 +69,25 @@
@Rule
public ExpectedException thrown = ExpectedException.none();
 
-   private final MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "" + NON_DEFAULT_PORT)));
-   private final FrontMetricGroup metricGroup = 
new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, 
TASK_MANAGER));
-   private final MetricReporter reporter = registry.getReporters().get(0);
+   private MetricRegistry registry;
+   private FrontMetricGroup metricGroup;
+   private PrometheusReporter reporter;
+   private int port;
--- End diff --

do we need this extra field for that? Can't we access `reporter.getPort()`? 
Isn't it redundant?


---


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221979#comment-16221979
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357745
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
+
+   @Before
+   public void setupReporter() {
+   registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+   PrometheusReporter reporter = (PrometheusReporter) 
registry.getReporters().get(0);
+   port = reporter.getPort();
+
+   TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
+   TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
+   taskMetricGroup1 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, 
ATTEMPT_NUMBER);
+   taskMetricGroup2 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, 
ATTEMPT_NUMBER);
+   }
+
+   @After
+   public void shutdownRegistry() {
+   if (registry != null) {
+   registry.shutdown();
--- End diff --

you are not closing the reporter here. Isn't this a root cause of the test 
instability?


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357533
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
--- End diff --

can't we access `getPort()` instead of using this variable? Isn't it 
redundant?


---


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221978#comment-16221978
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357050
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
+
+   @Before
+   public void setupReporter() {
+   registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
--- End diff --

Can't we bind to a random port (`0`) and later call `getPort()`, as you 
do now?


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221977#comment-16221977
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357330
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 ---
@@ -70,9 +69,25 @@
@Rule
public ExpectedException thrown = ExpectedException.none();
 
-   private final MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "" + NON_DEFAULT_PORT)));
-   private final FrontMetricGroup metricGroup = 
new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, 
TASK_MANAGER));
-   private final MetricReporter reporter = registry.getReporters().get(0);
+   private MetricRegistry registry;
+   private FrontMetricGroup metricGroup;
+   private PrometheusReporter reporter;
+   private int port;
--- End diff --

do we need this extra field for that? Can't we access `reporter.getPort()`? 
Isn't it redundant?


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357745
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
+
+   @Before
+   public void setupReporter() {
+   registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+   PrometheusReporter reporter = (PrometheusReporter) 
registry.getReporters().get(0);
+   port = reporter.getPort();
+
+   TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
+   TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
+   taskMetricGroup1 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, 
ATTEMPT_NUMBER);
+   taskMetricGroup2 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, 
ATTEMPT_NUMBER);
+   }
+
+   @After
+   public void shutdownRegistry() {
+   if (registry != null) {
+   registry.shutdown();
--- End diff --

you are not closing the reporter here. Isn't this a root cause of the test 
instability?


---


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221980#comment-16221980
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357533
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
--- End diff --

can't we access `getPort()` instead of using this variable? Isn't it 
redundant?


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...

2017-10-27 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4908#discussion_r147357050
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 ---
@@ -72,10 +72,30 @@
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-   private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-   private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-   private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-   private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+   private TaskMetricGroup taskMetricGroup1;
+   private TaskMetricGroup taskMetricGroup2;
+
+   private MetricRegistry registry;
+   private int port;
+
+   @Before
+   public void setupReporter() {
+   registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
--- End diff --

Can't we bind to a random port (`0`) and later call `getPort()`, as you 
do now?
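
For reference, a minimal sketch of that approach using the JDK's built-in
HttpServer (the PrometheusReporter wiring is assumed, not shown): bind to
port 0 so the OS picks a free port, then ask the server which port it got.

{code}
import java.io.IOException;
import java.net.InetSocketAddress;
import com.sun.net.httpserver.HttpServer;

public class EphemeralPortExample {
	public static void main(String[] args) throws IOException {
		// Port 0 asks the OS for any free port, avoiding range scans like "9400-9500".
		HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
		server.start();
		int port = server.getAddress().getPort(); // the port the OS actually assigned
		System.out.println("bound to port " + port);
		server.stop(0);
	}
}
{code}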


---


[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221963#comment-16221963
 ] 

ASF GitHub Bot commented on FLINK-7933:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4908
  
Thanks, looks good, +1 to merge


> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4908: [FLINK-7933][metrics] Improve PrometheusReporter tests

2017-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4908
  
Thanks, looks good, +1 to merge


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221940#comment-16221940
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4911

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Currently, Flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA, and other resources.
So the resource type needs to be made extensible in the ResourceSpec.
This adds an extension field for new resources.

## What is the purpose of the change

This pull request adds an extensible field to the ResourceSpec, so users can 
define arbitrary resources, provided they are supported by their resource 
manager.

*(for example:)*
users can use 
_text.flatMap().setResource(new ResourceSpec(1, 100, new 
ResourceSpec.Resource("GPU", 0.5)));_
to define their GPU requirement for the operator.
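
As a purely illustrative sketch of the idea (names hypothetical, not the
actual ResourceSpec API): fixed CPU/MEM fields plus an open map for extended
resources such as GPU or FPGA.

{code}
import java.util.HashMap;
import java.util.Map;

public final class ExtensibleResourceSpec {
	private final double cpuCores;
	private final int heapMemoryMB;
	// Extension point: arbitrary named resources, e.g. "GPU" -> 0.5.
	private final Map<String, Double> extendedResources = new HashMap<>();

	public ExtensibleResourceSpec(double cpuCores, int heapMemoryMB) {
		this.cpuCores = cpuCores;
		this.heapMemoryMB = heapMemoryMB;
	}

	public ExtensibleResourceSpec with(String name, double value) {
		extendedResources.put(name, value);
		return this;
	}

	public double get(String name) {
		return extendedResources.getOrDefault(name, 0.0);
	}
}
{code}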

## Verifying this change
This change added tests and can be verified as follows:
  - Added the unit test ResourceSpecTest to verify this.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-7878

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4911


commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus 
Date:   2017-10-25T06:56:35Z

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Currently, Flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA, and other resources.
So the resource type needs to be made extensible in the ResourceSpec.
This adds an extension field for new resources.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D327427




> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports defining how much CPU and MEM an operator uses, 
> but the resources in a cluster are varied. For example, an application for 
> image processing may need GPU, while others may need FPGA. 
> CPU and MEM alone are not enough, and the number of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-10-27 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4911

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Currently, Flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA, and other resources.
So the resource type needs to be made extensible in the ResourceSpec.
This adds an extension field for new resources.

## What is the purpose of the change

This pull request adds an extensible field to the ResourceSpec, so users can 
define arbitrary resources, provided they are supported by their resource 
manager.

*(for example:)*
users can use 
_text.flatMap().setResource(new ResourceSpec(1, 100, new 
ResourceSpec.Resource("GPU", 0.5)));_
to define their GPU requirement for the operator.

## Verifying this change
This change added tests and can be verified as follows:
  - Added the unit test ResourceSpecTest to verify this.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-7878

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4911


commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus 
Date:   2017-10-25T06:56:35Z

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Currently, Flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA, and other resources.
So the resource type needs to be made extensible in the ResourceSpec.
This adds an extension field for new resources.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D327427




---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221869#comment-16221869
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147339907
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 ---
@@ -18,55 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.Function
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
-import org.apache.flink.types.Row
-
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan[T] {
-
-  /**
-* We check if the input type is exactly the same as the internal row 
type.
-* A conversion is necessary if types differ.
-*/
-  private[flink] def needsConversion(
-  externalTypeInfo: TypeInformation[Any],
-  internalTypeInfo: TypeInformation[T]): Boolean =
-externalTypeInfo != internalTypeInfo
-
-  private[flink] def generatedConversionFunction[F <: Function](
-  config: TableConfig,
-  functionClass: Class[F],
-  inputType: TypeInformation[Any],
-  expectedType: TypeInformation[Row],
-  conversionOperatorName: String,
-  fieldNames: Seq[String],
-  inputFieldMapping: Option[Array[Int]] = None)
-: GeneratedFunction[F, Row] = {
-
-val generator = new FunctionCodeGenerator(
-  config,
-  false,
-  inputType,
-  None,
-  inputFieldMapping)
-val conversion = 
generator.generateConverterResultExpression(expectedType, fieldNames)
-
-val body =
-  s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
-generator.generateFunction(
-  conversionOperatorName,
-  functionClass,
-  body,
-  expectedType)
-  }
-
-}
+trait CommonScan[T]
--- End diff --

I kept it for consistency. The other operators have a CommonX trait as well.


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, currently the TableSource only supports defining a 
> rowtime field, but not extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods, {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, and which built-in 
> strategies to provide, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

