[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493113#comment-16493113
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191306509
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

Could the prefix of the jobName be made configurable?
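A minimal sketch of what a configurable prefix might look like; the `jobNamePrefix` key and the config plumbing here are hypothetical and not part of the PR:

```java
import java.util.Properties;
import java.util.UUID;

public class JobNameDemo {
    // hypothetical config key, not part of the actual reporter
    static final String ARG_JOB_NAME_PREFIX = "jobNamePrefix";

    static String buildJobName(Properties config) {
        // fall back to the hard-coded "flink-" prefix when nothing is configured
        String prefix = config.getProperty(ARG_JOB_NAME_PREFIX, "flink-");
        // keep a random suffix so each TaskManager still pushes under a unique job name
        return prefix + UUID.randomUUID();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(ARG_JOB_NAME_PREFIX, "my-cluster-");
        System.out.println(buildJobName(config));
    }
}
```

Keeping the random suffix even with a configured prefix would preserve the per-TaskManager uniqueness the current code relies on.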


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a Pushgateway.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-28 Thread Fred Teunissen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493109#comment-16493109
 ] 

Fred Teunissen commented on FLINK-8500:
---

@[~aljoscha], [~tzulitai], if you like, I can make the PR with a new default 
{{deserialize}} method on the {{KeyedDeserializationSchema}} interface, with 
either the signature
 {{default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException ...}}
 or the signature
 {{default T deserialize(ConsumerRecord record) throws IOException ...}}

I have no preference; both have their pros and cons.
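The compatibility idea behind such a default method can be sketched standalone; the interface below is a simplified stand-in for Flink's actual {{KeyedDeserializationSchema}}, and the new timestamp parameter is the illustration:

```java
import java.io.IOException;

// Simplified stand-in for the real interface, for illustration only.
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset) throws IOException;

    // New overload carrying the timestamp; it defaults to the old method,
    // so existing implementations compile unchanged and simply ignore it.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

public class DefaultMethodDemo {
    public static void main(String[] args) throws IOException {
        // A "legacy" implementation that only knows the old signature.
        KeyedDeserializationSchema<String> schema =
            (key, msg, topic, part, off) -> new String(msg);
        // New callers may pass the timestamp; the default method forwards
        // to the legacy implementation.
        System.out.println(schema.deserialize(null, "hello".getBytes(), "t", 0, 0L, 42L));
    }
}
```

Either proposed signature would work the same way: the default body keeps old implementations source-compatible.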

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method {{deserialize}} of {{KeyedDeserializationSchema}} needs a 'kafka 
> message timestamp' parameter (from {{ConsumerRecord}}). In some business 
> scenarios, this is useful!
>  





[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-05-28 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493096#comment-16493096
 ] 

Hequn Cheng commented on FLINK-8545:


Hi [~pnowojski], I plan to work on it and will probably finish the issue by the 
end of next month. :-)

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data with upsert mode in Flink 
> SQL / Table API, it is valuable to enable a table source with upsert mode. I 
> will provide a design doc later and we can have more discussions. Any 
> suggestions are warmly welcomed!
>  





[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2018-05-28 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345955#comment-16345955
 ] 

Ted Yu edited comment on FLINK-7795 at 5/29/18 4:13 AM:


error-prone has a JDK 8 dependency.


was (Author: yuzhih...@gmail.com):
error-prone has JDK 8 dependency.

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> http://errorprone.info/ is a tool that detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}
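To illustrate the kind of mistake error-prone catches, here is a standalone example of its ArrayEquals check; this is a generic illustration, not tied to any Flink code:

```java
import java.util.Arrays;

public class ArrayEqualsDemo {
    public static void main(String[] args) {
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};
        // error-prone's ArrayEquals check flags this call: Object.equals on
        // arrays compares references, not contents, so this prints false.
        System.out.println(a.equals(b));
        // the intended content comparison:
        System.out.println(Arrays.equals(a, b));
    }
}
```

Such bugs compile cleanly, which is why a build-time checker is valuable.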





[jira] [Updated] (FLINK-9460) Redundant output in table & upsert semantics

2018-05-28 Thread zhengcanbin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengcanbin updated FLINK-9460:
---
Description: 
The output seems incorrect in my table & upsert example; here's the code:
{code:java}
object VerifyUpsert {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val input = env.socketTextStream("localhost", 9099).map { x =>
      val tokens = x.split(",")
      DemoSource(tokens(0), tokens(1), tokens(2))
    }

    tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)

    val fieldNames = Array("record_time", "pv", "uv")
    val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
    tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))

    tEnv.sqlUpdate(
      """
        |INSERT INTO demoSink
        |SELECT
        |  SUBSTRING(record_time, 1, 16) as record_time,
        |  count(user_id) as pv,
        |  count(DISTINCT user_id) as uv
        |FROM demoSource
        |GROUP BY SUBSTRING(record_time, 1, 16)
      """.stripMargin)

    env.execute()
  }

  case class DemoSource(record_time: String, user_id: String, page_id: String)

}

case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {

  override def setKeyFields(keys: Array[String]): Unit = Seq.empty

  override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
    dataStream.addSink(new PrintSinkFunction())

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
    val copy = MyPrintSink(fNames, fTypes)
    copy.fNames = fieldNames
    copy.fTypes = fieldTypes
    copy
  }
}
{code}
When the application starts, I type one record at a time into the netcat 
client; the table below shows the output for every input record:

 
||input||output||
|2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
|2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
|2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
|2018-05-24 21:34:12,0,4|{color:#FF0000}(true,2018-05-24 21:34,2,2){color}
 (true,2018-05-24 21:34,4,3)|

 

When the fourth record is consumed, two output records are printed in the 
sink; the first record (shown in red) is clearly redundant. I followed the 
source code and found something wrong in

 
{code:java}
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
{code}
!image-2018-05-29-11-51-20-671.png!

I think that when {{(!generateRetraction) && !inputC.change}} is true, we 
should not invoke {{out.collect}} here.
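The proposed guard can be sketched in isolation; the flags below are simplified stand-ins for the actual GroupAggProcessFunction fields, and this is an illustration of the condition, not the real operator code:

```java
public class EmitGuardDemo {

    // Returns whether the aggregate result should be emitted.
    // Proposed fix: skip emission when retraction generation is off
    // (!generateRetraction) AND the incoming record is a retraction
    // (!inputChange), which is exactly the redundant-output case.
    static boolean shouldEmit(boolean generateRetraction, boolean inputChange) {
        return !(!generateRetraction && !inputChange);
    }

    public static void main(String[] args) {
        System.out.println(shouldEmit(false, false)); // the redundant case: suppressed
        System.out.println(shouldEmit(false, true));  // normal accumulate: emitted
        System.out.println(shouldEmit(true, false));  // retraction mode: emitted
    }
}
```

Only the first combination corresponds to the duplicate row observed in the table above.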

 

[~StephanEwen], please look this over.


[jira] [Created] (FLINK-9460) Redundant output in table & upsert semantics

2018-05-28 Thread zhengcanbin (JIRA)
zhengcanbin created FLINK-9460:
--

 Summary: Redundant output in table & upsert semantics
 Key: FLINK-9460
 URL: https://issues.apache.org/jira/browse/FLINK-9460
 Project: Flink
  Issue Type: Bug
>  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: zhengcanbin
 Fix For: 1.6.0
 Attachments: image-2018-05-29-11-39-45-698.png, 
image-2018-05-29-11-51-20-671.png

The output seems incorrect in my table & upsert example; here's the code:
{code:java}
object VerifyUpsert {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val input = env.socketTextStream("localhost", 9099).map { x =>
      val tokens = x.split(",")
      DemoSource(tokens(0), tokens(1), tokens(2))
    }

    tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)

    val fieldNames = Array("record_time", "pv", "uv")
    val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
    tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))

    tEnv.sqlUpdate(
      """
        |INSERT INTO demoSink
        |SELECT
        |  SUBSTRING(record_time, 1, 16) as record_time,
        |  count(user_id) as pv,
        |  count(DISTINCT user_id) as uv
        |FROM demoSource
        |GROUP BY SUBSTRING(record_time, 1, 16)
      """.stripMargin)

    env.execute()
  }

  case class DemoSource(record_time: String, user_id: String, page_id: String)

}

case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {

  override def setKeyFields(keys: Array[String]): Unit = Seq.empty

  override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
    dataStream.addSink(new PrintSinkFunction())

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
    val copy = MyPrintSink(fNames, fTypes)
    copy.fNames = fieldNames
    copy.fTypes = fieldTypes
    copy
  }
}
{code}
When the application starts, I type one record at a time into the netcat 
client; the table below shows the output for every input record:

 
||input||output||
|2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
|2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
|2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
|2018-05-24 21:34:12,0,4|{color:#FF0000}(true,2018-05-24 21:34,2,2){color}
(true,2018-05-24 21:34,4,3)|

 

When the fourth record is consumed, two output records are printed in the 
sink; the first record (shown in red) is clearly redundant. I followed the 
source code and found something wrong in

 
{code:java}
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
{code}
!image-2018-05-29-11-51-20-671.png!

I think that when {{(!generateRetraction) && !inputC.change}} is true, we 
should not invoke {{out.collect}} here.

 

[~astephan], please look this over.





[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493059#comment-16493059
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r191297773
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

If the jobName is configurable, each TaskManager may end up using the same 
jobName. If so, the metrics of TaskManager A may be overwritten by the metrics 
of TaskManager B, etc.

### for example, tm1 and tm2 push their metrics at the same time
```
<dependency>
    <groupId>io.prometheus</groupId>
    <artifactId>simpleclient</artifactId>
    <version>0.0.26</version>
</dependency>
<dependency>
    <groupId>io.prometheus</groupId>
    <artifactId>simpleclient_pushgateway</artifactId>
    <version>0.0.26</version>
</dependency>
```

```
CollectorRegistry registry = new CollectorRegistry();
String sameJobName = "flink-job";

// taskmanager A
Gauge tm1 = Gauge.build().name("flink_taskmanager_Status_JVM_CPU_Time").help("tm jvm cpu").register(registry);
tm1.set(41);

PushGateway pg1 = new PushGateway("localhost:9091");
pg1.push(registry, sameJobName);

// taskmanager B
registry.clear();
Gauge tm2 = Gauge.build().name("flink_taskmanager_Status_JVM_CPU_Time").help("tm jvm cpu").register(registry);
tm2.set(42);

PushGateway pg2 = new PushGateway("localhost:9091");
pg2.push(registry, sameJobName);
```

### result: the metrics of tmA are overwritten by tmB's

![image](https://user-images.githubusercontent.com/20113411/40636865-269828ce-6334-11e8-92e7-b222e6cfe6c0.png)



> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a Pushgateway.






[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493057#comment-16493057
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Yes, @StephanEwen, thanks for the continued suggestions; I will follow your 
suggestion.


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We should generate the _meta file for a checkpoint only when the write has 
> fully succeeded. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 
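The write-then-atomically-rename pattern described above can be sketched with java.nio on a local filesystem; this is an illustration, not Flink's actual checkpoint code, and S3 would need the equivalent workaround mentioned:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMetaWrite {

    static void writeMeta(Path target, byte[] metadata) throws IOException {
        // write to a sibling temp file first; if this fails partway,
        // the real metadata file never appears
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, metadata);
        // only a fully successful write is made visible, and atomically
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt");
        writeMeta(dir.resolve("_metadata"), "state".getBytes());
        System.out.println(Files.exists(dir.resolve("_metadata")));
    }
}
```

Readers then either see no metadata file (checkpoint incomplete) or a complete one, never a partial write.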







[jira] [Commented] (FLINK-9459) Maven enforcer plugin prevents compilation with HDP's Hadoop

2018-05-28 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493031#comment-16493031
 ] 

Ted Yu commented on FLINK-9459:
---

Related to FLINK-9091?

> Maven enforcer plugin prevents compilation with HDP's Hadoop
> 
>
> Key: FLINK-9459
> URL: https://issues.apache.org/jira/browse/FLINK-9459
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Priority: Major
>
> Compiling Flink against Hortonworks HDP's version of Hadoop currently fails 
> because the Maven enforcer plugin detects a dependency convergence problem in 
> their Hadoop dependencies.
>  
> The command used is
>  
> {noformat}
> mvn clean install -DskipTests -Dcheckstyle.skip=true 
> -Dmaven.javadoc.skip=true  -Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292
> {noformat}
>  
> The problems:
> {noformat}
> Dependency convergence error for 
> com.fasterxml.jackson.core:jackson-core:2.6.0 paths to dependency are:    
>  
> +-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT   
>  +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
>    +-com.microsoft.azure:azure-storage:5.4.0 
>  +-com.fasterxml.jackson.core:jackson-core:2.6.0 
> and 
> +-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
>  +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
>    +-com.fasterxml.jackson.core:jackson-core:2.6.0 
> and   
>   
> +-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
>  +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
>    +-com.fasterxml.jackson.core:jackson-databind:2.2.3 
>  +-com.fasterxml.jackson.core:jackson-core:2.2.3 
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message: Failed while enforcing releasability. See above detailed 
> error message. 
> [INFO] FAILURE build of project org.apache.flink:flink-bucketing-sink-test   
> {noformat}





[jira] [Updated] (FLINK-9459) Maven enforcer plugin prevents compilation with HDP's Hadoop

2018-05-28 Thread Truong Duc Kien (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Truong Duc Kien updated FLINK-9459:
---
Description: 
Compiling Flink against Hortonworks HDP's version of Hadoop currently fails 
because the Maven enforcer plugin detects a dependency convergence problem in 
their Hadoop dependencies.

 

The command used is

 
{noformat}
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dmaven.javadoc.skip=true  
-Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292
{noformat}
 

The problems:
{noformat}
Dependency convergence error for com.fasterxml.jackson.core:jackson-core:2.6.0 
paths to dependency are: 
+-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT   
 +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
   +-com.microsoft.azure:azure-storage:5.4.0 
 +-com.fasterxml.jackson.core:jackson-core:2.6.0 

and 

+-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
 +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
   +-com.fasterxml.jackson.core:jackson-core:2.6.0 

and 

+-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
 +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
   +-com.fasterxml.jackson.core:jackson-databind:2.2.3 
 +-com.fasterxml.jackson.core:jackson-core:2.2.3 

[WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
failed with message: Failed while enforcing releasability. See above detailed 
error message. 

[INFO] FAILURE build of project org.apache.flink:flink-bucketing-sink-test   
{noformat}



> Maven enforcer plugin prevents compilation with HDP's Hadoop
> 
>
> Key: FLINK-9459
> URL: https://issues.apache.org/jira/browse/FLINK-9459
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Priority: Major
>
> Compiling Flink against Hortonworks HDP's version of Hadoop currently fails 
> because the Maven Enforcer Plugin detects a dependency convergence problem in 
> their Hadoop distribution.
>  
> The command used is
>  
> {noformat}
> mvn clean install -DskipTests -Dcheckstyle.skip=true 
> -Dmaven.javadoc.skip=true  -Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292
> {noformat}
>  
> The problems:
> {noformat}
> Dependency convergence error for 
> com.fasterxml.jackson.core:jackson-core:2.6.0 paths to dependency are:    
>  
> +-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT   
>  +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
>    +-com.microsoft.azure:azure-storage:5.4.0 
>  +-com.fasterxml.jackson.core:jackson-core:2.6.0 
> and 
> +-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
>  +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
>    +-com.fasterxml.jackson.core:jackson-core:2.6.0 
> and   
>   
> +-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
>  +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
>    +-com.fasterxml.jackson.core:jackson-databind:2.2.3 
>  +-com.fasterxml.jackson.core:jackson-core:2.2.3 
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message: Failed while enforcing releasability. See above detailed 
> error message. 
> 

[jira] [Created] (FLINK-9459) Maven enforcer plugin prevents compilation with HDP's Hadoop

2018-05-28 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-9459:
--

 Summary: Maven enforcer plugin prevents compilation with HDP's 
Hadoop
 Key: FLINK-9459
 URL: https://issues.apache.org/jira/browse/FLINK-9459
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.5.0
Reporter: Truong Duc Kien


Compiling Flink against Hortonworks HDP's version of Hadoop currently fails 
because the Maven Enforcer Plugin detects a dependency convergence problem in 
their Hadoop distribution.

 

The command used is

 
{noformat}
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dmaven.javadoc.skip=true  
-Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292
{noformat}
 

The problems:
{noformat}
Dependency convergence error for com.fasterxml.jackson.core:jackson-core:2.6.0 
paths to dependency are: 
+-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT   
 +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
   +-com.microsoft.azure:azure-storage:5.4.0 
 +-com.fasterxml.jackson.core:jackson-core:2.6.0 

and 

+-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
 +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
   +-com.fasterxml.jackson.core:jackson-core:2.6.0 

and 
+-org.apache.flink:flink-bucketing-sink-test:1.5-SNAPSHOT 
 +-org.apache.flink:flink-shaded-hadoop2:1.5-SNAPSHOT 
   +-com.fasterxml.jackson.core:jackson-databind:2.2.3 
 +-com.fasterxml.jackson.core:jackson-core:2.2.3 

[WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
failed with message: Failed while enforcing releasability. See above detailed 
error message. 

[INFO] FAILURE build of project org.apache.flink:flink-bucketing-sink-test   
{noformat}
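The DependencyConvergence rule fails whenever two dependency paths resolve the same artifact to different versions, as the jackson-core 2.6.0 vs. 2.2.3 paths above do. A toy model of that check (plain Java; a simplification for illustration, not the enforcer's actual implementation):

```java
import java.util.*;

public class ConvergenceCheck {

    // Report artifacts that resolve to more than one version,
    // given (artifact, version) leaves of the dependency paths.
    static Map<String, Set<String>> conflicts(List<String[]> leaves) {
        Map<String, Set<String>> versions = new HashMap<>();
        for (String[] leaf : leaves) {
            versions.computeIfAbsent(leaf[0], k -> new TreeSet<>()).add(leaf[1]);
        }
        // Only artifacts seen with two or more distinct versions conflict.
        versions.values().removeIf(v -> v.size() < 2);
        return versions;
    }

    public static void main(String[] args) {
        List<String[]> leaves = Arrays.asList(
            new String[]{"com.fasterxml.jackson.core:jackson-core", "2.6.0"},
            new String[]{"com.fasterxml.jackson.core:jackson-core", "2.2.3"},
            new String[]{"com.fasterxml.jackson.core:jackson-databind", "2.2.3"});
        // jackson-core appears with two versions, so it is reported.
        System.out.println(conflicts(leaves));
    }
}
```

The usual remedy is to pin a single version in the parent `dependencyManagement` section or add exclusions in the module that pulls in the divergent transitive dependency.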



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9458) Unable to recover from job failure on YARN with NPE

2018-05-28 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9458:
---

Assignee: vinoyang

> Unable to recover from job failure on YARN with NPE
> ---
>
> Key: FLINK-9458
> URL: https://issues.apache.org/jira/browse/FLINK-9458
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
> Environment: Ambari HDP 2.6.3
> Hadoop 2.7.3
>  
> Job configuration:
> 120 Task Managers x 1 slots 
>  
>  
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
>
> After upgrading our jobs to Flink 1.5, they are unable to recover from 
> failure; the following exception appears repeatedly:
> {noformat}
> 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
> o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
> (23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 
> 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
> o.a.f.r.e.ExecutionGraph Job xxx (23d9e87bf43ce163ff7db8afb062fb1d) switched 
> from state RESTARTING to RESTARTING. 
> 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
> o.a.f.r.e.ExecutionGraph Restarting the job xxx 
> (23d9e87bf43ce163ff7db8afb062fb1d). 
> 2018-05-29 04:57:06,086 [ jobmanager-future-thread-36] WARN 
> o.a.f.r.e.ExecutionGraph Failed to restart the job. 
> java.lang.NullPointerException at 
> org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
>  at 
> org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
>  at 
> org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>  at 
> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> {noformat}
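The trace shows `CoLocationConstraint.isAssignedAndAlive` throwing during restart, which suggests it dereferences an assignment that a restart has already cleared. A simplified, hypothetical model of that failure mode and the defensive fix (names invented; not Flink's actual code):

```java
// Toy model: a constraint whose slot assignment may be cleared
// while a restart is in flight.
class Slot {
    private final boolean alive;
    Slot(boolean alive) { this.alive = alive; }
    boolean isAlive() { return alive; }
}

class Constraint {
    volatile Slot assignedSlot; // may become null after a reset

    // Naive check: NPEs when assignedSlot is null mid-restart.
    boolean isAssignedAndAliveNaive() {
        return assignedSlot.isAlive();
    }

    // Defensive check: read the field once, then null-guard.
    boolean isAssignedAndAlive() {
        Slot slot = assignedSlot;
        return slot != null && slot.isAlive();
    }
}
```

Reading the volatile field into a local before testing it also avoids a check-then-act race where the field is nulled between the null test and the dereference.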





[jira] [Updated] (FLINK-9458) Unable to recover from job failure on YARN with NPE

2018-05-28 Thread Truong Duc Kien (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Truong Duc Kien updated FLINK-9458:
---
Description: 
After upgrading our jobs to Flink 1.5, they are unable to recover from 
failure; the following exception appears repeatedly:
{noformat}
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Job xxx (23d9e87bf43ce163ff7db8afb062fb1d) switched 
from state RESTARTING to RESTARTING. 
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Restarting the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d). 
2018-05-29 04:57:06,086 [ jobmanager-future-thread-36] WARN 
o.a.f.r.e.ExecutionGraph Failed to restart the job. 
java.lang.NullPointerException at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
 at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
 at 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
 at 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

{noformat}

  was:
After upgrading our jobs to Flink 1.5, they are unable to recover from 
failure; the following exception appears repeatedly:
{noformat}
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Job xxx (23d9e87bf43ce163ff7db8afb062fb1d) switched 
from state RESTARTING to RESTARTING. 
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Restarting the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d). 
2018-05-29 04:57:06,086 [ jobmanager-future-thread-36] WARN 
o.a.f.r.e.ExecutionGraph Failed to restart the job. 
java.lang.NullPointerException at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
 at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
 at 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
 at 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

{noformat}


> Unable to recover from job failure on YARN with NPE
> ---
>
> Key: FLINK-9458
> URL: https://issues.apache.org/jira/browse/FLINK-9458
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
> Environment: Ambari HDP 2.6.3
> Hadoop 2.7.3
>  
> Job configuration:
> 120 Task Managers x 1 slots 
>  
>  
>Reporter: Truong Duc Kien
>Priority: Major
>
> After upgrading our jobs to Flink 1.5, they are unable to recover from 
> failure; the following exception appears repeatedly:
> {noformat}
> 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
> o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
> (23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 

[jira] [Updated] (FLINK-9458) Unable to recover from job failure on YARN with NPE

2018-05-28 Thread Truong Duc Kien (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Truong Duc Kien updated FLINK-9458:
---
Description: 
After upgrading our jobs to Flink 1.5, they are unable to recover from 
failure; the following exception appears repeatedly:
{noformat}
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Job xxx (23d9e87bf43ce163ff7db8afb062fb1d) switched 
from state RESTARTING to RESTARTING. 
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Restarting the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d). 
2018-05-29 04:57:06,086 [ jobmanager-future-thread-36] WARN 
o.a.f.r.e.ExecutionGraph Failed to restart the job. 
java.lang.NullPointerException at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
 at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
 at 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
 at 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

{noformat}

  was:
After upgrading our jobs to Flink 1.5, they are unable to recover from 
failure; the following exception appears repeatedly:


{noformat}
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 2018-05-29 
04:56:06,086 [ jobmanager-future-thread-36] INFO o.a.f.r.e.ExecutionGraph Job 
xxx (23d9e87bf43ce163ff7db8afb062fb1d) switched from state RESTARTING to 
RESTARTING. 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Restarting the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d). 2018-05-29 04:57:06,086 [ 
jobmanager-future-thread-36] WARN o.a.f.r.e.ExecutionGraph Failed to restart 
the job. java.lang.NullPointerException at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
 at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
 at 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
 at 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)
{noformat}



> Unable to recover from job failure on YARN with NPE
> ---
>
> Key: FLINK-9458
> URL: https://issues.apache.org/jira/browse/FLINK-9458
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
> Environment: Ambari HDP 2.6.3
> Hadoop 2.7.3
>  
> Job configuration:
> 120 Task Managers x 1 slots 
>  
>  
>Reporter: Truong Duc Kien
>Priority: Major
>
> After upgrading our jobs to Flink 1.5, they are unable to recover from 
> failure; the following exception appears repeatedly:
> {noformat}
> 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
> o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
> (23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 

[jira] [Created] (FLINK-9458) Unable to recover from job failure on YARN with NPE

2018-05-28 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-9458:
--

 Summary: Unable to recover from job failure on YARN with NPE
 Key: FLINK-9458
 URL: https://issues.apache.org/jira/browse/FLINK-9458
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.0
 Environment: Ambari HDP 2.6.3

Hadoop 2.7.3

 

Job configuration:

120 Task Managers x 1 slots 

 

 
Reporter: Truong Duc Kien


After upgrading our jobs to Flink 1.5, they are unable to recover from 
failure; the following exception appears repeatedly:


{noformat}
2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Try to restart or fail the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d) if no longer possible. 2018-05-29 
04:56:06,086 [ jobmanager-future-thread-36] INFO o.a.f.r.e.ExecutionGraph Job 
xxx (23d9e87bf43ce163ff7db8afb062fb1d) switched from state RESTARTING to 
RESTARTING. 2018-05-29 04:56:06,086 [ jobmanager-future-thread-36] INFO 
o.a.f.r.e.ExecutionGraph Restarting the job xxx 
(23d9e87bf43ce163ff7db8afb062fb1d). 2018-05-29 04:57:06,086 [ 
jobmanager-future-thread-36] WARN o.a.f.r.e.ExecutionGraph Failed to restart 
the job. java.lang.NullPointerException at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
 at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
 at 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
 at 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)
{noformat}






[jira] [Assigned] (FLINK-9453) Flink 1.5 incorrectly shows statistics in web UI

2018-05-28 Thread yuqi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuqi reassigned FLINK-9453:
---

Assignee: yuqi

> Flink 1.5 incorrectly shows statistics in web UI
> 
>
> Key: FLINK-9453
> URL: https://issues.apache.org/jira/browse/FLINK-9453
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Web Client, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Andrei Shumanski
>Assignee: yuqi
>Priority: Major
> Attachments: Flink_1.4.PNG, Flink_1.5.PNG
>
>
> Flink 1.5.0 shows execution statistics incorrectly, and differently from 
> Flink 1.4.1.
> I attach screenshots from both versions. It looks like 1.5.0 sums the 
> statistics of all operators connected with "forward()", which makes it 
> impossible to understand how much data was actually consumed.
> My test code:
>  
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> List<Integer> input = new ArrayList<>();
> input.add(1);
> input.add(2);
> input.add(3);
> input.add(4);
> input.add(5);
> DataStream<Integer> stream = env.fromCollection(input).rebalance();
> stream.map(i -> i+1).name("incr").forward().map(i -> 
> i-1).name("decr").forward().print();
> env.execute();
> }
> {code}
>  





[jira] [Commented] (FLINK-9394) Let externalized checkpoint resume e2e also test rescaling

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492999#comment-16492999
 ] 

ASF GitHub Bot commented on FLINK-9394:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6038
  
@StefanRRichter thanks for the review, will merge this.


> Let externalized checkpoint resume e2e also test rescaling
> --
>
> Key: FLINK-9394
> URL: https://issues.apache.org/jira/browse/FLINK-9394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> We should let the externalized checkpoints resuming e2e test also cover 
> rescaling cases.





[GitHub] flink issue #6038: [FLINK-9394] [e2e] Test rescaling when resuming from exte...

2018-05-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6038
  
@StefanRRichter thanks for the review, will merge this.


---


[jira] [Commented] (FLINK-8205) Multi key get

2018-05-28 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492903#comment-16492903
 ] 

Kostas Kloudas commented on FLINK-8205:
---

Hi [~sihuazhou]. Sorry, but I was a bit busy with the 1.5 release.

Yes I would like to move forward with this. I will have a look at the design 
doc you posted. 

> Multi key get
> -
>
> Key: FLINK-8205
> URL: https://issues.apache.org/jira/browse/FLINK-8205
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: Any
>Reporter: Martin Eden
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently the Java queryable state api only allows for fetching one key at a 
> time. It would be extremely useful and more efficient if a similar call 
> exists for submitting multiple keys.
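A multi-key get could be layered on top of the existing single-key call by fanning out one asynchronous request per key and joining the results. A self-contained sketch of that idea (the single-key lookup is passed in as a function here; this is a hypothetical design, not the queryable-state API):

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

public class MultiGet {

    // Fan out one future per key via the single-key lookup,
    // then collect all results into one map once they complete.
    static <K, V> CompletableFuture<Map<K, V>> getAll(
            Collection<K> keys, Function<K, CompletableFuture<V>> singleGet) {
        Map<K, CompletableFuture<V>> pending = new LinkedHashMap<>();
        for (K k : keys) {
            pending.put(k, singleGet.apply(k));
        }
        return CompletableFuture
            .allOf(pending.values().toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                Map<K, V> out = new LinkedHashMap<>();
                // join() cannot block here: allOf guarantees completion.
                pending.forEach((k, f) -> out.put(k, f.join()));
                return out;
            });
    }

    public static void main(String[] args) {
        Function<Integer, CompletableFuture<String>> single =
            k -> CompletableFuture.completedFuture("v" + k);
        System.out.println(getAll(Arrays.asList(1, 2, 3), single).join());
    }
}
```

Batching at the network layer would be more efficient still, but a fan-out wrapper like this already hides the per-key round trips from the caller.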





[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492857#comment-16492857
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/6090

[FLINK-8863] [SQL] Add user-defined function support in SQL Client

## What is the purpose of the change

This PR aims to add user-defined function (ScalarFunction, TableFunction 
and AggregateFunction) support to the SQL Client.

## Brief change log

  - Introduce a new `HierarchyDescriptor` and its corresponding validator 
`HierarchyDescriptorValidator`, which allow constructing descriptors 
hierarchically.
  - Add a `PrimitiveTypeDescriptor` to describe a primitive type value and 
a `ClassTypeDescriptor` to describe a class type value. A `ClassTypeDescriptor` 
contains a `constructor` field, which is composed of a list of 
`PrimitiveTypeDescriptor` or `ClassTypeDescriptor`.
  - Add a `UDFDescriptor` and its base class `FunctionDescriptor` to 
describe a `UserDefinedFunction`. Given a `DescriptorProperties`, a 
`UserDefinedFunction` can be instantiated with the 
`FunctionValidator.generateUserDefinedFunction()` method.
  - Add related tests for the new components.

## Verifying this change

The change can be verified with the test cases added in 
`LocalExecutorITCase.java`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (*the doc has not been finished 
yet*)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8863-udf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6090


commit c6c7d63f96eda01e24735271859ea8528e229021
Author: Xingcan Cui 
Date:   2018-05-27T15:36:25Z

[FLINK-8863] Add user-defined function support in SQL Client




> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 
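The nested `constructor` lists in the proposal imply recursively instantiating objects from descriptor properties: a leaf is a literal value, and a map with a `class` key is built by instantiating its constructor arguments first. A self-contained sketch of that recursion (hypothetical; not the SQL Client's actual descriptor code):

```java
import java.lang.reflect.Constructor;
import java.util.*;

public class DescriptorInstantiation {

    // Build an object from a descriptor: a plain value is returned as-is,
    // a map with "class" (and optional "constructor" list) is instantiated
    // reflectively, recursing into nested constructor arguments.
    static Object instantiate(Object descriptor) throws Exception {
        if (!(descriptor instanceof Map)) {
            return descriptor; // primitive literal: 42.0, true, "whatever", ...
        }
        Map<?, ?> desc = (Map<?, ?>) descriptor;
        Class<?> clazz = Class.forName((String) desc.get("class"));
        Object ctorObj = desc.get("constructor");
        List<?> ctorDesc = ctorObj == null ? Collections.emptyList() : (List<?>) ctorObj;
        Object[] args = new Object[ctorDesc.size()];
        for (int i = 0; i < args.length; i++) {
            args[i] = instantiate(ctorDesc.get(i)); // nested objects allowed
        }
        // Pick the first public constructor whose parameters accept the args
        // (a real implementation would also handle primitives and ambiguity).
        outer:
        for (Constructor<?> ctor : clazz.getConstructors()) {
            Class<?>[] params = ctor.getParameterTypes();
            if (params.length != args.length) continue;
            for (int i = 0; i < args.length; i++) {
                if (args[i] != null && !params[i].isInstance(args[i])) continue outer;
            }
            return ctor.newInstance(args);
        }
        throw new IllegalArgumentException("no matching constructor for " + clazz);
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> desc = new HashMap<>();
        desc.put("class", "java.lang.StringBuilder");
        desc.put("constructor", Collections.singletonList("seed"));
        System.out.println(instantiate(desc)); // the builder seeded with "seed"
    }
}
```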





[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

2018-05-28 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/6090

[FLINK-8863] [SQL] Add user-defined function support in SQL Client

## What is the purpose of the change

This PR aims to add user-defined function (ScalarFunction, TableFunction 
and AggregateFunction) support to the SQL Client.

## Brief change log

  - Introduce a new `HierarchyDescriptor` and its corresponding validator 
`HierarchyDescriptorValidator`, which allow constructing descriptors 
hierarchically.
  - Add a `PrimitiveTypeDescriptor` to describe a primitive type value and 
a `ClassTypeDescriptor` to describe a class type value. A `ClassTypeDescriptor` 
contains a `constructor` field, which is composed of a list of 
`PrimitiveTypeDescriptor` or `ClassTypeDescriptor`.
  - Add a `UDFDescriptor` and its base class `FunctionDescriptor` to 
describe a `UserDefinedFunction`. Given a `DescriptorProperties`, a 
`UserDefinedFunction` can be instantiated with the 
`FunctionValidator.generateUserDefinedFunction()` method.
  - Add related tests for the new components.

## Verifying this change

The change can be verified with the test cases added in 
`LocalExecutorITCase.java`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (*the doc has not been finished 
yet*)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8863-udf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6090


commit c6c7d63f96eda01e24735271859ea8528e229021
Author: Xingcan Cui 
Date:   2018-05-27T15:36:25Z

[FLINK-8863] Add user-defined function support in SQL Client




---


[GitHub] flink issue #6076: [hotfix][docs] Specify operators behaviour on processing ...

2018-05-28 Thread eliaslevy
Github user eliaslevy commented on the issue:

https://github.com/apache/flink/pull/6076
  
Thanks, that helps.

Regarding debugging watermarks, at least in 1.4.2 sources did not display 
their emitted watermarks, although this may have been fixed in 1.5.0.  In turn, 
that makes jobs with task chaining turned on more difficult to debug.  It may 
be useful to suggest turning off task chaining for debugging.

Also, the Kafka connector page discusses the fact that the Kafka source 
only emits watermarks as the minimum of the per partition watermark, which 
means an idle partition may delay watermark emission. It may be good to 
reiterate that behavior here, as Kafka is the most commonly used connector.
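The per-partition behavior mentioned above — the source emits the minimum of its partitions' watermarks — can be sketched in plain Java (a deliberate simplification of the connector's logic):

```java
import java.util.*;

public class MinWatermark {

    // The source-wide watermark is the minimum over all partitions,
    // so one idle partition (stuck at Long.MIN_VALUE) holds the
    // whole source back and delays downstream event-time timers.
    static long sourceWatermark(Map<Integer, Long> perPartition) {
        return perPartition.values().stream()
            .min(Long::compare)
            .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Map<Integer, Long> wm = new HashMap<>();
        wm.put(0, 1000L);
        wm.put(1, 2000L);
        wm.put(2, Long.MIN_VALUE); // idle partition
        System.out.println(sourceWatermark(wm)); // held back by partition 2
    }
}
```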



---


[jira] [Commented] (FLINK-9450) Job hangs if S3 access it denied during checkpoints

2018-05-28 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492837#comment-16492837
 ] 

Elias Levy commented on FLINK-9450:
---

The logs show no errors.  Alas, we are using the JRE, so these nodes don't have 
jps or jstack installed. 

Re: job failure on checkpoint failure.  My understanding is that this is a 1.5 
feature (env.getCheckpointConfig.setFailTasksOnCheckpointingErrors).  We are 
running 1.4.2.

The checkpoint and the job just hang.  There is no failure, at least for the 
amount of time I've waited, which has been several minutes.  I don't know if it 
will fail if I wait longer.  The job continues after I clear the firewall rules.

Looks like the checkpoint may be hanging on the synchronous portion of the 
async checkpoint.

This is from a Flink job running on a cluster with a single TM:

I run:

{noformat}
iptables -A OUTPUT -p tcp --dport 80 -j REJECT  --reject-with tcp-reset
iptables -A OUTPUT -p tcp --dport 443 -j REJECT  --reject-with tcp-reset
{noformat}

The logs show what seems like a normal checkpoint attempt:

{noformat}
2018-05-28 16:44:06.449070500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Control Topic -> Updates 
(1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449103500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Control Topic -> Updates 
(1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449388500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Filter 
(1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449390500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka 
Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449400500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka 
Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.451695500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Engine (1/1),5,Flink Task Threads] took 2 ms
{noformat}

 After I disable the firewall rules:
{noformat}
iptables -D OUTPUT -p tcp --dport 80 -j REJECT --reject-with tcp-reset
iptables -D OUTPUT -p tcp --dport 443 -j REJECT  --reject-with tcp-reset
{noformat}

{noformat}
2018-05-28 16:47:19.741581500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, 
asynchronous part) in thread Thread[pool-58-thread-1,5,Flink Task Threads] took 
193290 ms.
2018-05-28 16:47:23.424434500 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend 
snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka 
Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:47:23.424876500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Agent 
Queries Filter (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:47:23.426263500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Agent 
Queries Filter 


[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492828#comment-16492828
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
I think we need to have a special output stream type 
(`AtomicCreatingFsDataOutputStream` or similar) as the return type of 
`FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? 
The `closeAndPublish()` method is not part of any API class.


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We should generate the _meta file for checkpoint only when the writing is 
> totally successful. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492813#comment-16492813
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6089

[FLINK-9451]End-to-end test: Scala Quickstarts

## What is the purpose of the change

Added an end-to-end test which verifies Flink's Scala quickstarts. It does 
the following:

- create a new Flink project using the Scala quickstarts archetype
- add a new Flink Kafka connector dependency to the pom.xml 
- run mvn clean package
- verify that no core dependencies are contained in the jar file
- run the program

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9451

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6089


commit 26f5948eff55fd54dbffc3125e0599479bee3bbe
Author: Yadan.JS 
Date:   2018-05-28T16:16:40Z

[FLINK-9451]End-to-end test: Scala Quickstarts




> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's Scala quickstarts. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Assigned] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-05-28 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9455:
-

Assignee: Sihua Zhou

> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. The started {{TaskManager}} can be started with multiple slots 
> configured but currently, the {{SlotManager}} thinks that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.
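The proposed accounting can be sketched as a back-of-the-envelope calculation. The helper below is hypothetical, not the actual SlotManager code:

```python
import math

def taskmanagers_to_request(pending_slot_requests,
                            free_registered_slots,
                            pending_taskmanagers,
                            slots_per_taskmanager):
    """Request new TaskManagers only for slots not covered by registered
    free slots or by TaskManagers already requested but not yet
    registered (whose slots count as future slots)."""
    future_slots = (free_registered_slots
                    + pending_taskmanagers * slots_per_taskmanager)
    uncovered = pending_slot_requests - future_slots
    if uncovered <= 0:
        return 0
    # One TaskManager covers slots_per_taskmanager requests, so round up.
    return math.ceil(uncovered / slots_per_taskmanager)
```

With 5 pending slot requests and 4 slots per TaskManager, a slot-unaware manager would request 5 TaskManagers; the slot-aware version requests only 2.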



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-05-28 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9456:
-

Assignee: Sihua Zhou

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9440) Allow cancelation and reset of timers

2018-05-28 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-9440:
-

Assignee: Stefan Richter

> Allow cancelation and reset of timers
> -
>
> Key: FLINK-9440
> URL: https://issues.apache.org/jira/browse/FLINK-9440
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Assignee: Stefan Richter
>Priority: Major
>
> Currently the {{TimerService}} allows one to register timers, but it is not 
> possible to delete a timer or to reset a timer to a new value.  If one wishes 
> to reset a timer, one must also handle the previous inserted timer callbacks 
> and ignore them.
> It would be useful if the API allowed one to remove and reset timers.
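Until deletion is supported, the usual workaround described above (handling and ignoring stale callbacks) can be sketched like this. The class below is a hypothetical illustration of the pattern, not Flink API:

```python
class ResettableTimerState:
    """Workaround for non-deletable timers: remember only the latest
    registered timestamp and drop callbacks for older ones."""

    def __init__(self):
        self._expected = None
        self.fired = []

    def register(self, timestamp):
        # "Resetting" just records the new target; the previously
        # registered timer still exists in the timer service and will
        # fire, but its callback gets ignored.
        self._expected = timestamp

    def on_timer(self, timestamp):
        if timestamp != self._expected:
            return  # stale timer from before the reset; ignore it
        self.fired.append(timestamp)
```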



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9457) Cancel container requests when cancelling pending slot allocations

2018-05-28 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9457:


 Summary: Cancel container requests when cancelling pending slot 
allocations
 Key: FLINK-9457
 URL: https://issues.apache.org/jira/browse/FLINK-9457
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager, YARN
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


When cancelling a pending slot allocation request on the {{ResourceManager}}, 
then we should also check whether we still need all the requested containers. 
If it turns out that we no longer need them, then we should try to cancel the 
unnecessary container requests. That way Flink will be, for example, a better 
Yarn citizen which quickly releases resources and resource requests if no 
longer needed.
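The intended accounting can be sketched as follows; function and parameter names are hypothetical, not the actual ResourceManager code:

```python
def container_requests_to_cancel(pending_slot_requests,
                                 pending_container_requests,
                                 slots_per_container):
    """After slot allocations are cancelled, compute how many of the
    still-outstanding container requests are no longer needed."""
    # Ceil-divide: containers still required to cover the remaining slots.
    still_needed = -(-pending_slot_requests // slots_per_container)
    return max(0, pending_container_requests - still_needed)
```

For example, if all slot allocations are cancelled while 3 container requests are outstanding, all 3 can be withdrawn from YARN.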



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492788#comment-16492788
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4543
  
Ping @StefanRRichter. Can we get this documentation merged? The community 
would definitely appreciate it.


> Improve and enhance documentation for incremental checkpoints
> -
>
> Key: FLINK-7449
> URL: https://issues.apache.org/jira/browse/FLINK-7449
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> We should provide more details about incremental checkpoints in the 
> documentation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Created] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-05-28 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9456:


 Summary: Let ResourceManager notify JobManager about failed/killed 
TaskManagers
 Key: FLINK-9456
 URL: https://issues.apache.org/jira/browse/FLINK-9456
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


Often, the {{ResourceManager}} learns faster about TaskManager 
failures/killings because it directly communicates with the underlying resource 
management framework. Instead of only relying on the {{JobManager}}'s heartbeat 
to figure out that a {{TaskManager}} has died, we should additionally send a 
signal from the {{ResourceManager}} to the {{JobManager}} if a {{TaskManager}} 
has died. That way, we can react faster to {{TaskManager}} failures and recover 
our running jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-05-28 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9455:


 Summary: Make SlotManager aware of multi slot TaskManagers
 Key: FLINK-9455
 URL: https://issues.apache.org/jira/browse/FLINK-9455
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


The {{SlotManager}} responsible for managing all available slots of a Flink 
cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
request. The started {{TaskManager}} can be started with multiple slots 
configured but currently, the {{SlotManager}} thinks that it will be started 
with a single slot. As a consequence, it might issue multiple requests to start 
new TaskManagers even though a single one would be sufficient to fulfill all 
pending slot requests.

In order to avoid requesting unnecessary resources which are freed after the 
idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a 
{{TaskManager}} is started with. That way the SlotManager only needs to request 
a new {{TaskManager}} if all of the previously started slots (potentially not 
yet registered and, thus, future slots) are being assigned to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-05-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8899:
-
Fix Version/s: 1.5.1
   1.6.0

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.0, 1.5.1
>
>
> Occasionally, running a simple word count as this
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> 

[jira] [Updated] (FLINK-9010) NoResourceAvailableException with FLIP-6

2018-05-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9010:
-
Priority: Major  (was: Critical)

> NoResourceAvailableException with FLIP-6 
> -
>
> Key: FLINK-9010
> URL: https://issues.apache.org/jira/browse/FLINK-9010
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.0, 1.5.1
>
>
> I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) 
> with FLIP-6 mode and a checkpointing interval of 1000 and got the following 
> exception:
> {code}
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000101 - Remaining pending container 
> requests: 302
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000101 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,155 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
> 2018-03-16 03:41:20,165 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> 2018-03-16 03:41:20,176 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> 

[jira] [Updated] (FLINK-9010) NoResourceAvailableException with FLIP-6

2018-05-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9010:
-
Fix Version/s: 1.6.0

> NoResourceAvailableException with FLIP-6 
> -
>
> Key: FLINK-9010
> URL: https://issues.apache.org/jira/browse/FLINK-9010
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.6.0, 1.5.1
>
>
> I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) 
> with FLIP-6 mode and a checkpointing interval of 1000 and got the following 
> exception:
> {code}
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000101 - Remaining pending container 
> requests: 302
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000101 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,155 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
> 2018-03-16 03:41:20,165 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> 2018-03-16 03:41:20,176 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> 

[jira] [Commented] (FLINK-5505) Harmonize ZooKeeper configuration parameters

2018-05-28 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492775#comment-16492775
 ] 

vinoyang commented on FLINK-5505:
-

[~till.rohrmann] if you have no objections, I will start working on this issue.

> Harmonize ZooKeeper configuration parameters
> 
>
> Key: FLINK-5505
> URL: https://issues.apache.org/jira/browse/FLINK-5505
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Trivial
> Fix For: 1.5.1
>
>
> Since Flink users don't necessarily know all of the Mesos terminology, and the 
> JobManager also runs as a Mesos task, I would like to rename some of Flink's 
> Mesos configuration parameters. I propose the following changes:
> {{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
> {{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
> {{mesos.resourcemanager.artifactserver.\*}} => {{mesos.artifactserver.*}}
> {{mesos.resourcemanager.framework.\*}} => {{mesos.framework.*}}
> {{mesos.resourcemanager.tasks.\*}} => {{mesos.taskmanager.*}}
> {{recovery.zookeeper.path.mesos-workers}} => 
> {{mesos.high-availability.zookeeper.path.mesos-workers}}
> What do you think [~eronwright]?
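The proposed renaming could be captured in a small compatibility shim. A hypothetical helper (the class name and method are illustrative, not part of Flink) that maps the deprecated keys listed above to their proposed replacements, including the wildcard prefixes:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink) mapping the deprecated Mesos
 *  config keys to the replacements proposed in this issue. */
public class MesosKeyMigration {

    private static final Map<String, String> EXACT = new HashMap<>();
    private static final Map<String, String> PREFIXES = new HashMap<>();

    static {
        EXACT.put("mesos.initial-tasks", "mesos.initial-taskmanagers");
        EXACT.put("mesos.maximum-failed-tasks", "mesos.maximum-failed-taskmanagers");
        EXACT.put("recovery.zookeeper.path.mesos-workers",
                "mesos.high-availability.zookeeper.path.mesos-workers");
        // Wildcard renames: everything after the old prefix is kept verbatim.
        PREFIXES.put("mesos.resourcemanager.artifactserver.", "mesos.artifactserver.");
        PREFIXES.put("mesos.resourcemanager.framework.", "mesos.framework.");
        PREFIXES.put("mesos.resourcemanager.tasks.", "mesos.taskmanager.");
    }

    /** Returns the proposed key for a deprecated one, or the key unchanged. */
    public static String migrate(String key) {
        String mapped = EXACT.get(key);
        if (mapped != null) {
            return mapped;
        }
        for (Map.Entry<String, String> e : PREFIXES.entrySet()) {
            if (key.startsWith(e.getKey())) {
                return e.getValue() + key.substring(e.getKey().length());
            }
        }
        return key;
    }
}
```

In practice Flink exposes such renames through deprecated-key support in its ConfigOption machinery; the sketch only shows the mapping itself.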



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-05-28 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492772#comment-16492772
 ] 

vinoyang commented on FLINK-9352:
-

[~till.rohrmann] OK, I will accept your opinion, although I think giving users a 
choice could improve flexibility.

> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses 
> *baseInterval* as the initialDelay parameter; *baseInterval* is also the 
> checkpoint interval.
> In standalone checkpoint mode, many jobs are configured with the same 
> checkpoint interval. When all jobs are recovered (after a cluster restart or a 
> JobManager leadership switch), their checkpoint periods tend to align: every 
> job's CheckpointCoordinator starts and triggers at approximately the same time.
> This causes high IO cost within the same time window in our production 
> scenario.
> I suggest exposing scheduleAtFixedRate's initial-delay parameter as an API 
> config option so users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]
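The alternative favored in the discussion, randomizing the initial delay instead of adding a config option, could look roughly like this. A sketch only, with assumed names; it is not the actual CheckpointCoordinator code:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/** Sketch: scatter the first checkpoint trigger so that many jobs recovered
 *  at the same time do not all checkpoint in lockstep. */
public class ScatteredScheduler {

    /** Picks a random initial delay in [minPauseMillis, baseIntervalMillis]
     *  instead of always using baseIntervalMillis. */
    public static long randomInitialDelay(long minPauseMillis, long baseIntervalMillis) {
        return minPauseMillis
                + ThreadLocalRandom.current().nextLong(baseIntervalMillis - minPauseMillis + 1);
    }

    /** Schedules the periodic checkpoint trigger with a scattered first firing. */
    public static ScheduledFuture<?> start(ScheduledExecutorService timer,
                                           Runnable triggerCheckpoint,
                                           long baseIntervalMillis) {
        long initialDelay = randomInitialDelay(0, baseIntervalMillis);
        return timer.scheduleAtFixedRate(
                triggerCheckpoint, initialDelay, baseIntervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

The period stays `baseInterval`, so per-job checkpoint frequency is unchanged; only the phase is randomized, which is what spreads the IO load across the cluster.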



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-05-28 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492766#comment-16492766
 ] 

Till Rohrmann commented on FLINK-9352:
--

I would prefer [~StephanEwen]'s proposal to make it random and let the system 
be in charge instead of introducing yet another configuration knob. Keeping 
Flink's configuration complexity low if possible is important imo.

> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses 
> *baseInterval* as the initialDelay parameter; *baseInterval* is also the 
> checkpoint interval.
> In standalone checkpoint mode, many jobs are configured with the same 
> checkpoint interval. When all jobs are recovered (after a cluster restart or a 
> JobManager leadership switch), their checkpoint periods tend to align: every 
> job's CheckpointCoordinator starts and triggers at approximately the same time.
> This causes high IO cost within the same time window in our production 
> scenario.
> I suggest exposing scheduleAtFixedRate's initial-delay parameter as an API 
> config option so users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492764#comment-16492764
 ] 

Andrei Shumanski commented on FLINK-9454:
-

Just double-checked: these files exist in the flink-1.5.0-src.tgz archive.

So a build from the downloaded sources probably does not work by default.

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.
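The reporter's workaround, as a single command (flags copied from this report; note that -Drat.skip=true disables the Apache RAT license audit entirely, so it is only appropriate for local vendor builds):

```shell
# Build Flink from the source tarball for MapR, skipping the RAT check that
# rejects the binary test resources shipped in the tarball.
mvn clean install -DskipTests -Drat.skip=true \
    -Pvendor-repos,mapr \
    -Dhadoop.version=2.7.0-mapr-1710 \
    -Dzookeeper.version=3.4.5-mapr-1710
```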



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492753#comment-16492753
 ] 

Andrei Shumanski edited comment on FLINK-9454 at 5/28/18 2:58 PM:
--

I downloaded the source code from the official page: 
[http://flink.apache.org/downloads.html#source]

It is not cloned from git.

 

Files with unapproved licenses:

flink-connectors/flink-orc/src/test/resources/test-data-decimal.orc
 flink-connectors/flink-orc/src/test/resources/test-data-nestedlist.orc
 flink-connectors/flink-orc/src/test/resources/test-data-timetypes.orc
 flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
 flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
 flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot

 


was (Author: shumanski):
I downloaded the source code from the official page: 
[http://flink.apache.org/downloads.html#source]

It is not cloned from git.

 

Files with unapproved licenses:

flink-connectors/flink-orc/src/test/resources/test-data-decimal.orc
 flink-connectors/flink-orc/src/test/resources/test-data-nestedlist.orc
 flink-connectors/flink-orc/src/test/resources/test-data-timetypes.orc
 flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
 flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
 flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot
 build.txt

 

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492761#comment-16492761
 ] 

Andrei Shumanski commented on FLINK-9454:
-

As all flagged files are under /test/ and I run with "-DskipTests", I can 
probably delete them without any harm.

But I think this should still be mentioned in the documentation somehow...

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-05-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9352:
-
Affects Version/s: 1.6.0
   1.5.0
   1.4.2

> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses 
> *baseInterval* as the initialDelay parameter; *baseInterval* is also the 
> checkpoint interval.
> In standalone checkpoint mode, many jobs are configured with the same 
> checkpoint interval. When all jobs are recovered (after a cluster restart or a 
> JobManager leadership switch), their checkpoint periods tend to align: every 
> job's CheckpointCoordinator starts and triggers at approximately the same time.
> This causes high IO cost within the same time window in our production 
> scenario.
> I suggest exposing scheduleAtFixedRate's initial-delay parameter as an API 
> config option so users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-05-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9352:
-
Issue Type: Improvement  (was: Bug)

> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Critical
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses 
> *baseInterval* as the initialDelay parameter; *baseInterval* is also the 
> checkpoint interval.
> In standalone checkpoint mode, many jobs are configured with the same 
> checkpoint interval. When all jobs are recovered (after a cluster restart or a 
> JobManager leadership switch), their checkpoint periods tend to align: every 
> job's CheckpointCoordinator starts and triggers at approximately the same time.
> This causes high IO cost within the same time window in our production 
> scenario.
> I suggest exposing scheduleAtFixedRate's initial-delay parameter as an API 
> config option so users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-05-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9352:
-
Priority: Major  (was: Critical)

> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses 
> *baseInterval* as the initialDelay parameter; *baseInterval* is also the 
> checkpoint interval.
> In standalone checkpoint mode, many jobs are configured with the same 
> checkpoint interval. When all jobs are recovered (after a cluster restart or a 
> JobManager leadership switch), their checkpoint periods tend to align: every 
> job's CheckpointCoordinator starts and triggers at approximately the same time.
> This causes high IO cost within the same time window in our production 
> scenario.
> I suggest exposing scheduleAtFixedRate's initial-delay parameter as an API 
> config option so users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492753#comment-16492753
 ] 

Andrei Shumanski edited comment on FLINK-9454 at 5/28/18 2:55 PM:
--

I downloaded the source code from the official page: 
[http://flink.apache.org/downloads.html#source]

It is not cloned from git.

 

Files with unapproved licenses:

flink-connectors/flink-orc/src/test/resources/test-data-decimal.orc
 flink-connectors/flink-orc/src/test/resources/test-data-nestedlist.orc
 flink-connectors/flink-orc/src/test/resources/test-data-timetypes.orc
 flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
 flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
 flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot
 build.txt

 


was (Author: shumanski):
I downloaded the source code from the official page: 
[http://flink.apache.org/downloads.html#source]

It is not cloned from git.

 

The wrong files are not printed by default, I will re-run the job with more 
logging.

 

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9231) Enable SO_REUSEADDR on listen sockets

2018-05-28 Thread Andrey Zagrebin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492760#comment-16492760
 ] 

Andrey Zagrebin commented on FLINK-9231:


[~yuzhih...@gmail.com] Just wondering: could you please provide some details 
about the original problem without this fix? What was the usage scenario?

> Enable SO_REUSEADDR on listen sockets
> -
>
> Key: FLINK-9231
> URL: https://issues.apache.org/jira/browse/FLINK-9231
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Ted Yu
>Assignee: Triones Deng
>Priority: Major
>
> This allows sockets to be bound even if there are sockets from a previous 
> application that are still pending closure.
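For context, enabling the option on a plain listen socket looks like this (a generic Java sketch with an assumed class name; Flink's web frontend would set the equivalent option on its Netty server bootstrap):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Sketch: SO_REUSEADDR must be enabled on the unbound socket, before bind(),
 *  for it to take effect. It lets the listener rebind even while connections
 *  from a previous process linger in TIME_WAIT. */
public class ReuseAddrExample {

    public static ServerSocket listen(int port) {
        try {
            ServerSocket server = new ServerSocket();   // create unbound
            server.setReuseAddress(true);               // set before bind
            server.bind(new InetSocketAddress(port));
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing port 0 binds an ephemeral port, which is handy for quickly checking the option is applied.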



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492753#comment-16492753
 ] 

Andrei Shumanski commented on FLINK-9454:
-

I downloaded the source code from the official page: 
[http://flink.apache.org/downloads.html#source]

It is not cloned from git.

 

The offending files are not printed by default; I will re-run the build with 
more logging.

 

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492751#comment-16492751
 ] 

Dawid Wysakowicz edited comment on FLINK-9454 at 5/28/18 2:48 PM:
--

Did you run it on a cleanly checked-out repository? You probably have some 
leftovers from previous builds. (By previous builds I mean builds of different 
versions; mvn clean after switching to a different version might not clean all 
generated files.)

Try manually removing the files reported in the RAT report and building one 
more time.


was (Author: dawidwys):
Did you run it on a cleanly checked out repository? You probably have some 
leftovers from previous builds.

Try removing manually files reported in the RAT report and build it one more 
time.

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492751#comment-16492751
 ] 

Dawid Wysakowicz commented on FLINK-9454:
-

Did you run it on a cleanly checked-out repository? You probably have some 
leftovers from previous builds.

Try manually removing the files reported in the RAT report and building one 
more time.

> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Shumanski updated FLINK-9454:

Environment: 
{{$ java -version}}
 {{java version "1.8.0_172"}}
 {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
 {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
  
 {{$ mvn -version}}
 {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-24T20:49:05+01:00)}}
 {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
 {{Default locale: en_US, platform encoding: ISO-8859-1}}
 {{OS name: "linux", version: "3.10.0-327.36.3.el7.x86_64", arch: "amd64", family: "unix"}}

  was:
{{$ java -version}}
{{java version }}{{"1.8.0_172"}}
{{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
{{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
 
{{$ mvn -version}}
{{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
2018-02-24T20:49:05+01:00)}}
{{Maven home: }}{{/app/vbuild/tools/maven/3}}{{.5.3}}
{{Java version: 1.8.0_172, vendor: Oracle Corporation}}
{{Java home: 
}}{{/afs/sunrise}}{{.ericsson.se}}{{/se/app/vbuild/SLED11-x86_64/jdk/1}}{{.8.0_172}}{{/jre}}
{{Default locale: en_US, platform encoding: ISO-8859-1}}
{{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
arch: }}{{"amd64"}}{{, family: }}{{"unix"}}


> Build instructions for MapR do not work
> ---
>
> Key: FLINK-9454
> URL: https://issues.apache.org/jira/browse/FLINK-9454
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.1
> Environment: {{$ java -version}}
>  {{java version }}{{"1.8.0_172"}}
>  {{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
>  {{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
>   
>  {{$ mvn -version}}
>  {{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)}}
>  {{Java version: 1.8.0_172, vendor: Oracle Corporation}}
>  {{Default locale: en_US, platform encoding: ISO-8859-1}}
>  {{OS name: }}{{"linux"}}{{, version: }}{{"3.10.0-327.36.3.el7.x86_64"}}{{, 
> arch: }}{{"amd64"}}{{, family: }}{{"unix"}}
>Reporter: Andrei Shumanski
>Priority: Minor
>
> Build instruction provided on the 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
>  does not work for MapR 6.
> When executing:
> mvn clean install -DskipTests -Pvendor-repos,mapr 
> -Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710
>  
> The result is:
> [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
> (default) on project flink-parent: Too many files with unapproved license: 7 
> See RAT report in: ...
>  
> I fixed the problem by adding "-Drat.skip=true" but there might be a better 
> fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9454) Build instructions for MapR do not work

2018-05-28 Thread Andrei Shumanski (JIRA)
Andrei Shumanski created FLINK-9454:
---

 Summary: Build instructions for MapR do not work
 Key: FLINK-9454
 URL: https://issues.apache.org/jira/browse/FLINK-9454
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.4.1, 1.5.0
 Environment: {{$ java -version}}
{{java version "1.8.0_172"}}
{{Java(TM) SE Runtime Environment (build 1.8.0_172-b11)}}
{{Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)}}
 
{{$ mvn -version}}
{{Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-24T20:49:05+01:00)}}
{{Maven home: /app/vbuild/tools/maven/3.5.3}}
{{Java version: 1.8.0_172, vendor: Oracle Corporation}}
{{Java home: /afs/sunrise.ericsson.se/se/app/vbuild/SLED11-x86_64/jdk/1.8.0_172/jre}}
{{Default locale: en_US, platform encoding: ISO-8859-1}}
{{OS name: "linux", version: "3.10.0-327.36.3.el7.x86_64", arch: "amd64", family: "unix"}}
Reporter: Andrei Shumanski


Build instructions provided on the 
[page|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mapr_setup.html]
 do not work for MapR 6.

When executing:

mvn clean install -DskipTests -Pvendor-repos,mapr 
-Dhadoop.version=2.7.0-mapr-1710 -Dzookeeper.version=3.4.5-mapr-1710

 

The result is:

[ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check 
(default) on project flink-parent: Too many files with unapproved license: 7 
See RAT report in: ...

 

I fixed the problem by adding "-Drat.skip=true" but there might be a better fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9453) Flink 1.5 incorrectly shows statistics in web UI

2018-05-28 Thread Andrei Shumanski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Shumanski updated FLINK-9453:

Description: 
Flink 1.5.0 shows execution statistics incorrectly, and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()", which makes it impossible to understand 
how much data was actually consumed.

My test code:

 
{code:java}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new 
FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));

List<Integer> input = new ArrayList<>();

input.add(1);
input.add(2);
input.add(3);
input.add(4);
input.add(5);

DataStream<Integer> stream = env.fromCollection(input).rebalance();

stream.map(i -> i+1).name("incr").forward().map(i -> 
i-1).name("decr").forward().print();

env.execute();

}
{code}
 

  was:
Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 

 

 

{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStateBackend(new FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));

    List<Integer> input = new ArrayList<>();

    input.add(1);
    input.add(2);
    input.add(3);
    input.add(4);
    input.add(5);

    DataStream<Integer> stream = env.fromCollection(input).rebalance();

    stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();

    env.execute();
}
{code}

 


> Flink 1.5 incorrectly shows statistics in web UI
> 
>
> Key: FLINK-9453
> URL: https://issues.apache.org/jira/browse/FLINK-9453
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Web Client, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Andrei Shumanski
>Priority: Major
> Attachments: Flink_1.4.PNG, Flink_1.5.PNG
>
>
> Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
> 1.4.1.
> I attach screenshots from both versions. It looks like 1.5.0 sums all steps 
> in operators connected with "forward()". It makes it impossible to understand 
> how much data was actually consumed.
> My test code:
>  
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new 
> FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));
> List<Integer> input = new ArrayList<>();
> input.add(1);
> input.add(2);
> input.add(3);
> input.add(4);
> input.add(5);
> DataStream<Integer> stream = env.fromCollection(input).rebalance();
> stream.map(i -> i+1).name("incr").forward().map(i -> 
> i-1).name("decr").forward().print();
> env.execute();
> }
> {code}
>  





[jira] [Updated] (FLINK-9453) Flink 1.5 incorrectly shows statistics in web UI

2018-05-28 Thread Andrei Shumanski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Shumanski updated FLINK-9453:

Description: 
Flink 1.5.0 shows execution statistics incorrectly, and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 

 

 

{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStateBackend(new FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));

    List<Integer> input = new ArrayList<>();

    input.add(1);
    input.add(2);
    input.add(3);
    input.add(4);
    input.add(5);

    DataStream<Integer> stream = env.fromCollection(input).rebalance();

    stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();

    env.execute();
}
{code}

 

  was:
Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 

 

{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStateBackend(new FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));

    List<Integer> input = new ArrayList<>();

    input.add(1);
    input.add(2);
    input.add(3);
    input.add(4);
    input.add(5);

    DataStream<Integer> stream = env.fromCollection(input).rebalance();

    stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();

    env.execute();
}
{code}

 


> Flink 1.5 incorrectly shows statistics in web UI
> 
>
> Key: FLINK-9453
> URL: https://issues.apache.org/jira/browse/FLINK-9453
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Web Client, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Andrei Shumanski
>Priority: Major
> Attachments: Flink_1.4.PNG, Flink_1.5.PNG
>
>
> Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
> 1.4.1.
> I attach screenshots from both versions. It looks like 1.5.0 sums all steps 
> in operators connected with "forward()". It makes it impossible to understand 
> how much data was actually consumed.
> My test code:
>  
>  
>  
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     env.setStateBackend(new FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));
>
>     List<Integer> input = new ArrayList<>();
>
>     input.add(1);
>     input.add(2);
>     input.add(3);
>     input.add(4);
>     input.add(5);
>
>     DataStream<Integer> stream = env.fromCollection(input).rebalance();
>
>     stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();
>
>     env.execute();
> }
> {code}
>  





[jira] [Updated] (FLINK-9453) Flink 1.5 incorrectly shows statistics in web UI

2018-05-28 Thread Andrei Shumanski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Shumanski updated FLINK-9453:

Description: 
Flink 1.5.0 shows execution statistics incorrectly, and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 
{code:java}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> input = new ArrayList<>();

input.add(1);
input.add(2);
input.add(3);
input.add(4);
input.add(5);

DataStream<Integer> stream = env.fromCollection(input).rebalance();

stream.map(i -> i+1).name("incr").forward().map(i -> 
i-1).name("decr").forward().print();

env.execute();

}
{code}
 

  was:
Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 
{code:java}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new 
FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));

List<Integer> input = new ArrayList<>();

input.add(1);
input.add(2);
input.add(3);
input.add(4);
input.add(5);

DataStream<Integer> stream = env.fromCollection(input).rebalance();

stream.map(i -> i+1).name("incr").forward().map(i -> 
i-1).name("decr").forward().print();

env.execute();

}
{code}
 


> Flink 1.5 incorrectly shows statistics in web UI
> 
>
> Key: FLINK-9453
> URL: https://issues.apache.org/jira/browse/FLINK-9453
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Web Client, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Andrei Shumanski
>Priority: Major
> Attachments: Flink_1.4.PNG, Flink_1.5.PNG
>
>
> Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
> 1.4.1.
> I attach screenshots from both versions. It looks like 1.5.0 sums all steps 
> in operators connected with "forward()". It makes it impossible to understand 
> how much data was actually consumed.
> My test code:
>  
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> List<Integer> input = new ArrayList<>();
> input.add(1);
> input.add(2);
> input.add(3);
> input.add(4);
> input.add(5);
> DataStream<Integer> stream = env.fromCollection(input).rebalance();
> stream.map(i -> i+1).name("incr").forward().map(i -> 
> i-1).name("decr").forward().print();
> env.execute();
> }
> {code}
>  





[jira] [Updated] (FLINK-9453) Flink 1.5 incorrectly shows statistics in web UI

2018-05-28 Thread Andrei Shumanski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Shumanski updated FLINK-9453:

Description: 
Flink 1.5.0 shows execution statistics incorrectly, and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 

 

{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStateBackend(new FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));

    List<Integer> input = new ArrayList<>();

    input.add(1);
    input.add(2);
    input.add(3);
    input.add(4);
    input.add(5);

    DataStream<Integer> stream = env.fromCollection(input).rebalance();

    stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();

    env.execute();
}
{code}

 

  was:
Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 

 

{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Integer> input = new ArrayList<>();

    input.add(1);
    input.add(2);
    input.add(3);
    input.add(4);
    input.add(5);

    DataStream<Integer> stream = env.fromCollection(input).rebalance();

    stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();

    env.execute();
}
{code}


> Flink 1.5 incorrectly shows statistics in web UI
> 
>
> Key: FLINK-9453
> URL: https://issues.apache.org/jira/browse/FLINK-9453
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Web Client, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Andrei Shumanski
>Priority: Major
> Attachments: Flink_1.4.PNG, Flink_1.5.PNG
>
>
> Flink 1.5.0 incorrectly shows execution statistics and differently from Flink 
> 1.4.1.
> I attach screenshots from both versions. It looks like 1.5.0 sums all steps 
> in operators connected with "forward()". It makes it impossible to understand 
> how much data was actually consumed.
> My test code:
>  
>  
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     env.setStateBackend(new FsStateBackend("file:///mapr/rdidev1/user/eshuand/tmp/", true));
>
>     List<Integer> input = new ArrayList<>();
>
>     input.add(1);
>     input.add(2);
>     input.add(3);
>     input.add(4);
>     input.add(5);
>
>     DataStream<Integer> stream = env.fromCollection(input).rebalance();
>
>     stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();
>
>     env.execute();
> }
> {code}
>  





[jira] [Commented] (FLINK-9436) Remove generic parameter namespace from InternalTimeServiceManager

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492735#comment-16492735
 ] 

ASF GitHub Bot commented on FLINK-9436:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6077
  
+1


> Remove generic parameter namespace from InternalTimeServiceManager
> --
>
> Key: FLINK-9436
> URL: https://issues.apache.org/jira/browse/FLINK-9436
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.6.0
>
>
> Currently {{InternalTimeServiceManager}} declares a generic parameter 
> {{N}} for namespace. This parameter is not useful or even misleading, because 
> in the current implementation the managed timer services can all have 
> different types for their {{N}}. I suggest to remove the parameter.
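As a toy illustration of the point (all class names below are made up, not Flink's actual API): a manager that holds timer services with heterogeneous namespace types cannot meaningfully declare a single class-level `N`, so a wildcard is the honest signature:

```java
import java.util.HashMap;
import java.util.Map;

class TimerServiceManagerSketch {

    // Stand-in for a per-service namespace-parameterized timer service.
    static class TimerService<N> {
        final Class<N> namespaceType;
        TimerService(Class<N> namespaceType) { this.namespaceType = namespaceType; }
    }

    // The manager holds services with *different* namespace types, so it uses a
    // wildcard rather than declaring its own namespace parameter N.
    private final Map<String, TimerService<?>> services = new HashMap<>();

    <N> TimerService<N> register(String name, Class<N> namespaceType) {
        TimerService<N> service = new TimerService<>(namespaceType);
        services.put(name, service);
        return service;
    }

    int size() { return services.size(); }

    public static void main(String[] args) {
        TimerServiceManagerSketch manager = new TimerServiceManagerSketch();
        // Two services with different namespace types under one manager:
        manager.register("windows", Long.class);
        manager.register("sessions", String.class);
        System.out.println(manager.size()); // prints 2
    }
}
```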





[jira] [Created] (FLINK-9453) Flink 1.5 incorrectly shows statistics in web UI

2018-05-28 Thread Andrei Shumanski (JIRA)
Andrei Shumanski created FLINK-9453:
---

 Summary: Flink 1.5 incorrectly shows statistics in web UI
 Key: FLINK-9453
 URL: https://issues.apache.org/jira/browse/FLINK-9453
 Project: Flink
  Issue Type: Bug
  Components: Streaming, Web Client, Webfrontend
Affects Versions: 1.5.0
Reporter: Andrei Shumanski
 Attachments: Flink_1.4.PNG, Flink_1.5.PNG

Flink 1.5.0 shows execution statistics incorrectly, and differently from Flink 
1.4.1.

I attach screenshots from both versions. It looks like 1.5.0 sums all steps in 
operators connected with "forward()". It makes it impossible to understand how 
much data was actually consumed.

My test code:

 

 

{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Integer> input = new ArrayList<>();

    input.add(1);
    input.add(2);
    input.add(3);
    input.add(4);
    input.add(5);

    DataStream<Integer> stream = env.fromCollection(input).rebalance();

    stream.map(i -> i+1).name("incr").forward().map(i -> i-1).name("decr").forward().print();

    env.execute();
}
{code}





[GitHub] flink issue #6077: [FLINK-9436][state] Remove generic parameter namespace fr...

2018-05-28 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6077
  
+1


---


[jira] [Commented] (FLINK-8790) Improve performance for recovery from incremental checkpoint

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492732#comment-16492732
 ] 

ASF GitHub Bot commented on FLINK-8790:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5582
  
Hi @StefanRRichter could you please have a look at this?


> Improve performance for recovery from incremental checkpoint
> 
>
> Key: FLINK-8790
> URL: https://issues.apache.org/jira/browse/FLINK-8790
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> When there are multiple state handles to be restored, we can improve the 
> performance as follows:
> 1. Choose the best state handle to init the target db
> 2. Use the other state handles to create temp db, and clip the db according 
> to the target key group range (via rocksdb.deleteRange()), this can help us 
> get rid of the `key group check` in 
>  `data insertion loop` and also help us get rid of traversing the useless 
> record.
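The clipping idea in step 2 can be sketched with an in-memory sorted map standing in for RocksDB (all names below are illustrative; the real implementation would use `rocksdb.deleteRange()` on a temp db):

```java
import java.util.TreeMap;

class KeyGroupClipSketch {

    // Drop all entries whose key-group falls outside [startKeyGroup, endKeyGroup],
    // the way rocksdb.deleteRange() would clip a temp db up front. After clipping,
    // the insertion loop no longer needs a per-record key-group check.
    static void clipToRange(TreeMap<Integer, String> db, int startKeyGroup, int endKeyGroup) {
        db.headMap(startKeyGroup, false).clear();   // delete the range below the target
        db.tailMap(endKeyGroup, false).clear();     // delete the range above the target
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> db = new TreeMap<>();
        for (int kg = 0; kg < 10; kg++) {
            db.put(kg, "state-for-key-group-" + kg);
        }
        clipToRange(db, 3, 6);
        System.out.println(db.keySet()); // prints [3, 4, 5, 6]
    }
}
```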





[GitHub] flink issue #5582: [FLINK-8790][State] Improve performance for recovery from...

2018-05-28 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5582
  
Hi @StefanRRichter could you please have a look at this?


---


[jira] [Commented] (FLINK-8205) Multi key get

2018-05-28 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492731#comment-16492731
 ] 

Sihua Zhou commented on FLINK-8205:
---

Hi [~kkl0u] are you still willing to help with the design and the reviews for 
this ticket?

> Multi key get
> -
>
> Key: FLINK-8205
> URL: https://issues.apache.org/jira/browse/FLINK-8205
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: Any
>Reporter: Martin Eden
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently the Java queryable state api only allows for fetching one key at a 
> time. It would be extremely useful and more efficient if a similar call 
> exists for submitting multiple keys.





[jira] [Commented] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492729#comment-16492729
 ] 

ASF GitHub Bot commented on FLINK-9417:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6088

[FLINK-9417][ Distributed Coordination] Send heartbeat requests from RPC 
endpoint's main thread

## What is the purpose of the change

This PR sends heartbeat requests from the RPC endpoint's main thread to avoid 
reporting false liveness information.

## Brief change log

  - *Send heartbeat requests from RPC endpoint's main thread*


## Verifying this change

This change is a trivial rework.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

- no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9417

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6088


commit 7fe857592d4932b5a77a9dbcb12fab53dd207be9
Author: sihuazhou 
Date:   2018-05-28T14:06:21Z

Send heartbeat requests from RPC endpoint's main thread.




> Send heartbeat requests from RPC endpoint's main thread
> ---
>
> Key: FLINK-9417
> URL: https://issues.apache.org/jira/browse/FLINK-9417
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>
> Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat 
> requests to remote targets. This has the problem that we still see heartbeats 
> from this endpoint also if its main thread is currently blocked. Due to this, 
> the heartbeat response cannot be processed and the remote target times out. 
> On the remote side, this won't be noticed because it still receives the 
> heartbeat requests.
> A solution to this problem would be to send the heartbeat requests to the 
> remote thread through the RPC endpoint's main thread. That way, also the 
> heartbeats would be blocked if the main thread is blocked/busy.
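A minimal sketch of the proposed behavior (plain `java.util.concurrent`, not Flink's actual RPC classes): when heartbeat sends are funneled through the endpoint's single main-thread executor, a blocked main thread also blocks the heartbeats instead of faking liveness:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MainThreadHeartbeatSketch {
    public static void main(String[] args) throws Exception {
        // Single-threaded executor stands in for the RPC endpoint's main thread.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        AtomicInteger heartbeatsSent = new AtomicInteger();
        CountDownLatch blocker = new CountDownLatch(1);

        // Simulate a busy main thread: this task occupies it until released.
        mainThread.execute(() -> {
            try { blocker.await(); } catch (InterruptedException ignored) { }
        });

        // Heartbeat request routed through the main thread: it cannot run yet.
        mainThread.execute(heartbeatsSent::incrementAndGet);
        Thread.sleep(100);
        System.out.println("while blocked: " + heartbeatsSent.get()); // prints 0

        blocker.countDown(); // main thread becomes responsive again
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("after unblock: " + heartbeatsSent.get()); // prints 1
    }
}
```

With the previous scheme (a separate `scheduledExecutor`), the heartbeat would fire even while the main thread was stuck, which is exactly the misleading signal this change removes.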





[GitHub] flink pull request #6088: [FLINK-9417][ Distributed Coordination] Send heart...

2018-05-28 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6088

[FLINK-9417][ Distributed Coordination] Send heartbeat requests from RPC 
endpoint's main thread

## What is the purpose of the change

This PR sends heartbeat requests from the RPC endpoint's main thread to avoid 
reporting false liveness information.

## Brief change log

  - *Send heartbeat requests from RPC endpoint's main thread*


## Verifying this change

This change is a trivial rework.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

- no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9417

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6088


commit 7fe857592d4932b5a77a9dbcb12fab53dd207be9
Author: sihuazhou 
Date:   2018-05-28T14:06:21Z

Send heartbeat requests from RPC endpoint's main thread.




---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492721#comment-16492721
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5448
  
Hi @yanghua 
I think those changes look overall good. My biggest concern is with the 
handling of `MANAGED_MEMORY_SIZE`: I think that if it is used with an old-style 
configuration file, the resulting value will differ.


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configured heap size similarly: {{Xmx5g}} or 
> {{Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}
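A hedged sketch of such a parser (this is not Flink's actual `MemorySize` implementation, just an illustration of the proposed unit format):

```java
import java.util.Locale;

class MemorySizeSketch {

    // Parse "10 kb", "64 mb", "1 gb", or a bare number of bytes into a byte count.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (s.endsWith("kb")) { multiplier = 1024L; s = s.substring(0, s.length() - 2); }
        else if (s.endsWith("mb")) { multiplier = 1024L * 1024L; s = s.substring(0, s.length() - 2); }
        else if (s.endsWith("gb")) { multiplier = 1024L * 1024L * 1024L; s = s.substring(0, s.length() - 2); }
        return Long.parseLong(s.trim()) * multiplier;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("1"));      // prints 1
        System.out.println(parseBytes("10 kb"));  // prints 10240
        System.out.println(parseBytes("64 mb"));  // prints 67108864
        System.out.println(parseBytes("1 gb"));   // prints 1073741824
    }
}
```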





[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5448
  
Hi @yanghua 
I think those changes look overall good. My biggest concern is with the 
handling of `MANAGED_MEMORY_SIZE`: I think that if it is used with an old-style 
configuration file, the resulting value will differ.


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492717#comment-16492717
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191199370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ---
@@ -223,9 +224,17 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
parseQueryableStateConfiguration(configuration);
 
// extract memory settings
-   long configuredMemory = 
configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   long configuredMemory = 
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
+   if 
(!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()))
 {
+   try {
+   configuredMemory = 
MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+
+   }
+   }
--- End diff --

Same comments as before.  How about code like this:

long configuredMemory;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    try {
        configuredMemory = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
            .getMebiBytes();
    } catch (IllegalArgumentException e) {
        throw new IllegalConfigurationException(
            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
    }
} else {
    configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
}


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configured heap size similarly: {{Xmx5g}} or 
> {{Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}





[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492713#comment-16492713
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191190916
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
 * set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+   public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
--- End diff --

Also I think we should add tests that explicitly test using old style 
configuration.


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configured heap size similarly: {{Xmx5g}} or 
> {{Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}





[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191191358
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to 
elect and discover the JobManager" +
" leader from potentially multiple standby 
JobManagers.");
 
+   /**
+* JVM heap size for the JobManager with memory size.
+*/
+   public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+   key("jobmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the JobManager.");
+
/**
 * JVM heap size (in megabytes) for the JobManager.
+* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --

I also think we should add a note to the release notes advising migration of 
these parameters.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191197602
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
 
-   long offHeapSize = 
config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   long offHeapSize = 0;
+   try {
+   offHeapSize = 
Long.valueOf(managedMemorySizeDefaultVal);
+   if 
(!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
+   offHeapSize = 
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   }
+   } catch (IllegalArgumentException e) {
+
--- End diff --

Rethrow as an IllegalConfigurationException with a hint that 
`MANAGED_MEMORY_SIZE` was badly configured.


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492715#comment-16492715
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191191358
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to 
elect and discover the JobManager" +
" leader from potentially multiple standby 
JobManagers.");
 
+   /**
+* JVM heap size for the JobManager with memory size.
+*/
+   public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+   key("jobmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the JobManager.");
+
/**
 * JVM heap size (in megabytes) for the JobManager.
+* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --

I also think we should add a note to the release notes advising migration of 
these parameters.


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}
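The unit scheme proposed above can be sketched with a small, self-contained parser. This is a hypothetical illustration of the idea only, not Flink's actual {{MemorySize}} implementation; the class and method names are made up for the example:

```java
import java.util.Locale;

public class MemorySizeSketch {

    // Parses values such as "1", "10 kb", "64 mb", "1 gb" into bytes.
    // A bare number is interpreted as bytes, matching the proposal above.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
            s = s.substring(0, s.length() - 2).trim();
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024L;
            s = s.substring(0, s.length() - 2).trim();
        } else if (s.endsWith("gb")) {
            multiplier = 1024L * 1024L * 1024L;
            s = s.substring(0, s.length() - 2).trim();
        }
        return Long.parseLong(s) * multiplier;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("32768"));  // 32768
        System.out.println(parseBytes("64 mb"));  // 67108864
        System.out.println(parseBytes("1 gb"));   // 1073741824
    }
}
```

The key design point is that every memory option goes through one parser, so the unit interpretation can no longer differ from parameter to parameter.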



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492709#comment-16492709
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191147488
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -32,10 +32,22 @@
//  General TaskManager Options
// 

 
+   /**
+* JVM heap size for the TaskManagers with memory size.
+*/
+   public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
+   key("taskmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the TaskManagers, 
which are the parallel workers of" +
+   " the system. On YARN setups, this 
value is automatically configured to the size of the TaskManager's" +
+   " YARN container, minus a certain 
tolerance value.");
+
/**
 * JVM heap size (in megabytes) for the TaskManagers.
+*
+* @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Add `@Deprecated`


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492714#comment-16492714
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191197602
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
 
-   long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   long offHeapSize = 0;
+   try {
+   offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+   if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+   offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   }
+   } catch (IllegalArgumentException e) {
+
--- End diff --

Rethrow as an `IllegalConfigurationException`, pointing out that 
`MANAGED_MEMORY_SIZE` was badly configured.


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492710#comment-16492710
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191186380
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
--- End diff --

remove (in megabytes)


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191190916
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
 * set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+   public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
--- End diff --

I also think we should add tests that explicitly exercise the old-style 
configuration.


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492712#comment-16492712
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191147297
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to 
elect and discover the JobManager" +
" leader from potentially multiple standby 
JobManagers.");
 
+   /**
+* JVM heap size for the JobManager with memory size.
+*/
+   public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+   key("jobmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the JobManager.");
+
/**
 * JVM heap size (in megabytes) for the JobManager.
+* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Missing annotation `@Deprecated`. That way we can remove that option from 
docs (which I think we should do).


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492711#comment-16492711
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191190775
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
 * set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+   public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
--- End diff --

This option requires special handling (maybe similar to 
JOB_MANAGER_HEAP_MEMORY/TASK_MANAGER_HEAP_MEMORY). Right now, if an old config 
file is used with the new version, megabytes will be treated as bytes, so the 
value ends up 1000 times smaller.
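The compatibility rule the reviewer is asking for can be sketched in a self-contained way. This is a hypothetical, simplified helper (the class, method, and parsing rules are made up for illustration, not Flink's actual code): a bare number is treated as the legacy megabyte value, while a unit-suffixed value is parsed explicitly, so old config files keep their meaning:

```java
public class LegacyMemoryConfigSketch {

    // Returns the configured size in mebibytes. A plain number is assumed to be
    // an old-style megabyte value; a suffixed value ("1024m", "2g") is parsed.
    static long toMebiBytes(String configured) {
        String s = configured.trim().toLowerCase();
        if (s.matches("\\d+")) {
            // Old-style value: keep interpreting the bare number as megabytes
            // instead of silently treating it as bytes (which would make it
            // roughly a million times smaller).
            return Long.parseLong(s);
        } else if (s.endsWith("m") || s.endsWith("mb")) {
            return Long.parseLong(s.replaceAll("[a-z ]", ""));
        } else if (s.endsWith("g") || s.endsWith("gb")) {
            return Long.parseLong(s.replaceAll("[a-z ]", "")) * 1024L;
        }
        throw new IllegalArgumentException("Could not parse memory size: " + configured);
    }

    public static void main(String[] args) {
        System.out.println(toMebiBytes("1024"));   // old style, interpreted as MB: 1024
        System.out.println(toMebiBytes("1024m"));  // new style: 1024
        System.out.println(toMebiBytes("2g"));     // new style: 2048
    }
}
```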


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191199370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ---
@@ -223,9 +224,17 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
parseQueryableStateConfiguration(configuration);
 
// extract memory settings
-   long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   long configuredMemory = Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
+   if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())) {
+   try {
+   configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+
+   }
+   }
--- End diff --

Same comments as before.  How about code like this:

long configuredMemory;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    try {
        configuredMemory = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
            .getMebiBytes();
    } catch (IllegalArgumentException e) {
        throw new IllegalConfigurationException(
            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
    }
} else {
    configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
}


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492716#comment-16492716
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191198882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
 
-   long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   long offHeapSize = 0;
+   try {
+   offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+   if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+   offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   }
+   } catch (IllegalArgumentException e) {
+
--- End diff --

I find the flow with the default value somewhat counterintuitive. How about we 
structure the code like this:

long offHeapSize;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    try {
        offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
            .getMebiBytes();
    } catch (IllegalArgumentException e) {
        throw new IllegalConfigurationException(
            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
    }
} else {
    offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
}


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191190775
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
 * set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+   public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
--- End diff --

This option requires special handling (maybe similar to 
JOB_MANAGER_HEAP_MEMORY/TASK_MANAGER_HEAP_MEMORY). Right now, if an old config 
file is used with the new version, megabytes will be treated as bytes, so the 
value ends up 1000 times smaller.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191147488
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -32,10 +32,22 @@
//  General TaskManager Options
// 

 
+   /**
+* JVM heap size for the TaskManagers with memory size.
+*/
+   public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
+   key("taskmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the TaskManagers, 
which are the parallel workers of" +
+   " the system. On YARN setups, this 
value is automatically configured to the size of the TaskManager's" +
+   " YARN container, minus a certain 
tolerance value.");
+
/**
 * JVM heap size (in megabytes) for the TaskManagers.
+*
+* @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Add `@Deprecated`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191147297
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to 
elect and discover the JobManager" +
" leader from potentially multiple standby 
JobManagers.");
 
+   /**
+* JVM heap size for the JobManager with memory size.
+*/
+   public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+   key("jobmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the JobManager.");
+
/**
 * JVM heap size (in megabytes) for the JobManager.
+* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Missing annotation `@Deprecated`. That way we can remove that option from 
docs (which I think we should do).


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191186380
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
--- End diff --

remove (in megabytes)


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191198882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
 
-   long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   long offHeapSize = 0;
+   try {
+   offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+   if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+   offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   }
+   } catch (IllegalArgumentException e) {
+
--- End diff --

I find the flow with the default value somewhat counterintuitive. How about we 
structure the code like this:

long offHeapSize;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    try {
        offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
            .getMebiBytes();
    } catch (IllegalArgumentException e) {
        throw new IllegalConfigurationException(
            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
    }
} else {
    offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
}


---


[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-05-28 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492699#comment-16492699
 ] 

Piotr Nowojski commented on FLINK-8545:
---

[~hequn8128], could you say what the status of your work here is? :) Are you 
still working on it? Have you written some code?

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492691#comment-16492691
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191211627
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration 
flinkConfig, FlinkPlan optPlan,
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
 
+   // If checkpointing is disabled and no restart strategy is set, the restart strategy from flink-conf.yaml will be used.
+   // In FLIP-6, the JobMaster does not set this config, so we set it here.
--- End diff --

You are right, I omitted this point, sorry


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user 
> enables checkpointing.
> Steps to reproduce:
> 1. Download flink distribution (1.4.2), update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. create new java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  here's the code:
> public class FailedJob {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>         stream.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String obj) {
>                 throw new NullPointerException("NPE");
>             }
>         });
>         env.execute("Failed job");
>     }
> }
>   
>  3. Compile: mvn clean package; submit it to the cluster
>   
>  4. Go to Job Manager configuration in WebUI, ensure settings from 
> flink-conf.yaml is there (screenshot attached)
>   
>  5. Go to Job's configuration, see Execution Configuration section
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  see attached screenshots and jobmanager log (line 1 and 31)
>   
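The precedence bug reported above can be illustrated with a self-contained sketch. This is a hypothetical, simplified model of the client-side resolution logic, not Flink's actual code: a programmatic setting should win, then flink-conf.yaml, and only then a checkpointing-derived default. The reported behaviour is equivalent to consulting the checkpointing default before the flink-conf.yaml value:

```java
import java.util.HashMap;
import java.util.Map;

public class RestartStrategySketch {

    // Resolves the effective restart strategy with the intended precedence:
    // programmatic setting > flink-conf.yaml > checkpointing-derived default.
    static String resolve(String programmatic, Map<String, String> flinkConf, boolean checkpointing) {
        if (programmatic != null) {
            return programmatic;
        }
        String fromConf = flinkConf.get("restart-strategy");
        if (fromConf != null) {
            // Correct behaviour: honour flink-conf.yaml before any default.
            return fromConf;
        }
        // Fallback matching the observed default: fixed delay with
        // Integer.MAX_VALUE restart attempts when checkpointing is on.
        return checkpointing ? "fixed-delay" : "none";
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("restart-strategy", "none");
        // Even with checkpointing enabled, flink-conf.yaml should win:
        System.out.println(resolve(null, conf, true));  // none
    }
}
```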



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5846: [FLINK-9143] [client] Restart strategy defined in ...

2018-05-28 Thread yuqi1129
Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191211627
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration 
flinkConfig, FlinkPlan optPlan,
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
 
+   // If checkpointing is disabled and no restart strategy is set, the restart strategy from flink-conf.yaml will be used.
+   // In FLIP-6, the JobMaster does not set this config, so we set it here.
--- End diff --

You are right, I omitted this point, sorry


---


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492688#comment-16492688
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6060
  
cc @zentol have a look at this?


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.1
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManagers inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveal that these line of code might be the cause
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6060: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-28 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6060
  
cc @zentol have a look at this?


---


[jira] [Closed] (FLINK-7901) Detecting whether an operator is restored doesn't work with chained state (Flink 1.3)

2018-05-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7901.
---
   Resolution: Won't Fix
Fix Version/s: (was: 1.3.4)

> Detecting whether an operator is restored doesn't work with chained state 
> (Flink 1.3)
> -
>
> Key: FLINK-7901
> URL: https://issues.apache.org/jira/browse/FLINK-7901
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where multiple of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. It's best exemplified using this minimal example 
> where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I do a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint {{context.isRestored()}} still reports {{true}} 
> for the source.





[jira] [Reopened] (FLINK-8759) Bump Netty to 4.0.56

2018-05-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-8759:
-

changing resolution...

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> Due to a bug in Netty's shutdown sequence, and to pick up overall improvements
> in Netty, I'd like to bump the version (staying within the 4.0 series for
> now). The problem we faced in the past should no longer be relevant with
> credit-based flow control, and can be worked around (for the fallback code
> path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying
> contents into new buffers instead of slicing the existing one (please refer to
> FLINK-7428 for the inverse direction).





[jira] [Closed] (FLINK-8759) Bump Netty to 4.0.56

2018-05-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8759.
---
Resolution: Won't Fix

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> Due to a bug in Netty's shutdown sequence, and to pick up overall improvements
> in Netty, I'd like to bump the version (staying within the 4.0 series for
> now). The problem we faced in the past should no longer be relevant with
> credit-based flow control, and can be worked around (for the fallback code
> path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying
> contents into new buffers instead of slicing the existing one (please refer to
> FLINK-7428 for the inverse direction).





[jira] [Updated] (FLINK-8759) Bump Netty to 4.0.56

2018-05-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8759:

Fix Version/s: (was: 1.6.0)

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> Due to a bug in Netty's shutdown sequence, and to pick up overall improvements
> in Netty, I'd like to bump the version (staying within the 4.0 series for
> now). The problem we faced in the past should no longer be relevant with
> credit-based flow control, and can be worked around (for the fallback code
> path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying
> contents into new buffers instead of slicing the existing one (please refer to
> FLINK-7428 for the inverse direction).





[jira] [Closed] (FLINK-8759) Bump Netty to 4.0.56

2018-05-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8759.
---
Resolution: Fixed

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> Due to a bug in Netty's shutdown sequence, and to pick up overall improvements
> in Netty, I'd like to bump the version (staying within the 4.0 series for
> now). The problem we faced in the past should no longer be relevant with
> credit-based flow control, and can be worked around (for the fallback code
> path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying
> contents into new buffers instead of slicing the existing one (please refer to
> FLINK-7428 for the inverse direction).





[jira] [Updated] (FLINK-8874) rewrite Flink docs/dev/stream/operators/process_function.md to recommend using KeyedProcessFunction

2018-05-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8874:

Fix Version/s: (was: 1.6.0)

> rewrite Flink docs/dev/stream/operators/process_function.md to recommend 
> using KeyedProcessFunction
> ---
>
> Key: FLINK-8874
> URL: https://issues.apache.org/jira/browse/FLINK-8874
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Documentation
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> We need to completely rewrite Flink 
> docs/dev/stream/operators/process_function.md to recommend using 
> KeyedProcessFunction




