[jira] [Commented] (FLINK-9579) Remove unnecessary clear with cep elementQueue

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512048#comment-16512048
 ] 

ASF GitHub Bot commented on FLINK-9579:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6162

[FLINK-9579][CEP]Remove unneeded clear on elementQueueState

## What is the purpose of the change

Remove the unneeded clear on elementQueueState: when sortedTimestamps is empty, 
the elements in elementQueueState have all been removed already, so there is no 
need to clear again and spend time on a RocksDB operation.
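
For context, a minimal sketch of the event-time path in question (simplified, with surrounding operator code elided; not the exact Flink source):

```java
// Hedged sketch: while draining sortedTimestamps, the buffered elements for
// each drained timestamp are already removed from elementQueueState, so a
// final clear() can only cost an extra RocksDB call without removing anything.
while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerTimestamp) {
    long timestamp = sortedTimestamps.poll();
    // ... process the buffered elements for this timestamp ...
    elementQueueState.remove(timestamp); // the entry for this timestamp is gone here
}
if (sortedTimestamps.isEmpty()) {
    elementQueueState.clear(); // <- redundant; removed by this PR
}
```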



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink remove-clear

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6162.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6162


commit 62a1a506cf8dab263a247d81fa7092eaa0743624
Author: minwenjun 
Date:   2018-06-14T06:20:42Z

[FLINK-9579][CEP]Remove unneeded clear on elementQueueState




> Remove unnecessary clear with cep elementQueue
> --
>
> Key: FLINK-9579
> URL: https://issues.apache.org/jira/browse/FLINK-9579
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> When dealing with event time, the elementQueueState is cleared when 
> sortedTimestamps isEmpty, but I think this operation is not needed: if 
> sortedTimestamps isEmpty, the elements in elementQueueState have all been 
> removed already, so there is no need to clear again and spend time on a 
> RocksDB operation. What's your idea, [~dawidwys]?






[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512008#comment-16512008
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user etiennecarriere commented on the issue:

https://github.com/apache/flink/pull/6149
  
@pnowojski, I already addressed the small remarks but not the test part. 
I will try to propose a first test version in the next few days. 


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints all the bandwidth is 
> taken to send the checkpoint to object storage (S3 in our case).
> *Proposal*: After the creation of some limits on FileSystem (mostly the number 
> of connections, with tickets FLINK-8125/FLINK-8198/FLINK-9468), I propose to 
> add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the InputStream and OutputStream.
>  
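
For illustration, a minimal sketch of such a wrapper, assuming a Guava RateLimiter whose permits are interpreted as bytes per second (names are illustrative, not the PR's code):

{code:java}
import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.io.OutputStream;

/** Illustrative only: throttles writes to a wrapped stream at a fixed byte rate. */
class RateLimitedOutputStream extends OutputStream {

    private final OutputStream original;
    private final RateLimiter limiter; // permits interpreted as bytes per second

    RateLimitedOutputStream(OutputStream original, double bytesPerSecond) {
        this.original = original;
        this.limiter = RateLimiter.create(bytesPerSecond);
    }

    @Override
    public void write(int b) throws IOException {
        original.write(b);
        limiter.acquire(1); // pay for the byte after writing it, as the PR does
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        original.write(b, off, len);
        if (len > 0) {
            limiter.acquire(len); // RateLimiter#acquire needs at least one permit
        }
    }
}
{code}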







[jira] [Commented] (FLINK-9487) Prepare InternalTimerHeap for asynchronous snapshots

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511981#comment-16511981
 ] 

ASF GitHub Bot commented on FLINK-9487:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6159#discussion_r195299958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data 
into key-groups. This algorithm currently works
+ * with two array (input, output) for optimal algorithmic complexity. 
Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This 
would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of 
better algorithmic complexity.
+ */
--- End diff --

Maybe we could add a param annotation for `T`.
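
What the reviewer suggests would look roughly like this (a sketch, not the committed code):

{code:java}
/**
 * Abstract class that contains the base algorithm for partitioning data into key-groups.
 *
 * @param <T> the type of the partitioned input/output elements.
 */
public abstract class AbstractKeyGroupPartitioner<T> { /* ... */ }
{code}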


> Prepare InternalTimerHeap for asynchronous snapshots
> 
>
> Key: FLINK-9487
> URL: https://issues.apache.org/jira/browse/FLINK-9487
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> When we want to snapshot timers with the keyed backend state, this must 
> happen as part of an asynchronous snapshot.
> The data structure {{InternalTimerHeap}} needs to offer support for this 
> through a lightweight copy mechanism (e.g. an arraycopy of the timer queue, 
> because timers are immutable w.r.t. serialization).
> We can also stop keeping the dedup maps in {{InternalTimerHeap}} separated by 
> key-group; all timers can go into one map.
> Instead, we can implement online partitioning as part of the asynchronous 
> operation, similar to what we do in {{CopyOnWriteStateTable}} snapshots. 
> Notice that in this intermediate state, the code will still run in the 
> synchronous part until we are integrated with the backends for async 
> snapshotting (next subtask of this JIRA).
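
The lightweight copy amounts to a flat copy of the heap's backing array, roughly as below (a sketch; the 1-based heap layout matches the PR's diff, the other names are illustrative):

{code:java}
// Cheap, synchronous part of the snapshot: copy only the references. This is
// safe because the timers themselves are immutable w.r.t. serialization.
TimerHeapInternalTimer<K, N>[] snapshotCopy =
    Arrays.copyOfRange(queue, 1, size + 1); // slot 0 of the 1-based heap is unused
{code}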







[jira] [Created] (FLINK-9583) Wrong number of TaskManagers' slots after recovery.

2018-06-13 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-9583:
--

 Summary: Wrong number of TaskManagers' slots after recovery.
 Key: FLINK-9583
 URL: https://issues.apache.org/jira/browse/FLINK-9583
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.5.0
 Environment: Flink 1.5.0 on YARN with the default execution mode.
Reporter: Truong Duc Kien
 Attachments: jm.log

We started a job with 120 slots, using a FixedDelayRestart strategy with a 
delay of 1 minute.

During recovery, some but not all slots were released.

When the job restarted, Flink requested a new batch of slots.

The total number of slots is now 193, larger than the configured amount, and 
the excess slots are never released.

This bug does not happen in legacy mode. I've attached the JobManager log.

 





[jira] [Updated] (FLINK-9579) Remove unnecessary clear with cep elementQueue

2018-06-13 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-9579:
--
Summary: Remove unnecessary clear with cep elementQueue  (was: Remove 
unnecessary check with cep elementQueue)

> Remove unnecessary clear with cep elementQueue
> --
>
> Key: FLINK-9579
> URL: https://issues.apache.org/jira/browse/FLINK-9579
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> When dealing with event time, the elementQueueState is cleared when 
> sortedTimestamps isEmpty, but I think this operation is not needed: if 
> sortedTimestamps isEmpty, the elements in elementQueueState have all been 
> removed already, so there is no need to clear again and spend time on a 
> RocksDB operation. What's your idea, [~dawidwys]?





[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511883#comment-16511883
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r195281287
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

OK, I see.


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make the Flink metrics system able to send metrics to Prometheus via a Pushgateway.
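
For context, the push mechanics the reporter builds on look roughly like this with the Prometheus Java client (host, port, and job name here are illustrative assumptions):

{code:java}
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.PushGateway;

import java.io.IOException;

class PushGatewayExample {
    // Pushes all collectors in the default registry under a fixed job name; the
    // reporter in the PR does the same on a schedule, with a "flink-" + random-ID name.
    static void pushOnce() throws IOException {
        PushGateway pushGateway = new PushGateway("localhost:9091");
        pushGateway.push(CollectorRegistry.defaultRegistry, "flink-metrics-example");
    }
}
{code}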







[jira] [Assigned] (FLINK-9580) Potentially unclosed ByteBufInputStream in RestClient#readRawResponse

2018-06-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9580:
---

Assignee: vinoyang

> Potentially unclosed ByteBufInputStream in RestClient#readRawResponse
> -
>
> Key: FLINK-9580
> URL: https://issues.apache.org/jira/browse/FLINK-9580
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Here is the related code:
> {code}
>   ByteBufInputStream in = new ByteBufInputStream(content);
>   byte[] data = new byte[in.available()];
>   in.readFully(data);
> {code}
> In the catch block, ByteBufInputStream is not closed.
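
A minimal sketch of the usual fix, using try-with-resources so the stream is closed on both the success and the exception path (hedged; not necessarily the patch that ends up merged):

{code:java}
byte[] data;
try (ByteBufInputStream in = new ByteBufInputStream(content)) {
    data = new byte[in.available()];
    in.readFully(data);
}
{code}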





[jira] [Updated] (FLINK-9582) dist assemblies access jars outside of flink-dist

2018-06-13 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9582:

Summary: dist assemblies access jars outside of flink-dist  (was: dist 
assemblies access files outside of flink-dist)

> dist assemblies access jars outside of flink-dist
> -
>
> Key: FLINK-9582
> URL: https://issues.apache.org/jira/browse/FLINK-9582
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> The flink-dist assemblies access compiled jars outside of flink-dist, for 
> example like this:
> {code:java}
> ../flink-libraries/flink-cep/target/flink-cep_${scala.binary.version}-${project.version}.jar{code}
> As usual, accessing files outside of the module that you're building is a 
> terrible idea.
> It's brittle, as it relies on paths that aren't guaranteed to be stable, and 
> it requires these modules to be built beforehand. There's also an inherent 
> potential for dependency conflicts when building flink-dist on its own, as 
> Maven may download certain snapshot artifacts, but the assemblies ignore 
> these and bundle the jars present in the Flink directory.
> We can use the maven-dependency-plugin to copy the required dependencies into 
> the {{target}} directory of flink-dist and point the assemblies to these jars.





[jira] [Created] (FLINK-9582) dist assemblies access files outside of flink-dist

2018-06-13 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9582:
---

 Summary: dist assemblies access files outside of flink-dist
 Key: FLINK-9582
 URL: https://issues.apache.org/jira/browse/FLINK-9582
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.5.0, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.0


The flink-dist assemblies access compiled jars outside of flink-dist, for 
example like this:
{code:java}
../flink-libraries/flink-cep/target/flink-cep_${scala.binary.version}-${project.version}.jar{code}
As usual, accessing files outside of the module that you're building is a 
terrible idea.

It's brittle, as it relies on paths that aren't guaranteed to be stable, and it 
requires these modules to be built beforehand. There's also an inherent 
potential for dependency conflicts when building flink-dist on its own, as 
Maven may download certain snapshot artifacts, but the assemblies ignore these 
and bundle the jars present in the Flink directory.

We can use the maven-dependency-plugin to copy the required dependencies into 
the {{target}} directory of flink-dist and point the assemblies to these jars.





[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511640#comment-16511640
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5857#discussion_r195229711
  
--- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via 
Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends 
AbstractPrometheusReporter implements Scheduled {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+   public static final String ARG_HOST = "host";
+   public static final String ARG_PORT = "port";
+
+   public static final char JOB_NAME_SEPARATOR = '-';
+   public static final String JOB_NAME_PREFIX = "flink" + 
JOB_NAME_SEPARATOR;
+
+   private PushGateway pushGateway;
+   private final String jobName;
+
+   public PrometheusPushGatewayReporter() {
+   String random = new AbstractID().toString();
+   jobName = JOB_NAME_PREFIX + random;
--- End diff --

I'm inclined to block the PR on the JobManager ID exposure 
([FLINK-9543](https://issues.apache.org/jira/browse/FLINK-9543)).

The PR is not at risk of becoming outdated, so keeping it open for a while 
isn't a problem.


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make the Flink metrics system able to send metrics to Prometheus via a Pushgateway.







[jira] [Commented] (FLINK-9581) Redundant spaces for Collect at sql.md

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511638#comment-16511638
 ] 

ASF GitHub Bot commented on FLINK-9581:
---

GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6161

[FLINK-9581] Remove extra spaces to make COLLECT left aligned

## Brief change log

extra spaces removed

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: ( no )
  - The runtime per-record code paths (performance sensitive): ( no )
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: ( no )
  - The S3 file system connector: ( no )

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink COLLECT_DOC_TYPO

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6161


commit 009afd05bc1316f35dd9841ac6367aab101d2970
Author: snuyanzin 
Date:   2018-06-13T17:30:26Z

Remove trailing space to make COLLECT left aligned




> Redundant spaces for Collect at sql.md
> --
>
> Key: FLINK-9581
> URL: https://issues.apache.org/jira/browse/FLINK-9581
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Trivial
> Attachments: collect.png
>
>
> Can be seen at 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html 
> and in the attached screenshot.







[jira] [Created] (FLINK-9581) Redundant spaces for Collect at sql.md

2018-06-13 Thread Sergey Nuyanzin (JIRA)
Sergey Nuyanzin created FLINK-9581:
--

 Summary: Redundant spaces for Collect at sql.md
 Key: FLINK-9581
 URL: https://issues.apache.org/jira/browse/FLINK-9581
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table API & SQL
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin
 Attachments: collect.png

Can be seen at 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html 
and in the attached screenshot.





[jira] [Created] (FLINK-9580) Potentially unclosed ByteBufInputStream in RestClient#readRawResponse

2018-06-13 Thread Ted Yu (JIRA)
Ted Yu created FLINK-9580:
-

 Summary: Potentially unclosed ByteBufInputStream in 
RestClient#readRawResponse
 Key: FLINK-9580
 URL: https://issues.apache.org/jira/browse/FLINK-9580
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


Here is the related code:
{code}
  ByteBufInputStream in = new ByteBufInputStream(content);
  byte[] data = new byte[in.available()];
  in.readFully(data);
{code}
In the catch block, ByteBufInputStream is not closed.





[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511587#comment-16511587
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6156
  
+1 LGTM


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV value);
> }
> {code}
>  







[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511582#comment-16511582
 ] 

ASF GitHub Bot commented on FLINK-9545:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6130
  
Not really. It's not about having n copies of the data. One use case: a 
file-fed stream pipeline usually runs very fast and yields inadequate metrics, 
so users need to run it end-to-end for a longer time to gather stable metrics 
and tune all components in the pipeline.

I'll close it if the community is not interested.


> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Motivation: We have a requirement to read a bunch of files, each file 
> multiple times, to feed our streams.
> Specifically, we need {{StreamExecutionEnvironment.readFile/readTextFile}} to 
> be able to read a file a specified {{N}} times, but currently they only 
> support reading a file once.
> We've implemented this internally. It would be good to get it back into the 
> community version. This JIRA is to add support for the feature.
> Plan:
> Add a new processing mode PROCESSING_N_TIMES, and add an additional parameter 
> {{numTimes}} to {{StreamExecutionEnvironment.readFile/readTextFile}}.
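
Roughly, the proposed surface would read like this (PROCESSING_N_TIMES and the extra parameter are this ticket's proposal, shown commented out because they are not an existing Flink API):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadNTimesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Today: the file is read exactly once.
        DataStream<String> once = env.readTextFile("hdfs:///data/input.txt");

        // Proposed (hypothetical overload per this ticket): read it 10 times.
        // DataStream<String> tenTimes = env.readTextFile(
        //     "hdfs:///data/input.txt", FileProcessingMode.PROCESSING_N_TIMES, 10);

        once.print();
        env.execute("read-n-times sketch");
    }
}
{code}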







[jira] [Commented] (FLINK-9487) Prepare InternalTimerHeap for asynchronous snapshots

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511453#comment-16511453
 ] 

ASF GitHub Bot commented on FLINK-9487:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6159#discussion_r195156251
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data 
into key-groups. This algorithm currently works
+ * with two array (input, output) for optimal algorithmic complexity. 
Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This 
would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of 
better algorithmic complexity.
+ */
+public abstract class AbstractKeyGroupPartitioner {
+
+   /** Total number of input elements. */
+   @Nonnegative
+   protected final int numberOfElements;
+
+   /** The total number of key-groups in the job. */
+   @Nonnegative
+   protected final int totalKeyGroups;
+
+   /** The key-group range for the input data, covered in this 
partitioning. */
+   @Nonnull
+   protected final KeyGroupRange keyGroupRange;
+
+   /**
+* This bookkeeping array is used to count the elements in each 
key-group. In a second step, it is transformed into
+* a histogram by accumulation.
+*/
+   @Nonnull
+   protected final int[] counterHistogram;
+
+   /**
+* This is a helper array that caches the key-group for each element, 
so we do not have to compute them twice.
+*/
+   @Nonnull
+   protected final int[] elementKeyGroups;
+
+   /** Cached value of keyGroupRange#firstKeyGroup. */
+   @Nonnegative
+   protected final int firstKeyGroup;
+
+   /** Cached result. */
+   protected PartitioningResult computedResult;
+
+   /**
+* @param keyGroupRange the key-group range of the data that will be 
partitioned by this instance.
+* @param totalKeyGroups the total number of key groups in the job.
+*/
+   public AbstractKeyGroupPartitioner(
+   @Nonnegative int numberOfElements,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalKeyGroups) {
+
+   this.numberOfElements = numberOfElements;
+   this.keyGroupRange = keyGroupRange;
+   this.totalKeyGroups = totalKeyGroups;
+   this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+   this.elementKeyGroups = new int[numberOfElements];
+   this.counterHistogram = new 
int[keyGroupRange.getNumberOfKeyGroups()];
+   this.computedResult = null;
+   }
+
+   /**
+* Partitions the data into key-groups and returns the result via 
{@link PartitioningResult}.
+*/
+   public PartitioningResult partitionByKeyGroup() {
+   if (computedResult == null) {
+   reportAllElementKeyGroups();
+   buildHistogramFromCounts();
+   executePartitioning();
+   }
+   return computedResult;
+   }
+
+   /**
+* This method iterates over the input data and reports the key-group 
for each element.
+*/
+   protected void reportAllElementKeyGroups() {
+   final T[] input = getPartitioningInput();
+
+   Preconditions.checkState(input.length >= numberOfElements);
+
+   for (int i = 0; i < numberOfElements; ++i) {
+   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups);
+   reportKeyGroupOfElementAtIndex(i, keyGroup);
+   }
+   }

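The archived diff above is cut off, but the algorithm it implements is a plain counting sort over key-groups. A standalone sketch of the three phases (simplified; illustrative names, not the committed code):

{code:java}
// Phase 1 counts elements per key-group, phase 2 turns the counts into start
// offsets by prefix sums, phase 3 scatters every element into its key-group's
// contiguous slice of the output array. Overall O(n + numKeyGroups).
static <T> T[] partitionByKeyGroup(
        T[] input, T[] output, int[] elementKeyGroups, int firstKeyGroup, int numKeyGroups) {

    int[] histogram = new int[numKeyGroups];
    for (int keyGroup : elementKeyGroups) {
        histogram[keyGroup - firstKeyGroup]++; // phase 1: per-key-group counts
    }

    int sum = 0;
    for (int i = 0; i < numKeyGroups; i++) { // phase 2: accumulate into offsets
        int count = histogram[i];
        histogram[i] = sum;
        sum += count;
    }

    for (int i = 0; i < input.length; i++) { // phase 3: scatter
        output[histogram[elementKeyGroups[i] - firstKeyGroup]++] = input[i];
    }
    return output;
}
{code}
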
[jira] [Commented] (FLINK-9487) Prepare InternalTimerHeap for asynchronous snapshots

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511454#comment-16511454
 ] 

ASF GitHub Bot commented on FLINK-9487:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6159#discussion_r195168815
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -224,6 +227,15 @@ void bulkAddRestoredTimers(Collection> restoredTim
return result;
}
 
+   @Nonnull
+   StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer 
serializer) {
+   return new InternalTimerHeapSnapshot<>(
+   Arrays.copyOfRange(queue, 1, size + 1),
+   serializer,
--- End diff --

The `serializer` isn't duplicated here or in the constructor of 
`InternalTimerHeapSnapshot` in the current code; do you plan to duplicate it 
when calling `snapshot()` in a later change?


> Prepare InternalTimerHeap for asynchronous snapshots
> 
>
> Key: FLINK-9487
> URL: https://issues.apache.org/jira/browse/FLINK-9487
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.6.0
>
>
> When we want to snapshot timers with the keyed backend state, this must 
> happen as part of an asynchronous snapshot.
> The data structure {{InternalTimerHeap}} needs to offer support for this 
> through a lightweight copy mechanism (e.g. an arraycopy of the timer queue, 
> because timers are immutable w.r.t. serialization).
> We can also stop keeping the dedup maps in {{InternalTimerHeap}} separated by 
> key-group; all timers can go into one map.
> Instead, we can implement online partitioning as part of the asynchronous 
> operation, similar to what we do in {{CopyOnWriteStateTable}} snapshots. 
> Notice that in this intermediate state, the code will still run in the 
> synchronous part until we are integrated with the backends for async 
> snapshotting (next subtask of this JIRA).







[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511388#comment-16511388
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
@twalthr Sure, please go ahead, and let me know if there is anything I can 
help with further. 


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)







[jira] [Commented] (FLINK-9548) Flink Apache Kudu Connector

2018-06-13 Thread Sandish Kumar HN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511383#comment-16511383
 ] 

Sandish Kumar HN commented on FLINK-9548:
-

I would say:

For batch: Kudu to DataSet to Kudu, and OtherSource to DataSet to Kudu.

For streaming: OtherSource to DataStream to Kudu.

I have not thought about the SQL/Table API (batch or streaming) yet, but I'm 
looking forward to adding as many features as possible. 

> Flink Apache Kudu Connector
> ---
>
> Key: FLINK-9548
> URL: https://issues.apache.org/jira/browse/FLINK-9548
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sandish Kumar HN
>Assignee: Sandish Kumar HN
>Priority: Minor
>
> A Flink Apache Kudu connector will be a good addition. 





[jira] [Commented] (FLINK-9548) Flink Apache Kudu Connector

2018-06-13 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511379#comment-16511379
 ] 

Fabian Hueske commented on FLINK-9548:
--

Yes, I know what Kudu is. But I think it would be good to have a clear scope 
for this feature. We can also break it down into several subtasks.
Also, which APIs will this be for: DataSet, DataStream, SQL/Table API (batch), 
SQL/Table API (streaming), etc.?

Do you have a concrete use case in mind?


> Flink Apache Kudu Connector
> ---
>
> Key: FLINK-9548
> URL: https://issues.apache.org/jira/browse/FLINK-9548
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sandish Kumar HN
>Assignee: Sandish Kumar HN
>Priority: Minor
>
> A Flink Apache Kudu connector will be a good addition. 





[jira] [Commented] (FLINK-9548) Flink Apache Kudu Connector

2018-06-13 Thread Sandish Kumar HN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511376#comment-16511376
 ] 

Sandish Kumar HN commented on FLINK-9548:
-

Hi [~fhueske],

Kudu is a NoSQL columnar DB, so we can have it as both a source and a sink.

The scope can be reading TBs of data from Kudu tables, processing it in Flink, 
and storing TBs of data back into Kudu tables.

I don't think it is much work to maintain, because Kudu is already at version 
1.7 and is an Apache project as well; and yes, I will take care of maintaining 
this connector in the future. 

 

> Flink Apache Kudu Connector
> ---
>
> Key: FLINK-9548
> URL: https://issues.apache.org/jira/browse/FLINK-9548
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sandish Kumar HN
>Assignee: Sandish Kumar HN
>Priority: Minor
>
> A Flink Apache Kudu connector will be a good addition. 





[jira] [Commented] (FLINK-9548) Flink Apache Kudu Connector

2018-06-13 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511371#comment-16511371
 ] 

Fabian Hueske commented on FLINK-9548:
--

Hi [~sanysand...@gmail.com], I assigned the issue to you and gave you 
contributor permissions.

Before you start contributing, can you please give more details about the 
scope of this feature and extend the issue description?
Should this be a source or sink connector, for batch or streaming tasks? If 
streaming, which consistency guarantees do you envision (at-least-once or 
exactly-once), and how do you want to support them?

When adding new connectors, we typically also check how much effort it is to 
maintain them. Would you be up to taking care of that?
Thanks, Fabian

> Flink Apache Kudu Connector
> ---
>
> Key: FLINK-9548
> URL: https://issues.apache.org/jira/browse/FLINK-9548
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sandish Kumar HN
>Assignee: Sandish Kumar HN
>Priority: Minor
>
> A Flink Apache Kudu connector will be a good addition. 





[jira] [Assigned] (FLINK-9548) Flink Apache Kudu Connector

2018-06-13 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-9548:


Assignee: Sandish Kumar HN

> Flink Apache Kudu Connector
> ---
>
> Key: FLINK-9548
> URL: https://issues.apache.org/jira/browse/FLINK-9548
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sandish Kumar HN
>Assignee: Sandish Kumar HN
>Priority: Minor
>
> A Flink Apache Kudu connector will be a good addition. 





[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511357#comment-16511357
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195137063
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -88,6 +90,12 @@
/** The nanoseconds that a stream may spend not writing any bytes 
before it is closed as inactive. */
private final long streamInactivityTimeoutNanos;
 
+   /** Rate limiter of incoming bytes for this filesystem. */
+   private final RateLimiter inputRateLimiter;
--- End diff --

Instead of a nullable field, please use `Optional` for both fields. Annotating 
with `@Nullable` wouldn't suffice, since unfortunately we do not enforce that 
annotation, while `Optional` is always enforced.


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints all the bandwidth is 
> taken to send the checkpoint to object storage (S3 in our case).
> *Proposal*: After the creation of some limits on FileSystem (mostly the number 
> of connections, with tickets FLINK-8125/FLINK-8198/FLINK-9468), I propose to 
> add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the InputStream and OutputStream.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511359#comment-16511359
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195138248
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -741,6 +804,10 @@ public void write(int b) throws IOException {
public void write(byte[] b, int off, int len) throws 
IOException {
try {
originalStream.write(b, off, len);
+   if (fs.outputRateLimiter != null){
--- End diff --

With optional: `rate.ifPresent(limiter -> limiter.acquire(len));`


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
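
For illustration, a minimal sketch of what the suggested `Optional`-based 
wiring could look like on the output path. This is not the actual PR code: 
the class name and constructor are assumptions, and Guava's `RateLimiter` is 
used, where `acquire(n)` blocks until n permits (here: bytes) are available.

{code:java}
import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;

/** Hypothetical wrapper stream; permits are acquired per written byte. */
class RateLimitedOutputStream extends OutputStream {

    private final OutputStream originalStream;
    private final Optional<RateLimiter> outputRateLimiter; // empty = no limit

    RateLimitedOutputStream(OutputStream originalStream, Optional<RateLimiter> outputRateLimiter) {
        this.originalStream = originalStream;
        this.outputRateLimiter = outputRateLimiter;
    }

    @Override
    public void write(int b) throws IOException {
        originalStream.write(b);
        outputRateLimiter.ifPresent(RateLimiter::acquire); // one permit per byte
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        originalStream.write(b, off, len);
        if (len > 0) {
            outputRateLimiter.ifPresent(limiter -> limiter.acquire(len)); // len permits
        }
    }
}
{code}

A limiter created with `RateLimiter.create(bytesPerSecond)` then caps the 
sustained write throughput of the wrapped stream.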


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511363#comment-16511363
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195142699
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -166,12 +174,47 @@ public LimitedConnectionsFileSystem(
int maxNumOpenInputStreams,
long streamOpenTimeout,
long streamInactivityTimeout) {
+   this(originalFs, maxNumOpenStreamsTotal, 
maxNumOpenOutputStreams, maxNumOpenInputStreams, streamOpenTimeout, 
streamInactivityTimeout, 0, 0);
+   }
+
+   /**
+* Creates a new output connection limiting file system, limiting input 
and output streams with
+* potentially different quotas.
+*
+* If streams are inactive (meaning not writing bytes) for longer 
than the given timeout,
+* then they are terminated as "inactive", to prevent that the limited 
number of connections gets
+* stuck on only blocked threads.
+*
+* @param originalFs  The original file system to which 
connections are limited.
+* @param maxNumOpenStreamsTotal  The maximum number of concurrent open 
streams (0 means no limit).
+* @param maxNumOpenOutputStreams The maximum number of concurrent open 
output streams (0 means no limit).
+* @param maxNumOpenInputStreams  The maximum number of concurrent open 
input streams (0 means no limit).
+* @param streamOpenTimeout   The maximum number of milliseconds 
that the file system will wait when
+*no more connections are currently 
permitted.
+* @param streamInactivityTimeout The milliseconds that a stream may 
spend not writing any
+*bytes before it is closed as inactive.
+* @param inputBytesPerSecondRate The rate limiting of bytes read per 
second on the FileSystem (0 means no limit)
+* @param outputBytesPerSecondRate The rate limiting of bytes written 
per second on the FileSystem (0 means no limit)
+*/
+
+   public LimitedConnectionsFileSystem(
+   FileSystem originalFs,
+   int maxNumOpenStreamsTotal,
+   int maxNumOpenOutputStreams,
+   int maxNumOpenInputStreams,
+   long streamOpenTimeout,
+   long streamInactivityTimeout,
+   long inputBytesPerSecondRate,
+   long outputBytesPerSecondRate
+   ) {
--- End diff --

remove the newline after last argument


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
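
For reference, a hypothetical invocation of the extended constructor quoted in 
the diff above (the signature is taken from the diff; the argument values are 
examples only):

{code:java}
// No connection limits, but a 10 MiB/s cap on both read and write
// throughput (0 means "no limit" for every argument).
LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
    originalFs,         // the wrapped FileSystem
    0,                  // maxNumOpenStreamsTotal
    0,                  // maxNumOpenOutputStreams
    0,                  // maxNumOpenInputStreams
    0L,                 // streamOpenTimeout (milliseconds)
    0L,                 // streamInactivityTimeout (milliseconds)
    10L * 1024 * 1024,  // inputBytesPerSecondRate
    10L * 1024 * 1024); // outputBytesPerSecondRate
{code}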


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511356#comment-16511356
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195134228
  
--- Diff: docs/ops/filesystems.md ---
@@ -102,6 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
+fs.<scheme>.limit.rateLimitingOutput: (bytes/s, 0 means infinite)
--- End diff --

`output-rate-limit`? 


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511358#comment-16511358
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195136244
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
 ---
@@ -62,7 +62,9 @@ public FileSystem create(URI fsUri) throws IOException {
FileSystem original = factory.create(fsUri);
return new LimitedConnectionsFileSystem(original,
settings.limitTotal, settings.limitOutput, 
settings.limitInput,
-   settings.streamOpenTimeout, 
settings.streamInactivityTimeout);
+   settings.streamOpenTimeout, 
settings.streamInactivityTimeout,
--- End diff --

please reformat previous arguments (one per line) since you have just 
touched this one


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511354#comment-16511354
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195134150
  
--- Diff: docs/ops/filesystems.md ---
@@ -102,6 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
--- End diff --

`input-rate-limit`? Also, document the default value? Or is the default value 
visible via some automatically generated docs?


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511360#comment-16511360
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195138105
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -731,6 +791,9 @@ public boolean checkNewBytesAndMark(long timestamp) 
throws IOException {
public void write(int b) throws IOException {
try {
originalStream.write(b);
+   if (fs.outputRateLimiter != null){
--- End diff --

With optional: `rate.ifPresent(RateLimiter::acquire);`


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9560) RateLimiting for FileSystem

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511355#comment-16511355
 ] 

ASF GitHub Bot commented on FLINK-9560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195134923
  
--- Diff: docs/ops/filesystems.md ---
@@ -89,9 +89,9 @@ For example, if the default file system configured as 
`fs.default-scheme: hdfs:/
  Connection limiting
 
 You can limit the total number of connections that a file system can 
concurrently open. This is useful when the file system cannot handle a large 
number
-of concurrent reads / writes or open connections at the same time.
+of concurrent reads / writes or open connections at the same time. You can 
also limit the throughput/bandwidth used to read/write from/to the FileSystem.
 
-For example, very small HDFS clusters with few RPC handlers can sometimes 
be overwhelmed by a large Flink job trying to build up many connections during 
a checkpoint.
+For example, very small HDFS clusters with few RPC handlers can sometimes 
be overwhelmed by a large Flink job trying to build up many connections during 
a checkpoint. 
--- End diff --

white space at the end?


> RateLimiting for FileSystem
> ---
>
> Key: FLINK-9560
> URL: https://issues.apache.org/jira/browse/FLINK-9560
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Etienne CARRIERE
>Priority: Major
>
> *Pain*: On our system, we see that during checkpoints, all the bandwidth is 
> taken up sending the checkpoint to object storage (S3 in our case).
> *Proposal*: After the introduction of some limits on FileSystem (mostly the 
> number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I 
> propose to add rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a 
> rate limiter on both the input and output streams.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195142699
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -166,12 +174,47 @@ public LimitedConnectionsFileSystem(
int maxNumOpenInputStreams,
long streamOpenTimeout,
long streamInactivityTimeout) {
+   this(originalFs, maxNumOpenStreamsTotal, 
maxNumOpenOutputStreams, maxNumOpenInputStreams, streamOpenTimeout, 
streamInactivityTimeout, 0, 0);
+   }
+
+   /**
+* Creates a new output connection limiting file system, limiting input 
and output streams with
+* potentially different quotas.
+*
+* If streams are inactive (meaning not writing bytes) for longer 
than the given timeout,
+* then they are terminated as "inactive", to prevent that the limited 
number of connections gets
+* stuck on only blocked threads.
+*
+* @param originalFs  The original file system to which 
connections are limited.
+* @param maxNumOpenStreamsTotal  The maximum number of concurrent open 
streams (0 means no limit).
+* @param maxNumOpenOutputStreams The maximum number of concurrent open 
output streams (0 means no limit).
+* @param maxNumOpenInputStreams  The maximum number of concurrent open 
input streams (0 means no limit).
+* @param streamOpenTimeout   The maximum number of milliseconds 
that the file system will wait when
+*no more connections are currently 
permitted.
+* @param streamInactivityTimeout The milliseconds that a stream may 
spend not writing any
+*bytes before it is closed as inactive.
+* @param inputBytesPerSecondRate The rate limiting of bytes read per 
second on the FileSystem (0 means no limit)
+* @param outputBytesPerSecondRate The rate limiting of bytes written 
per second on the FileSystem (0 means no limit)
+*/
+
+   public LimitedConnectionsFileSystem(
+   FileSystem originalFs,
+   int maxNumOpenStreamsTotal,
+   int maxNumOpenOutputStreams,
+   int maxNumOpenInputStreams,
+   long streamOpenTimeout,
+   long streamInactivityTimeout,
+   long inputBytesPerSecondRate,
+   long outputBytesPerSecondRate
+   ) {
--- End diff --

remove the newline after last argument


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195134150
  
--- Diff: docs/ops/filesystems.md ---
@@ -102,6 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
--- End diff --

`input-rate-limit`? Also, document the default value? Or is the default value 
visible via some automatically generated docs?


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195138248
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -741,6 +804,10 @@ public void write(int b) throws IOException {
public void write(byte[] b, int off, int len) throws 
IOException {
try {
originalStream.write(b, off, len);
+   if (fs.outputRateLimiter != null){
--- End diff --

With optional: `rate.ifPresent(limiter -> limiter.acquire(len));`


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195138105
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -731,6 +791,9 @@ public boolean checkNewBytesAndMark(long timestamp) 
throws IOException {
public void write(int b) throws IOException {
try {
originalStream.write(b);
+   if (fs.outputRateLimiter != null){
--- End diff --

With optional: `rate.ifPresent(RateLimiter::acquire);`


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195134228
  
--- Diff: docs/ops/filesystems.md ---
@@ -102,6 +102,8 @@ fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+fs.<scheme>.limit.rateLimitingInput: (bytes/s, 0 means infinite)
+fs.<scheme>.limit.rateLimitingOutput: (bytes/s, 0 means infinite)
--- End diff --

`output-rate-limit`? 


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195137063
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -88,6 +90,12 @@
/** The nanoseconds that a stream may spend not writing any bytes 
before it is closed as inactive. */
private final long streamInactivityTimeoutNanos;
 
+   /** Rate limiter of incoming bytes for this filesystem. */
+   private final RateLimiter inputRateLimiter;
--- End diff --

Instead of nullable fields, please use `Optional<RateLimiter>` for both 
fields. Annotating them `@Nullable` wouldn't suffice, since unfortunately we do 
not enforce that annotation, while `Optional` is always enforced.


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195136244
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
 ---
@@ -62,7 +62,9 @@ public FileSystem create(URI fsUri) throws IOException {
FileSystem original = factory.create(fsUri);
return new LimitedConnectionsFileSystem(original,
settings.limitTotal, settings.limitOutput, 
settings.limitInput,
-   settings.streamOpenTimeout, 
settings.streamInactivityTimeout);
+   settings.streamOpenTimeout, 
settings.streamInactivityTimeout,
--- End diff --

please reformat previous arguments (one per line) since you have just 
touched this one


---


[GitHub] flink pull request #6149: [FLINK-9560] Add RateLimiting for FileSystem

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6149#discussion_r195134923
  
--- Diff: docs/ops/filesystems.md ---
@@ -89,9 +89,9 @@ For example, if the default file system configured as 
`fs.default-scheme: hdfs:/
  Connection limiting
 
 You can limit the total number of connections that a file system can 
concurrently open. This is useful when the file system cannot handle a large 
number
-of concurrent reads / writes or open connections at the same time.
+of concurrent reads / writes or open connections at the same time. You can 
also limit the throughput/bandwidth used to read/write from/to the FileSystem.
 
-For example, very small HDFS clusters with few RPC handlers can sometimes 
be overwhelmed by a large Flink job trying to build up many connections during 
a checkpoint.
+For example, very small HDFS clusters with few RPC handlers can sometimes 
be overwhelmed by a large Flink job trying to build up many connections during 
a checkpoint. 
--- End diff --

white space at the end?


---


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511319#comment-16511319
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6156
  
I think when all comments are addressed this is good to merge 👍 . Will 
merge it tomorrow once we have a green build.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
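
A minimal sketch of the intent behind the two methods (simplified stand-in 
types, not the actual Flink classes): the internal accessors read and write 
the stored accumulator type SV directly, bypassing the user-facing 
add()/get() conversions.

{code:java}
// Stand-in for Flink's StateTable, for illustration only.
interface StateTable<SV> {
    SV get();
    void put(SV value);
}

class HeapAppendingStateSketch<IN, SV, OUT> {

    private final StateTable<SV> stateTable;

    HeapAppendingStateSketch(StateTable<SV> stateTable) {
        this.stateTable = stateTable;
    }

    /** Returns the raw stored value of type SV, without converting to OUT. */
    public SV getInternal() {
        return stateTable.get();
    }

    /** Overwrites the raw stored value, without going through add(IN). */
    public void updateInternal(SV valueToStore) {
        stateTable.put(valueToStore);
    }
}
{code}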


[jira] [Commented] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511317#comment-16511317
 ] 

ASF GitHub Bot commented on FLINK-9257:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6053
  
Looks good to me, let's see what travis says.


> End-to-end tests prints "All tests PASS" even if individual test-script 
> returns non-zero exit code
> --
>
> Key: FLINK-9257
> URL: https://issues.apache.org/jira/browse/FLINK-9257
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Critical
> Fix For: 1.6.0
>
>
> In some cases the test suite exits with a non-zero exit code but still prints 
> "All tests PASS" to stdout. This happens because of how the test runner 
> works, which is roughly as follows:
>  # Either run-nightly-tests.sh or run-precommit-tests.sh executes a suite of 
> tests consisting of one or multiple bash scripts.
>  # As soon as one of those bash scripts exits with a non-zero exit code, the 
> tests won't continue to run and the test suite will also exit with a non-zero 
> exit code.
>  # *During the cleanup hook (trap cleanup EXIT in common.sh) it is checked 
> whether there are non-empty .out files or log files with certain exceptions. 
> If a test fails with a non-zero exit code but does not produce any exceptions 
> or .out files, this will still print "All tests PASS" to stdout, even though 
> the tests did not all pass.*
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6156: [FLINK-9572] Extend InternalAppendingState with internal ...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6156
  
I think when all comments are addressed this is good to merge 👍 . Will 
merge it tomorrow once we have a green build.


---


[GitHub] flink issue #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass" messag...

2018-06-13 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6053
  
Looks good to me, let's see what travis says.


---


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511315#comment-16511315
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195136079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -31,4 +31,22 @@
 * @param <IN> The type of elements in the state
 * @param <OUT> The type of the resulting element in the state
 */
-public interface InternalAppendingState<K, N, IN, SV, OUT> extends 
InternalKvState<K, N, SV>, AppendingState<IN, OUT> {}
+public interface InternalAppendingState<K, N, IN, SV, OUT> extends 
InternalKvState<K, N, SV>, AppendingState<IN, OUT> {
--- End diff --

I had second thoughts about this, and I think adding the methods only 
in `InternalAppendingState` might be the better choice in the end, because a 
way of manipulating the internal type feels required only here. Sorry 
for that :)


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195136079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -31,4 +31,22 @@
 * @param <IN> The type of elements in the state
 * @param <OUT> The type of the resulting element in the state
 */
-public interface InternalAppendingState<K, N, IN, SV, OUT> extends 
InternalKvState<K, N, SV>, AppendingState<IN, OUT> {}
+public interface InternalAppendingState<K, N, IN, SV, OUT> extends 
InternalKvState<K, N, SV>, AppendingState<IN, OUT> {
--- End diff --

I had second thoughts about this, and I think adding the methods only 
in `InternalAppendingState` might be the better choice in the end, because a 
way of manipulating the internal type feels required only here. Sorry 
for that :)


---


[jira] [Commented] (FLINK-9220) Table program cannot be compiled

2018-06-13 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511290#comment-16511290
 ] 

Fabian Hueske commented on FLINK-9220:
--

In my case the UDF class was not present in the classpath.

> Table program cannot be compiled
> 
>
> Key: FLINK-9220
> URL: https://issues.apache.org/jira/browse/FLINK-9220
> Project: Flink
>  Issue Type: Bug
>Reporter: Saurabh Garg
>Assignee: Timo Walther
>Priority: Major
>
> A Flink job fails with a scalar UDF. I am using Flink 1.4. The issue came up 
> with a scalar UDF.
> Below are the error logs:
>  
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue. at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot 
> determine simple type name "com" at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at 
> org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>  at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>  at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at 
> org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at 
> org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$Rval
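
For context, a minimal scalar UDF of the kind involved here (a generic 
example, not the reporter's UDF). The generated table program references the 
UDF class by its fully qualified name, which is why the class must be on the 
classpath at runtime, typically inside the job jar; otherwise Janino fails 
with "Cannot determine simple type name" as in the trace above.

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

// The Table API code generator emits code that instantiates and calls
// this class, so it must be resolvable on the cluster classpath.
public class HashCode extends ScalarFunction {
    public int eval(String s) {
        return s == null ? 0 : s.hashCode();
    }
}

// Registration (Flink 1.4 style):
// tableEnv.registerFunction("hashCode", new HashCode());
{code}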

[jira] [Created] (FLINK-9579) Remove unnecessary check with cep elementQueue

2018-06-13 Thread aitozi (JIRA)
aitozi created FLINK-9579:
-

 Summary: Remove unnecessary check with cep elementQueue
 Key: FLINK-9579
 URL: https://issues.apache.org/jira/browse/FLINK-9579
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.5.0
Reporter: aitozi
Assignee: aitozi


When dealing with event time, the elementQueueState is cleared when 
sortedTimestamps is empty. I think this operation is not needed, because the 
elements in elementQueueState have all been removed already once 
sortedTimestamps is empty, so there is no need to clear again and spend time on 
a RocksDB operation. What's your idea, [~dawidwys]?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
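
A simplified model of the argument (not the actual CEP operator code; the 
names follow the description above):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class ElementQueueSketch<T> {

    // Stands in for the per-key MapState<Long, List<T>> backed by RocksDB.
    private final Map<Long, List<T>> elementQueueState = new HashMap<>();

    void onEventTime(long watermark) {
        PriorityQueue<Long> sortedTimestamps =
            new PriorityQueue<>(elementQueueState.keySet());
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= watermark) {
            // Each processed timestamp is removed from the state right here.
            List<T> elements = elementQueueState.remove(sortedTimestamps.poll());
            // ... feed the elements into the NFA ...
        }
        // If sortedTimestamps is empty now, every entry has already been
        // removed above, so an additional elementQueueState.clear() would
        // only cost an extra RocksDB operation.
    }
}
{code}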


[jira] [Commented] (FLINK-8067) User code ClassLoader not set before calling ProcessingTimeCallback

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511288#comment-16511288
 ] 

ASF GitHub Bot commented on FLINK-8067:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6081#discussion_r195125463
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() 
throws Throwable {
}
}
 
+   /**
+* Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+*/
+   @Test
+   public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws 
Throwable {
+   syncLatch = new OneShotLatch();
+
+   try (MockEnvironment mockEnvironment =
+   new MockEnvironmentBuilder()
+   .setUserCodeClassLoader(new 
TestUserCodeClassLoader())
+   .build()) {
+   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+
+   final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
+
+   CompletableFuture<Void> invokeFuture = 
CompletableFuture.runAsync(
+   () -> {
+   try {
+   timerServiceTask.invoke();
+   } catch (Exception e) {
+   atomicThrowable.set(e);
--- End diff --

you can fail this with a `CompletionException` instead, then we don't need 
the atomic reference and will fail at `invokeFuture.get`


> User code ClassLoader not set before calling ProcessingTimeCallback
> ---
>
> Key: FLINK-8067
> URL: https://issues.apache.org/jira/browse/FLINK-8067
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.6.0, 1.5.1
>
>
> The user code ClassLoader is not set as the context ClassLoader for the 
> thread invoking {{ProcessingTimeCallback#onProcessingTime(long timestamp)}}:
> https://github.com/apache/flink/blob/84a07a34ac22af14f2dd0319447ca5f45de6d0bb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L222
> This is problematic, for example, if user code dynamically loads classes in 
> {{ProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
> out)}} using the current thread's context ClassLoader (also see FLINK-8005).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
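
A minimal sketch of the kind of fix being tested here (illustrative names, 
not the actual Flink change): the timer service's ThreadFactory installs the 
user-code ClassLoader, so it is in place before any ProcessingTimeCallback 
runs on that thread.

{code:java}
import java.util.concurrent.ThreadFactory;

class UserCodeClassLoaderThreadFactory implements ThreadFactory {

    private final ClassLoader userCodeClassLoader;

    UserCodeClassLoaderThreadFactory(ClassLoader userCodeClassLoader) {
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, "timer-trigger"); // name illustrative
        // Set before start(): callbacks running on this thread then see the
        // user-code ClassLoader as their context ClassLoader.
        thread.setContextClassLoader(userCodeClassLoader);
        return thread;
    }
}
{code}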


[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

2018-06-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6081#discussion_r195125463
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() 
throws Throwable {
}
}
 
+   /**
+* Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+*/
+   @Test
+   public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws 
Throwable {
+   syncLatch = new OneShotLatch();
+
+   try (MockEnvironment mockEnvironment =
+   new MockEnvironmentBuilder()
+   .setUserCodeClassLoader(new 
TestUserCodeClassLoader())
+   .build()) {
+   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+
+   final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
+
+   CompletableFuture<Void> invokeFuture = 
CompletableFuture.runAsync(
+   () -> {
+   try {
+   timerServiceTask.invoke();
+   } catch (Exception e) {
+   atomicThrowable.set(e);
--- End diff --

you can fail this with a `CompletionException` instead, then we don't need 
the atomic reference and will fail at `invokeFuture.get`


---
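
Applied to the quoted test, the suggestion would look roughly like this 
(fragment only; `timerServiceTask` and `TestingUtils` come from the diff 
above):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
    () -> {
        try {
            timerServiceTask.invoke();
        } catch (Exception e) {
            // Surfaces at invokeFuture.get() as an ExecutionException
            // wrapping e, so the AtomicReference becomes unnecessary.
            throw new CompletionException(e);
        }
    },
    TestingUtils.defaultExecutor());

invokeFuture.get();
{code}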


[jira] [Commented] (FLINK-8067) User code ClassLoader not set before calling ProcessingTimeCallback

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511285#comment-16511285
 ] 

ASF GitHub Bot commented on FLINK-8067:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6081#discussion_r195124501
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() 
throws Throwable {
}
}
 
+   /**
+* Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+*/
+   @Test
+   public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws 
Throwable {
+   syncLatch = new OneShotLatch();
+
+   try (MockEnvironment mockEnvironment =
+   new MockEnvironmentBuilder()
+   .setUserCodeClassLoader(new 
TestUserCodeClassLoader())
+   .build()) {
+   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+
+   final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
+
+   CompletableFuture<Void> invokeFuture = 
CompletableFuture.runAsync(
+   () -> {
+   try {
+   timerServiceTask.invoke();
+   } catch (Exception e) {
+   atomicThrowable.set(e);
+   }
+   },
+   TestingUtils.defaultExecutor());
+
+   // wait until the invoke is complete
+   invokeFuture.get();
+
+   assertThat(timerServiceTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
+   assertThat(timerServiceTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+   // check if an exception occurred
--- End diff --

hold on, will fix it~


> User code ClassLoader not set before calling ProcessingTimeCallback
> ---
>
> Key: FLINK-8067
> URL: https://issues.apache.org/jira/browse/FLINK-8067
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.6.0, 1.5.1
>
>
> The user code ClassLoader is not set as the context ClassLoader for the 
> thread invoking {{ProcessingTimeCallback#onProcessingTime(long timestamp)}}:
> https://github.com/apache/flink/blob/84a07a34ac22af14f2dd0319447ca5f45de6d0bb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L222
> This is problematic, for example, if user code dynamically loads classes in 
> {{ProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
> out)}} using the current thread's context ClassLoader (also see FLINK-8005).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

2018-06-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6081#discussion_r195124501
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() 
throws Throwable {
}
}
 
+   /**
+* Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+*/
+   @Test
+   public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws 
Throwable {
+   syncLatch = new OneShotLatch();
+
+   try (MockEnvironment mockEnvironment =
+   new MockEnvironmentBuilder()
+   .setUserCodeClassLoader(new 
TestUserCodeClassLoader())
+   .build()) {
+   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+
+   final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
+
+   CompletableFuture<Void> invokeFuture = 
CompletableFuture.runAsync(
+   () -> {
+   try {
+   timerServiceTask.invoke();
+   } catch (Exception e) {
+   atomicThrowable.set(e);
+   }
+   },
+   TestingUtils.defaultExecutor());
+
+   // wait until the invoke is complete
+   invokeFuture.get();
+
+   assertThat(timerServiceTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
+   assertThat(timerServiceTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+   // check if an exception occurred
--- End diff --

hold on, will fix it~


---


[jira] [Commented] (FLINK-8067) User code ClassLoader not set before calling ProcessingTimeCallback

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511283#comment-16511283
 ] 

ASF GitHub Bot commented on FLINK-8067:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6081#discussion_r195123173
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() 
throws Throwable {
}
}
 
+   /**
+* Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+*/
+   @Test
+   public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws 
Throwable {
+   syncLatch = new OneShotLatch();
+
+   try (MockEnvironment mockEnvironment =
+   new MockEnvironmentBuilder()
+   .setUserCodeClassLoader(new 
TestUserCodeClassLoader())
+   .build()) {
+   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+
+   final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
+
+   CompletableFuture<Void> invokeFuture = 
CompletableFuture.runAsync(
+   () -> {
+   try {
+   timerServiceTask.invoke();
+   } catch (Exception e) {
+   atomicThrowable.set(e);
+   }
+   },
+   TestingUtils.defaultExecutor());
+
+   // wait until the invoke is complete
+   invokeFuture.get();
+
+   assertThat(timerServiceTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
+   assertThat(timerServiceTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+   // check if an exception occurred
--- End diff --

Tiny nit: you could drop this comment (and the one above). The code is 
pretty self-explanatory on its own.


> User code ClassLoader not set before calling ProcessingTimeCallback
> ---
>
> Key: FLINK-8067
> URL: https://issues.apache.org/jira/browse/FLINK-8067
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.6.0, 1.5.1
>
>
> The user code ClassLoader is not set as the context ClassLoader for the 
> thread invoking {{ProcessingTimeCallback#onProcessingTime(long timestamp)}}:
> https://github.com/apache/flink/blob/84a07a34ac22af14f2dd0319447ca5f45de6d0bb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L222
> This is problematic, for example, if user code dynamically loads classes in 
> {{ProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
> out)}} using the current thread's context ClassLoader (also see FLINK-8005).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

2018-06-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6081#discussion_r195123173
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() 
throws Throwable {
}
}
 
+   /**
+* Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+*/
+   @Test
+   public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws 
Throwable {
+   syncLatch = new OneShotLatch();
+
+   try (MockEnvironment mockEnvironment =
+   new MockEnvironmentBuilder()
+   .setUserCodeClassLoader(new 
TestUserCodeClassLoader())
+   .build()) {
+   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+
+   final AtomicReference<Throwable> atomicThrowable = new 
AtomicReference<>(null);
+
+   CompletableFuture<Void> invokeFuture = 
CompletableFuture.runAsync(
+   () -> {
+   try {
+   timerServiceTask.invoke();
+   } catch (Exception e) {
+   atomicThrowable.set(e);
+   }
+   },
+   TestingUtils.defaultExecutor());
+
+   // wait until the invoke is complete
+   invokeFuture.get();
+
+   assertThat(timerServiceTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
+   assertThat(timerServiceTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+   // check if an exception occurred
--- End diff --

Tiny nit: you could drop this comment (and the one above). The code is 
pretty self-explanatory on its own.


---


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511276#comment-16511276
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195120961
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -22,7 +22,7 @@
 
 /**
  * The peer to the {@link AppendingState} in the internal state type 
hierarchy.
- * 
+ *
--- End diff --

Please revert this.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195120961
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -22,7 +22,7 @@
 
 /**
  * The peer to the {@link AppendingState} in the internal state type 
hierarchy.
- * 
+ *
--- End diff --

Please revert this.


---


[jira] [Commented] (FLINK-9578) Allow to define an auto watermark interval in SQL Client

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511274#comment-16511274
 ] 

ASF GitHub Bot commented on FLINK-9578:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6160

[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL 
Client

## What is the purpose of the change

This PR allows for setting the auto watermark interval in a non-programmatic 
way for the SQL Client.


## Brief change log

- Introduction of a property `periodic-watermarks-interval`


## Verifying this change

- Existing tests adapted and new ExecutionContextTest added

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9578

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6160.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6160


commit cebfefc33b25dd090cc9bf5bd4130333f9a0ff83
Author: Timo Walther 
Date:   2018-06-13T14:49:00Z

[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL 
Client




> Allow to define an auto watermark interval in SQL Client
> 
>
> Key: FLINK-9578
> URL: https://issues.apache.org/jira/browse/FLINK-9578
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> Currently it is not possible to define an auto watermark interval in a 
> non-programmatic way for the SQL Client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6160: [FLINK-9578] [sql-client] Allow to define an auto ...

2018-06-13 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6160

[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL 
Client

## What is the purpose of the change

This PR allows for setting the auto watermark interval in a non-programmatic 
way for the SQL Client.


## Brief change log

- Introduction of a property `periodic-watermarks-interval`


## Verifying this change

- Existing tests adapted and new ExecutionContextTest added

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9578

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6160.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6160


commit cebfefc33b25dd090cc9bf5bd4130333f9a0ff83
Author: Timo Walther 
Date:   2018-06-13T14:49:00Z

[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL 
Client




---
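
For comparison, the programmatic equivalent that the new property mirrors (a 
standard DataStream API call, shown for illustration; the interval value is 
an example):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Emit periodic watermarks every 200 milliseconds.
env.getConfig().setAutoWatermarkInterval(200L);
{code}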


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511269#comment-16511269
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195118823
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 ---
@@ -30,7 +30,7 @@
 * @param <V> The type of the value.
  */
 public class HeapValueState
-   extends AbstractHeapState>
+   extends AbstractHeapState
--- End diff --

The same also holds for `HeapMapState` etc.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8067) User code ClassLoader not set before calling ProcessingTimeCallback

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511270#comment-16511270
 ] 

ASF GitHub Bot commented on FLINK-8067:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6081
  
@pnowojski the test error has been fixed~


> User code ClassLoader not set before calling ProcessingTimeCallback
> ---
>
> Key: FLINK-8067
> URL: https://issues.apache.org/jira/browse/FLINK-8067
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.6.0, 1.5.1
>
>
> The user code ClassLoader is not set as the context ClassLoader for the 
> thread invoking {{ProcessingTimeCallback#onProcessingTime(long timestamp)}}:
> https://github.com/apache/flink/blob/84a07a34ac22af14f2dd0319447ca5f45de6d0bb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L222
> This is problematic, for example, if user code dynamically loads classes in 
> {{ProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
> out)}} using the current thread's context ClassLoader (also see FLINK-8005).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6983) Do not serialize States with NFA

2018-06-13 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511268#comment-16511268
 ] 

Dawid Wysakowicz commented on FLINK-6983:
-

Hi [~dian.fu], I think the goal of this ticket was implemented in FLINK-8725, 
so I will close this issue. If you think otherwise, please reopen it.

> Do not serialize States with NFA
> 
>
> Key: FLINK-6983
> URL: https://issues.apache.org/jira/browse/FLINK-6983
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-6983) Do not serialize States with NFA

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-6983.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

> Do not serialize States with NFA
> 
>
> Key: FLINK-6983
> URL: https://issues.apache.org/jira/browse/FLINK-6983
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511267#comment-16511267
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195118305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 ---
@@ -30,7 +30,7 @@
  * @param <V> The type of the value.
  */
 public class HeapValueState<K, N, V>
-	extends AbstractHeapState<K, N, V, ValueState<V>>
+	extends AbstractHeapState<K, N, V>
--- End diff --

You could consider also using the new `getInternal()` and 
`updateInternal()` methods inside the methods of this class as well, to replace 
direct calls to the `stateTable`.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV value);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

2018-06-13 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6081
  
@pnowojski the test error has been fixed~


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195118823
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 ---
@@ -30,7 +30,7 @@
  * @param <V> The type of the value.
  */
 public class HeapValueState<K, N, V>
-	extends AbstractHeapState<K, N, V, ValueState<V>>
+	extends AbstractHeapState<K, N, V>
--- End diff --

The same also holds for `HeapMapState` etc.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195118305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 ---
@@ -30,7 +30,7 @@
  * @param <V> The type of the value.
  */
 public class HeapValueState<K, N, V>
-	extends AbstractHeapState<K, N, V, ValueState<V>>
+	extends AbstractHeapState<K, N, V>
--- End diff --

You could consider also using the new `getInternal()` and 
`updateInternal()` methods inside the methods of this class as well, to replace 
direct calls to the `stateTable`.


---


[jira] [Resolved] (FLINK-6939) Not store IterativeCondition with NFA state

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-6939.
-
Resolution: Fixed

> Not store IterativeCondition with NFA state
> ---
>
> Key: FLINK-6939
> URL: https://issues.apache.org/jira/browse/FLINK-6939
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the IterativeCondition is stored together with the whole NFA state 
> and de/serialized every time the NFA state is updated or read. This is a heavy 
> operation and is not necessary. In addition, this change is a prerequisite for 
> FLINK-6938.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8725) Separate NFA-state from NFA in CEP library

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-8725.
-
Resolution: Fixed

> Separate NFA-state from NFA in CEP library
> --
>
> Key: FLINK-8725
> URL: https://issues.apache.org/jira/browse/FLINK-8725
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> The CEP library currently serialises the static parts of the NFA in the state 
> for each key. This is wasteful, because that part never changes, and 
> problematic, because the static part can contain user code in the form of 
> filter functions.
>  
> We should only serialise the dynamic state of the NFA (current states, seen 
> elements).
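
A rough sketch of the separation (illustrative names, not the actual CEP classes): the static part is built once per operator and shared across keys, while only the small dynamic part is written to keyed state per key.

{code:java}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Static part: one instance per operator, never serialized per key.
// In CEP this would hold the transition graph and the user-supplied conditions.
class NfaSketch {
    final List<String> stateNames = new ArrayList<>();
}

// Dynamic part: the only thing that goes into keyed state per key.
class NfaStateSketch implements Serializable {
    final List<String> activeStates = new ArrayList<>(); // current computation states
    long lastSeenTimestamp;                              // e.g. for time-based pruning
}
{code}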



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6939) Not store IterativeCondition with NFA state

2018-06-13 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511266#comment-16511266
 ] 

Dawid Wysakowicz commented on FLINK-6939:
-

Hi [~jark], I will close this issue, as I think the goal of this ticket was 
implemented in FLINK-8725. Please reopen it if you think otherwise.

> Not store IterativeCondition with NFA state
> ---
>
> Key: FLINK-6939
> URL: https://issues.apache.org/jira/browse/FLINK-6939
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the IterativeCondition is stored together with the whole NFA state 
> and de/serialized every time the NFA state is updated or read. This is a heavy 
> operation and is not necessary. In addition, this change is a prerequisite for 
> FLINK-6938.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8725) Separate NFA-state from NFA in CEP library

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-8725:

Fix Version/s: 1.6.0

> Separate NFA-state from NFA in CEP library
> --
>
> Key: FLINK-8725
> URL: https://issues.apache.org/jira/browse/FLINK-8725
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.6.0
>
>
> The CEP library currently serialises the static parts of the NFA in the state 
> for each key. This is wasteful, because that part never changes, and 
> problematic, because the static part can contain user code in the form of 
> filter functions.
>  
> We should only serialise the dynamic state of the NFA (current states, seen 
> elements).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9521) Add shade plugin executions to package table example jar

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511262#comment-16511262
 ] 

ASF GitHub Bot commented on FLINK-9521:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6123
  
cc @dawidwys can you review this?


> Add shade plugin executions to package table example jar
> 
>
> Key: FLINK-9521
> URL: https://issues.apache.org/jira/browse/FLINK-9521
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.6.0, 1.5.1
>
>
> This is preparatory work for FLINK-9519, so that we can build fat jars for the 
> table examples and move them to the examples directory, as is already done for 
> the batch and streaming examples. Currently there are no table examples in the 
> Flink binary package.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6939) Not store IterativeCondition with NFA state

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-6939:

Fix Version/s: 1.6.0

> Not store IterativeCondition with NFA state
> ---
>
> Key: FLINK-6939
> URL: https://issues.apache.org/jira/browse/FLINK-6939
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the IterativeCondition is stored together with the whole NFA state 
> and de/serialized every time the NFA state is updated or read. This is a heavy 
> operation and is not necessary. In addition, this change is a prerequisite for 
> FLINK-6938.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6123: [FLINK-9521] Add shade plugin executions to package table...

2018-06-13 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6123
  
cc @dawidwys can you review this?


---


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511261#comment-16511261
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195116930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
 ---
@@ -48,4 +48,4 @@
@Override
public void release() {
}
-}
\ No newline at end of file
+}
--- End diff --

Please revert, because it produces change in an unrelated class.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV value);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9418) Migrate SharedBuffer to use MapState

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-9418.
-
Resolution: Fixed

> Migrate SharedBuffer to use MapState
> 
>
> Key: FLINK-9418
> URL: https://issues.apache.org/jira/browse/FLINK-9418
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.0
>
>
> Right now {{SharedBuffer}} is implemented with Java collections and the whole 
> buffer is deserialized on each access. We should migrate it to MapState, so 
> that only the necessary parts (e.g. tail entries) are deserialized.
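
As a hedged sketch of what this buys (the entry key/value types below are placeholders, not the actual {{SharedBuffer}} node types): with a {{MapState}}, a lookup deserializes only the entry it touches rather than the whole buffer.

{code:java}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

// Sketch only; to be used inside a keyed operator/function.
class SharedBufferSketch {
    private MapState<Long, String> entries;

    void open(RuntimeContext ctx) {
        entries = ctx.getMapState(
            new MapStateDescriptor<>("sharedBufferEntries", Types.LONG, Types.STRING));
    }

    String lookup(long entryId) throws Exception {
        // Deserializes only this one entry, not the entire buffer.
        return entries.get(entryId);
    }
}
{code}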



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-9538:

Fix Version/s: 1.6.0

> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> I suggest changing the KeyedStateFunction from an abstract class to an 
> interface (a FunctionalInterface in particular) to enable passing lambdas.
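
A short sketch of why this helps (a hypothetical, simplified signature, not the exact Flink one): once the type is a functional interface, call sites can pass a lambda instead of an anonymous subclass.

{code:java}
// Simplified stand-in for the proposed shape of KeyedStateFunction.
@FunctionalInterface
interface KeyedStateFunctionSketch<K, S> {
    void process(K key, S state) throws Exception;
}

class Demo {
    static <K, S> void applyToAllKeys(Iterable<K> keys, S state,
            KeyedStateFunctionSketch<K, S> function) throws Exception {
        for (K key : keys) {
            function.process(key, state);
        }
    }

    public static void main(String[] args) throws Exception {
        // With an interface this is a one-liner; with an abstract class it
        // would require an anonymous subclass.
        applyToAllKeys(java.util.Arrays.asList("a", "b"), new StringBuilder(),
            (key, sb) -> sb.append(key));
    }
}
{code}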



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195116930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
 ---
@@ -48,4 +48,4 @@
@Override
public void release() {
}
-}
\ No newline at end of file
+}
--- End diff --

Please revert, because it produces change in an unrelated class.


---


[jira] [Resolved] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-13 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-9538.
-
Resolution: Fixed

> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
>
> I suggest changing the KeyedStateFunction from an abstract class to an 
> interface (a FunctionalInterface in particular) to enable passing lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511259#comment-16511259
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195116411
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
 ---
@@ -34,12 +33,10 @@
  * @param <IN> The type of the input elements.
  * @param <SV> The type of the values in the state.
  * @param <OUT> The type of the output elements.
- * @param <S> The type of State
  */
-public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State>
-	extends AbstractHeapState<K, N, SV, S>
-	implements InternalMergingState<K, N, IN, OUT> {
-
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT>
+	extends AbstractHeapState<K, N, SV>
+	implements InternalMergingState<K, N, IN, OUT>, 
org.apache.flink.runtime.state.internal.InternalAppendingState<K, N, IN, SV, OUT> {
--- End diff --

I would make this a regular import.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV value);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195116411
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
 ---
@@ -34,12 +33,10 @@
  * @param <IN> The type of the input elements.
  * @param <SV> The type of the values in the state.
  * @param <OUT> The type of the output elements.
- * @param <S> The type of State
  */
-public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State>
-	extends AbstractHeapState<K, N, SV, S>
-	implements InternalMergingState<K, N, IN, OUT> {
-
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT>
+	extends AbstractHeapState<K, N, SV>
+	implements InternalMergingState<K, N, IN, OUT>, 
org.apache.flink.runtime.state.internal.InternalAppendingState<K, N, IN, SV, OUT> {
--- End diff --

I would make this a regular import.


---


[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511258#comment-16511258
 ] 

ASF GitHub Bot commented on FLINK-9538:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6134


> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
>
> I suggest changing the KeyedStateFunction from an abstract class to an 
> interface (a FunctionalInterface in particular) to enable passing lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6134: [FLINK-9538] Make KeyedStateFunction an interface

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6134


---


[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6059


---


[GitHub] flink pull request #5960: [Flink-8725] Separate state from NFA in CEP librar...

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5960


---


[jira] [Assigned] (FLINK-9577) Divide-by-zero in PageRank

2018-06-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9577:
---

Assignee: vinoyang

> Divide-by-zero in PageRank
> --
>
> Key: FLINK-9577
> URL: https://issues.apache.org/jira/browse/FLINK-9577
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>
> {code}
> // org.apache.flink.graph.library.linkanalysis.PageRank#AdjustScores#open
> this.vertexCount = vertexCountIterator.hasNext() ? vertexCountIterator.next().getValue() : 0;
> this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
> {code}
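
A minimal sketch of one possible guard (illustrative; not necessarily the fix that will be committed): skip the division when the graph has no vertices.

{code:java}
class PageRankGuardSketch {
    // Mirrors the arithmetic from the snippet above, with a zero-vertex guard.
    static double uniformlyDistributedScore(long vertexCount, double dampingFactor, double sumOfSinks) {
        if (vertexCount == 0) {
            return 0.0; // an empty graph has no score to distribute
        }
        return ((1 - dampingFactor) + dampingFactor * sumOfSinks) / vertexCount;
    }
}
{code}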



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511252#comment-16511252
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195114015
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -94,6 +97,15 @@ public RocksDBListState(
return valueSerializer;
}
 
+   @Override
+   public byte[] getSerializedValue(
--- End diff --

I don't think this overriding makes sense.


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV value);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195114015
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -94,6 +97,15 @@ public RocksDBListState(
return valueSerializer;
}
 
+   @Override
+   public byte[] getSerializedValue(
--- End diff --

I don't think this overriding makes sense.


---


[jira] [Commented] (FLINK-9572) Extend InternalAppendingState with internal stored state access

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511250#comment-16511250
 ] 

ASF GitHub Bot commented on FLINK-9572:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195111820
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -237,4 +249,20 @@ public void addAll(List<V> values) throws Exception {
 
 		return keySerializationStream.toByteArray();
 	}
+
+	@Override
+	public List<V> getInternal() {
+		Iterable<V> list = get();
+		if (list == null) {
+			return null;
+		}
+		List<V> collected = new ArrayList<>();
--- End diff --

We could currently also save the whole repacking if we changed the signature 
of `Iterable<V> get()` in this class to return `List<V>`. 
However, I think in the long run it might be worth considering to base this 
class on `Iterable` instead of `List`, because we essentially only use 
iterable semantics. @aljoscha what do you think?
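
For clarity, a sketch of the two variants being weighed (same names as the snippet above; the surrounding class is assumed to be generic in `V`):

{code:java}
import java.util.ArrayList;
import java.util.List;

class RepackSketch<V> {
    // Today: getInternal() copies the Iterable into a fresh List.
    List<V> repack(Iterable<V> iterable) {
        List<V> collected = new ArrayList<>();
        for (V value : iterable) {
            collected.add(value);
        }
        return collected;
    }
    // Alternative: if get() returned List<V> directly, the copy above
    // (and this helper) would be unnecessary.
}
{code}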


> Extend InternalAppendingState with internal stored state access
> ---
>
> Key: FLINK-9572
> URL: https://issues.apache.org/jira/browse/FLINK-9572
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
>  
> {code:java}
> public interface InternalAppendingState ... {
>     SV getInternal();
>     void updateInternal(SV);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195111820
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -237,4 +249,20 @@ public void addAll(List<V> values) throws Exception {
 
 		return keySerializationStream.toByteArray();
 	}
+
+	@Override
+	public List<V> getInternal() {
+		Iterable<V> list = get();
+		if (list == null) {
+			return null;
+		}
+		List<V> collected = new ArrayList<>();
--- End diff --

We could currently also save the whole repacking if we changed the signature 
of `Iterable<V> get()` in this class to return `List<V>`. 
However, I think in the long run it might be worth considering to base this 
class on `Iterable` instead of `List`, because we essentially only use 
iterable semantics. @aljoscha what do you think?


---


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511248#comment-16511248
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r19537
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
 				e);
 		}
 
-		return gateway.submitJob(jobGraph, timeout)
+		updateJarEntriesInJobGraph(jobGraph, requestBody.getUploadedJars(), log);
+		updateUserArtifactEntriesInJobGraph(jobGraph, requestBody.getUploadedArtifacts(), log);
+
+		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+		CompletableFuture<JobGraph> jobGraphFuture = blobServerPortFuture.thenApply(blobServerPort -> {
+			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+			final List<PermanentBlobKey> keys;
+			try {
+				keys = BlobClient.uploadFiles(address, config, jobGraph.getJobID(), jobGraph.getUserJars());
+				jobGraph.uploadUserArtifacts(address, config);
+			} catch (IOException ioe) {
+				log.error("Could not upload job jar files.", ioe);
+				throw new CompletionException(new RestHandlerException("Could not upload job jar files.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
+			}
+
+			for (PermanentBlobKey key : keys) {
+				jobGraph.addUserJarBlobKey(key);
+			}
+
+			return jobGraph;
+		});
+
+		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobGraphFuture
+			.thenCompose(finalizedJobGraph -> gateway.submitJob(jobGraph, timeout))
 			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+
+		CompletableFuture<Void> submissionCleanupFuture = submissionFuture.thenRun(requestBody::cleanup);
+
+		return submissionFuture.thenCombine(submissionCleanupFuture, (responseBody, ignored) -> responseBody);
+	}
+
+	/**
+	 * Updates the jar entries in the given JobGraph to refer to the uploaded jar files instead of client-local files.
+	 */
+	private static void updateJarEntriesInJobGraph(JobGraph jobGraph, Collection uploadedJars, Logger log) {
--- End diff --

Correct, this field is used in `JobGraph#uploadUserJars` to upload the jars 
to the blob server. Since the upload is now done on the server but the original 
entries still point to client-local files, we have to update the entries.

If we move the upload out of the JobGraph, we can skip this step.


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

