[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360404#comment-16360404
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8533:


[~eronwright] thanks for the heads-up on this ticket.

Regarding your comment on the Kafka connector's `setStartFromGroupOffsets`:
AFAIK, the problem should not occur there, because we distinguish between
having restored state and having no restored state when deciding whether
to respect the `StartupMode`.
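A minimal sketch of that distinction, assuming a simplified consumer model. The class, the `StartupMode` values, and `startingPositions` are hypothetical and are not the actual Kafka connector code. The point is only that restored state always wins, and the configured startup mode is consulted solely on a fresh start.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StartupOffsetsSketch {

    // Hypothetical stand-in for the connector's startup modes (not the real enum).
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    /**
     * Decides where each partition starts reading.
     * restoredOffsets is null when the job did not restore any state.
     */
    static Map<String, String> startingPositions(
            Map<String, Long> restoredOffsets, List<String> partitions, StartupMode mode) {
        Map<String, String> result = new HashMap<>();
        if (restoredOffsets != null) {
            // Restore path: checkpointed offsets win and the StartupMode is ignored.
            for (Map.Entry<String, Long> e : restoredOffsets.entrySet()) {
                result.put(e.getKey(), "offset:" + e.getValue());
            }
        } else {
            // Fresh start: the configured StartupMode applies to every partition.
            for (String partition : partitions) {
                result.put(partition, mode.name());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("topic-0", 42L);
        List<String> partitions = Arrays.asList("topic-0");
        System.out.println(startingPositions(restored, partitions, StartupMode.GROUP_OFFSETS)); // {topic-0=offset:42}
        System.out.println(startingPositions(null, partitions, StartupMode.GROUP_OFFSETS));     // {topic-0=GROUP_OFFSETS}
    }
}
```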

But yes, for the Pravega connector, I think this is a valid concern. I will take
a look at the pull request soon.

> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There is an edge case where the external state is not
> adequately reinitialized, namely when execution fails _before the first
> checkpoint_. In that case, the hook is not invoked and has no opportunity to
> restore the external state to its initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in
> the Pravega source function, the reader group state (e.g. stream position
> data) is stored externally. In the normal restore case, the reader group
> state is forcibly rewound to the checkpointed position. In the edge case
> where no checkpoint has succeeded yet, the reader group state is not
> rewound, and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this
> method would be invoked unconditionally upon hook initialization. The Pravega
> hook would, for example, initialize or forcibly reinitialize the reader group
> state.
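A toy model of the proposed fix, assuming a simplified hook shape. `ExternalStateHook`, `ReaderGroup`, `ReaderGroupHook`, and `onStart` are hypothetical names for illustration, not Flink's `MasterTriggerRestoreHook` API or the Pravega connector. It shows why an unconditional `initializeState()` covers the "failed before the first checkpoint" case: the reset runs on every (re)start, and `restoreCheckpoint` only overrides it when a checkpoint exists.

```java
import java.util.Optional;

class HookReinitSketch {

    /** Hypothetical hook shape; initializeState() is the method proposed in this ticket. */
    interface ExternalStateHook<T> {
        /** Called unconditionally on every (re)start, even if no checkpoint exists yet. */
        void initializeState();

        /** Called only when restarting from a completed checkpoint. */
        void restoreCheckpoint(T checkpointData);
    }

    /** Toy stand-in for externally stored reader group state (a stream position). */
    static final class ReaderGroup {
        long position;
    }

    static final class ReaderGroupHook implements ExternalStateHook<Long> {
        private final ReaderGroup group;
        private final long initialPosition;

        ReaderGroupHook(ReaderGroup group, long initialPosition) {
            this.group = group;
            this.initialPosition = initialPosition;
        }

        @Override
        public void initializeState() {
            // Forcibly reset to initial conditions; restoreCheckpoint() may override this afterwards.
            group.position = initialPosition;
        }

        @Override
        public void restoreCheckpoint(Long checkpointedPosition) {
            group.position = checkpointedPosition;
        }
    }

    /** What a coordinator would do on (re)start under the proposal. */
    static <T> void onStart(ExternalStateHook<T> hook, Optional<T> latestCheckpoint) {
        hook.initializeState();
        latestCheckpoint.ifPresent(hook::restoreCheckpoint);
    }

    public static void main(String[] args) {
        ReaderGroup group = new ReaderGroup();
        ReaderGroupHook hook = new ReaderGroupHook(group, 0L);

        onStart(hook, Optional.empty());
        group.position = 100L;           // records are read, then the job fails before any checkpoint
        onStart(hook, Optional.empty()); // restart with no checkpoint: the group is still rewound
        System.out.println(group.position); // 0
    }
}
```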



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360389#comment-16360389
 ] 

ASF GitHub Bot commented on FLINK-8364:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5356
  
@StefanRRichter I think the handling of `null` is one major benefit, but not
the only one. Another benefit is that `iterator()` is a more intuitive way to
traverse all values. For example, this change makes ListState traversal more
similar to MapState's: MapState has both `values()` and `iterator()`, both of
which traverse the map.


> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value





[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...

2018-02-11 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5356
  
@StefanRRichter I think the handling of `null` is one major benefit, but not
the only one. Another benefit is that `iterator()` is a more intuitive way to
traverse all values. For example, this change makes ListState traversal more
similar to MapState's: MapState has both `values()` and `iterator()`, both of
which traverse the map.


---


[jira] [Closed] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread Ran Tao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao closed FLINK-8638.
--
Resolution: Duplicate

> Job restart when Checkpoint On Barrier failed
> -
>
> Key: FLINK-8638
> URL: https://issues.apache.org/jira/browse/FLINK-8638
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Ran Tao
>Priority: Major
>
> The following example comes from a snapshotState run writing to HDFS:
> snapshotState failed due to HDFS disk problems, so
> triggerCheckpointOnBarrier failed and threw an exception, which caused the
> application to restart. However, on restart Flink has to recover from the
> most recent completed checkpoint and catch up on the data, which can lead to
> significant delays. We think that when StreamTask's
> triggerCheckpointOnBarrier (including triggerCheckpoint at the source) fails,
> the application should not restart; instead, it should keep running, mark the
> checkpoint as failed, and notify the JobManager that this checkpoint
> failed. A checkpoint-failure alarm would let developers or users know about
> this situation and take appropriate action. During this time, the Flink job
> keeps running.
>  
> {code:java}
> java.lang.Exception: Could not perform checkpoint 45843 for operator 
> TriggerWindow(TumblingEventTimeWindows(6), 
> ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050,
>  
> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8},
>  EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map 
> (153/459).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 45843 for 
> operator TriggerWindow(TumblingEventTimeWindows(6), 
> ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050,
>  
> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8},
>  EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map 
> (153/459).
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f
>  in order to obtain the stream state handle
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> at 
> org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
> at 
> org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
> at 
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
> at 
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
> ... 13 more
> Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are 
> bad. 
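A minimal simulation of the behaviour proposed in this ticket, assuming a simplified task model. `Task`, `Outcome`, `onBarrier`, and the `failTaskOnCheckpointError` switch are hypothetical and are not Flink's `StreamTask` API. With the switch on, a snapshot failure fails the task (the restart described above); with it off, only the affected checkpoint is declined, an alarm is recorded, and the task keeps running.

```java
import java.util.ArrayList;
import java.util.List;

class DeclineOnFailureSketch {

    enum Outcome { COMPLETED, DECLINED }

    /** Hypothetical task wrapper; not Flink's StreamTask. */
    static final class Task {
        private final boolean failTaskOnCheckpointError; // hypothetical switch
        final List<String> alarms = new ArrayList<>();
        boolean running = true;

        Task(boolean failTaskOnCheckpointError) {
            this.failTaskOnCheckpointError = failTaskOnCheckpointError;
        }

        /** Runs the snapshot for one barrier and decides what a snapshot failure means. */
        Outcome onBarrier(long checkpointId, Runnable snapshot) {
            try {
                snapshot.run();
                return Outcome.COMPLETED;
            } catch (RuntimeException e) {
                if (failTaskOnCheckpointError) {
                    // Behaviour described in the ticket: the task fails and the job
                    // restarts from the last completed checkpoint.
                    running = false;
                    throw e;
                }
                // Proposed behaviour: decline only this checkpoint, raise an alarm, keep running.
                alarms.add("checkpoint " + checkpointId + " declined: " + e.getMessage());
                return Outcome.DECLINED;
            }
        }
    }

    public static void main(String[] args) {
        Task task = new Task(false);
        // Simulated snapshot failure, loosely modelled on the HDFS error in the stack trace.
        Outcome outcome = task.onBarrier(45843L, () -> {
            throw new IllegalStateException("All datanodes are bad");
        });
        System.out.println(outcome + " " + task.running + " " + task.alarms);
        // DECLINED true [checkpoint 45843 declined: All datanodes are bad]
    }
}
```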

[jira] [Commented] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread Ran Tao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360378#comment-16360378
 ] 

Ran Tao commented on FLINK-8638:


[~yanghua] OK, thanks.

> Job restart when Checkpoint On Barrier failed
> -
>
> Key: FLINK-8638
> URL: https://issues.apache.org/jira/browse/FLINK-8638
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Ran Tao
>Priority: Major
>

[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360377#comment-16360377
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tweise The changes look good; I will merge this to `master`.
Thanks a lot for the work!


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over
> subtasks round robin. This works as long as shard identifiers are sequential.
> After shards are rebalanced in Kinesis, that may no longer be the case, and
> the distribution becomes skewed.
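A sketch of hash-based shard assignment next to a pluggable alternative, in the spirit of PR #5393 ("Allow for custom hash function for shard to ..."). The `ShardAssigner` interface, the `byPosition` strategy, and the shard ids are hypothetical and are not the connector's actual API. `HASH` only mirrors the description above (shard hash modulo parallelism); `byPosition` assumes every subtask discovers the same shard list and assigns by sorted position, which is balanced by construction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShardAssignmentSketch {

    /** Hypothetical pluggable mapping from a shard to a subtask index in [0, parallelism). */
    interface ShardAssigner {
        int assign(String shardId, int parallelism);
    }

    /** Hash-based round robin: only even if the shard ids happen to hash evenly. */
    static final ShardAssigner HASH = (shardId, p) -> Math.abs(shardId.hashCode() % p);

    /** Alternative: position of the shard in a sorted list that every subtask sees identically. */
    static ShardAssigner byPosition(List<String> allShardIds) {
        List<String> sorted = new ArrayList<>(allShardIds);
        Collections.sort(sorted);
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            index.put(sorted.get(i), i);
        }
        // Assumes the shard is in the list; an unknown shard id throws a NullPointerException.
        return (shardId, p) -> index.get(shardId) % p;
    }

    /** Counts how many shards land on each subtask under the given mapping. */
    static int[] distribution(List<String> shardIds, int parallelism, ShardAssigner assigner) {
        int[] counts = new int[parallelism];
        for (String id : shardIds) {
            counts[assigner.assign(id, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative ids that are no longer contiguous, as after resharding.
        List<String> shards = Arrays.asList(
                "shardId-000000000003", "shardId-000000000007", "shardId-000000000011",
                "shardId-000000000015", "shardId-000000000019", "shardId-000000000023",
                "shardId-000000000027", "shardId-000000000031");
        System.out.println(Arrays.toString(distribution(shards, 4, HASH)));
        System.out.println(Arrays.toString(distribution(shards, 4, byPosition(shards)))); // [2, 2, 2, 2]
    }
}
```

Note that `byPosition` can move many shards to different subtasks whenever the shard list changes, so it trades stability for balance.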





[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tweise The changes look good; I will merge this to `master`.
Thanks a lot for the work!


---


[jira] [Assigned] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-11 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8459:
---

Assignee: vinoyang

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String
> savepointDirectory)}},
> either by taking a savepoint and cancelling the job separately, or by migrating
> the logic in {{JobCancellationWithSavepointHandlers}}. The former has
> different semantics because the checkpoint scheduler is not stopped, so there
> is no guarantee that no additional checkpoints are taken between the
> savepoint and the job cancellation.
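A sketch of the first option ("take a savepoint, then cancel separately") composed with `CompletableFuture`. `JobClient`, `triggerSavepoint`, and `cancel` are hypothetical and are not the `RestClusterClient` API. Composing the two calls makes the caveat above visible: nothing stops the checkpoint scheduler between the two steps.

```java
import java.util.concurrent.CompletableFuture;

class CancelWithSavepointSketch {

    /** Hypothetical asynchronous client; not the actual RestClusterClient API. */
    interface JobClient {
        CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory);

        CompletableFuture<Void> cancel(String jobId);
    }

    /**
     * "Savepoint, then cancel" composed from two independent calls.
     * Caveat from the ticket: the checkpoint scheduler is not stopped, so further
     * checkpoints may still be taken after the savepoint completes and before the
     * cancellation takes effect.
     */
    static CompletableFuture<String> cancelWithSavepoint(JobClient client, String jobId, String targetDirectory) {
        return client.triggerSavepoint(jobId, targetDirectory)
                // Cancel only after the savepoint succeeded; a failed savepoint leaves the job running.
                .thenCompose(savepointPath -> client.cancel(jobId).thenApply(ignored -> savepointPath));
    }

    public static void main(String[] args) {
        JobClient fake = new JobClient() {
            @Override
            public CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
                return CompletableFuture.completedFuture(targetDirectory + "/savepoint-1");
            }

            @Override
            public CompletableFuture<Void> cancel(String jobId) {
                return CompletableFuture.completedFuture(null);
            }
        };
        System.out.println(cancelWithSavepoint(fake, "job-1", "/tmp/savepoints").join()); // /tmp/savepoints/savepoint-1
    }
}
```

Migrating the logic from `JobCancellationWithSavepointHandlers` instead would presumably stop periodic checkpoints first, which this two-call composition cannot do.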





[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-11 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360352#comment-16360352
 ] 

vinoyang commented on FLINK-8459:
-

Hi [~gjy], are you working on this issue? If not, can I assign it to myself?

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>





[jira] [Commented] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360348#comment-16360348
 ] 

ASF GitHub Bot commented on FLINK-7711:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167474541
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListHandler.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Handle request for listing uploaded jars.
+ */
+public class JarListHandler extends AbstractRestHandler {
+
+   private final File jarDir;
+
+   private final Executor executor;
+
+   public JarListHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   MessageHeaders messageHeaders,
+   File jarDir,
+   Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+   final String localAddress;
+   checkState(localAddressFuture.isDone());
+
+   try {
+   localAddress = localAddressFuture.get();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+
+   return CompletableFuture.supplyAsync(() -> {
+   try {
+   List jarFileList = new ArrayList<>();
+   File[] list = jarDir.listFiles(new FilenameFilter() {
+   @Override
+   public boolean accept(File dir, String name) {
+   return name.endsWith(".jar");
+   }
+   });
+   // last modified ascending order
+

[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167474541
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListHandler.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Handle request for listing uploaded jars.
+ */
+public class JarListHandler extends AbstractRestHandler {
+
+   private final File jarDir;
+
+   private final Executor executor;
+
+   public JarListHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   MessageHeaders messageHeaders,
+   File jarDir,
+   Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull 
RestfulGateway gateway) throws RestHandlerException {
+   final String localAddress;
+   checkState(localAddressFuture.isDone());
+
+   try {
+   localAddress = localAddressFuture.get();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+
+   return CompletableFuture.supplyAsync(() -> {
+   try {
+   List jarFileList = new 
ArrayList<>();
+   File[] list = jarDir.listFiles(new 
FilenameFilter() {
+   @Override
+   public boolean accept(File dir, String 
name) {
+   return name.endsWith(".jar");
+   }
+   });
+   // last modified ascending order
+   Arrays.sort(list, (f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified()));
+
+   for (File f : list) {
--- End diff --

This can cause an NPE if `jarDir` does not exist, because `File#listFiles` returns `null` in that case.
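A null-safe variant of the listing step, as a standalone sketch rather than the handler's actual code; `JarListingSketch` and `listJarsNewestFirst` are hypothetical names. `File#listFiles` returns `null` when the directory does not exist, is not a directory, or cannot be read, so that case maps to an empty list. Note also that the comparator in the diff, `Long.compare(f2.lastModified(), f1.lastModified())`, appears to sort newest first despite the "ascending" comment; the sketch states that order explicitly.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class JarListingSketch {

    /**
     * Returns the *.jar files in jarDir, most recently modified first.
     * File#listFiles returns null (not an empty array) if jarDir does not exist,
     * is not a directory, or an I/O error occurs, so that case becomes an empty list.
     */
    static List<File> listJarsNewestFirst(File jarDir) {
        File[] jars = jarDir.listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return Collections.emptyList();
        }
        Arrays.sort(jars, Comparator.comparingLong(File::lastModified).reversed());
        return new ArrayList<>(Arrays.asList(jars));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(listJarsNewestFirst(new File("/no/such/dir")).isEmpty()); // true

        File dir = Files.createTempDirectory("jars").toFile();
        File older = new File(dir, "a.jar");
        File newer = new File(dir, "b.jar");
        older.createNewFile();
        newer.createNewFile();
        older.setLastModified(1_000_000L);
        newer.setLastModified(2_000_000L);
        // b.jar is listed before a.jar
        System.out.println(listJarsNewestFirst(dir));
    }
}
```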

[jira] [Assigned] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8638:
---

Assignee: (was: vinoyang)

> Job restart when Checkpoint On Barrier failed
> -
>
> Key: FLINK-8638
> URL: https://issues.apache.org/jira/browse/FLINK-8638
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Ran Tao
>Priority: Major
>

[jira] [Commented] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360342#comment-16360342
 ] 

vinoyang commented on FLINK-8638:
-

[~insomnia], I also fixed this bug in our internal version. But after searching
the Flink issues, I think we may not need to do this: someone has already fixed
it. The fix is not included in the released v1.4, but it is currently in the
master branch. For more details, see https://issues.apache.org/jira/browse/FLINK-4808
and its subtask FLINK-4809.

> Job restart when Checkpoint On Barrier failed
> -
>
> Key: FLINK-8638
> URL: https://issues.apache.org/jira/browse/FLINK-8638
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Ran Tao
>Assignee: vinoyang
>Priority: Major
>
> The following example comes from a snapshotState run that writes to HDFS: 
> snapshotState failed because of HDFS disk problems, so 
> triggerCheckpointOnBarrier failed and threw an exception that made the 
> application restart. However, on restart Flink has to recover from the most 
> recent completed checkpoint and then catch up on the data, which can lead to 
> significant delays. We think that when StreamTask's 
> triggerCheckpointOnBarrier (including triggerCheckpoint at the source) fails, 
> the application should not restart; instead it should keep running, mark the 
> checkpoint as failed, and notify the JobManager that this checkpoint 
> failed. A checkpoint failure alarm would let developers or users know about 
> this situation and take appropriate action, while the Flink job keeps running 
> the whole time.
>  
> {code:java}
> java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(6), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(6), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f in order to obtain the stream state handle
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
> at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
> at 
> 

[jira] [Commented] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread Ran Tao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360292#comment-16360292
 ] 

Ran Tao commented on FLINK-8638:


Hi, [~yanghua]

It is indeed the case. I fixed this bug in the internal version used at the 
company I work for. May I open a pull request?


[jira] [Commented] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360273#comment-16360273
 ] 

vinoyang commented on FLINK-8638:
-

Hi [~insomnia], we also see this problem in our production environment: if HDFS 
throws an exception, the checkpoint fails, which then causes the job to fail and 
recover.

The cause is this code:
{code:java}
owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
{code}
This in turn fails the job:
{code:java}
getEnvironment().failExternally(exception);
{code}
If a single checkpoint fails (for example, because HDFS is *temporarily* 
unavailable), the job is recovered, which is very expensive.

I will try to fix it.
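To make the two behaviours discussed in this thread concrete, here is a minimal, self-contained sketch. All type names (`CheckpointFailureSketch`, `OnCheckpointFailure`, `CoordinatorStub`, `SnapshotWriter`, `TaskSketch`) are hypothetical and are not Flink classes; the sketch only models the control flow. With `FAIL_TASK`, an `IOException` from the snapshot write stops the task (the current behaviour, which forces a job restart); with `DECLINE_CHECKPOINT`, the task keeps running and only reports that one checkpoint as failed (the behaviour proposed in the issue). It is not the actual change tracked in FLINK-4808/FLINK-4809.

```java
import java.io.IOException;

/**
 * Hypothetical model of a task reacting to a failed checkpoint write.
 * None of these types are Flink classes.
 */
public class CheckpointFailureSketch {

    /** What the task does when writing its snapshot throws. */
    enum OnCheckpointFailure { FAIL_TASK, DECLINE_CHECKPOINT }

    /** Stand-in for the JobManager side that tracks declined checkpoints. */
    static class CoordinatorStub {
        private int declined;
        private long lastDeclinedId = -1L;

        void declineCheckpoint(long checkpointId, Throwable cause) {
            declined++;
            lastDeclinedId = checkpointId;
            // A real system could raise an alarm or bump a metric here.
            System.out.println("checkpoint " + checkpointId + " declined: " + cause.getMessage());
        }

        int declinedCount() { return declined; }
        long lastDeclinedId() { return lastDeclinedId; }
    }

    /** Stand-in for the state output stream, e.g. a file on HDFS. */
    @FunctionalInterface
    interface SnapshotWriter {
        void write(long checkpointId) throws IOException;
    }

    static class TaskSketch {
        private final OnCheckpointFailure policy;
        private final CoordinatorStub coordinator;
        private final SnapshotWriter writer;
        private boolean running = true;

        TaskSketch(OnCheckpointFailure policy, CoordinatorStub coordinator, SnapshotWriter writer) {
            this.policy = policy;
            this.coordinator = coordinator;
            this.writer = writer;
        }

        void triggerCheckpoint(long checkpointId) {
            try {
                writer.write(checkpointId);
            } catch (IOException e) {
                if (policy == OnCheckpointFailure.FAIL_TASK) {
                    // Behaviour described in the comment: the task fails, so the
                    // whole job restarts from the last completed checkpoint.
                    running = false;
                } else {
                    // Proposed behaviour: keep processing records and only
                    // report this one checkpoint as failed.
                    coordinator.declineCheckpoint(checkpointId, e);
                }
            }
        }

        boolean isRunning() { return running; }
    }

    public static void main(String[] args) {
        SnapshotWriter brokenHdfs = id -> { throw new IOException("All datanodes are bad"); };

        TaskSketch failing = new TaskSketch(OnCheckpointFailure.FAIL_TASK, new CoordinatorStub(), brokenHdfs);
        failing.triggerCheckpoint(1L);
        System.out.println("FAIL_TASK still running: " + failing.isRunning());

        CoordinatorStub coordinator = new CoordinatorStub();
        TaskSketch declining = new TaskSketch(OnCheckpointFailure.DECLINE_CHECKPOINT, coordinator, brokenHdfs);
        declining.triggerCheckpoint(2L);
        System.out.println("DECLINE_CHECKPOINT still running: " + declining.isRunning());
    }
}
```

Whether a declined checkpoint should eventually escalate (for example after several consecutive failures) is a separate design question that this sketch does not model.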

 

 


[jira] [Assigned] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8638:
---

Assignee: vinoyang


[jira] [Updated] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread Ran Tao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-8638:
---
Description: 
The following example comes from a snapshotState run that writes to HDFS: 
snapshotState failed because of HDFS disk problems, so triggerCheckpointOnBarrier 
failed and threw an exception that made the application restart. However, on 
restart Flink has to recover from the most recent completed checkpoint and then 
catch up on the data, which can lead to significant delays. We think that when 
StreamTask's triggerCheckpointOnBarrier (including triggerCheckpoint at the 
source) fails, the application should not restart; instead it should keep 
running, mark the checkpoint as failed, and notify the JobManager that this 
checkpoint failed. A checkpoint failure alarm would let developers or users know 
about this situation and take appropriate action, while the Flink job keeps 
running the whole time.

 
{code:java}
java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(6), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(6), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
... 8 more

Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
... 13 more

Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad. Aborting...
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
{code}
 

  was:
The following example comes from the one snapshotState process by using hdfs, 
snapshotState failed due to hdfs disk problems, so that 
triggerCheckpointOnBarrier fails and throws an exception to make the 
application 

[jira] [Created] (FLINK-8638) Job restart when Checkpoint On Barrier failed

2018-02-11 Thread Ran Tao (JIRA)
Ran Tao created FLINK-8638:
--

 Summary: Job restart when Checkpoint On Barrier failed
 Key: FLINK-8638
 URL: https://issues.apache.org/jira/browse/FLINK-8638
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.2, 1.4.0
Reporter: Ran Tao


The following example comes from a snapshotState run that writes to HDFS: 
snapshotState failed because of HDFS disk problems, so triggerCheckpointOnBarrier 
failed and threw an exception that made the application restart. However, on 
restart Flink has to recover from the most recent completed checkpoint and then 
catch up on the data, which can lead to significant delays. We think that when 
StreamTask's triggerCheckpointOnBarrier (including triggerCheckpoint at the 
source) fails, the application should not restart; instead it should keep 
running, mark the checkpoint as failed, and notify the JobManager that this 
checkpoint failed. A checkpoint failure alarm would let developers or users know 
about this situation and take appropriate action, while the Flink job keeps 
running the whole time.

 
{code:java}
java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(6), ReducingStateDescriptor{serializer=com.didi.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(6), ReducingStateDescriptor{serializer=com.didi.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
... 8 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c750d7b7b3e3b498bc396570f/chk-45843/5905b7a2-fc8c-4500-898a-7b87fa5470ee in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
... 13 more
Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad. Aborting...
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
at 

[jira] [Updated] (FLINK-8636) Pass TaskManagerServices to TaskExecutor

2018-02-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8636:
-
Issue Type: Improvement  (was: Bug)

> Pass TaskManagerServices to TaskExecutor
> 
>
> Key: FLINK-8636
> URL: https://issues.apache.org/jira/browse/FLINK-8636
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> To cope with the sheer number of {{TaskExecutor}} services, we should 
> pass them to the {{TaskExecutor}} bundled in {{TaskManagerServices}} instead 
> of individually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5457: [FLINK-8637] [flip6] Use JobManagerSharedServices ...

2018-02-11 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5457

[FLINK-8637] [flip6] Use JobManagerSharedServices to pass in services to 
JobMaster

## What is the purpose of the change

Pass a JobManagerSharedServices instance to the JobMaster instead of
each service individually.

This PR is based on #5443.
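The change described above is essentially the parameter-object pattern. A minimal sketch, assuming made-up service types (`BlobStore`, `ScheduledExecutor`, `BackPressureTracker`) and holder/consumer classes (`SharedServices`, `JobMasterBefore`, `JobMasterAfter`) that only loosely mirror `JobManagerSharedServices` and `JobMaster`; the real classes hold different services and have different constructors.

```java
import java.util.Objects;

/**
 * Hypothetical illustration of passing one services bundle instead of many
 * constructor arguments. These are not the real Flink types.
 */
public class SharedServicesSketch {

    interface BlobStore { String describe(); }
    interface ScheduledExecutor { String describe(); }
    interface BackPressureTracker { String describe(); }

    /** Immutable holder for the services that every job master shares. */
    static final class SharedServices {
        final BlobStore blobStore;
        final ScheduledExecutor executor;
        final BackPressureTracker backPressureTracker;

        SharedServices(BlobStore blobStore, ScheduledExecutor executor, BackPressureTracker tracker) {
            this.blobStore = Objects.requireNonNull(blobStore);
            this.executor = Objects.requireNonNull(executor);
            this.backPressureTracker = Objects.requireNonNull(tracker);
        }
    }

    /** Before: every new service adds another constructor parameter. */
    static final class JobMasterBefore {
        private final String summary;

        JobMasterBefore(BlobStore blobStore, ScheduledExecutor executor, BackPressureTracker tracker) {
            this.summary = blobStore.describe() + "," + executor.describe() + "," + tracker.describe();
        }

        String summary() { return summary; }
    }

    /** After: the constructor signature stays stable as services are added. */
    static final class JobMasterAfter {
        private final SharedServices services;

        JobMasterAfter(SharedServices services) {
            this.services = Objects.requireNonNull(services);
        }

        String summary() {
            return services.blobStore.describe() + ","
                    + services.executor.describe() + ","
                    + services.backPressureTracker.describe();
        }
    }

    public static void main(String[] args) {
        BlobStore blob = () -> "blob";
        ScheduledExecutor exec = () -> "executor";
        BackPressureTracker tracker = () -> "tracker";

        System.out.println(new JobMasterBefore(blob, exec, tracker).summary());
        System.out.println(new JobMasterAfter(new SharedServices(blob, exec, tracker)).summary());
    }
}
```

Both constructors print `blob,executor,tracker`; the difference is that adding a fourth service only touches `SharedServices`, not every place that constructs a job master.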

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink jobManagerSharedServices

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5457


commit 7ef623662d9fe78a3ba19fdad90b67e8b3ad848d
Author: Till Rohrmann 
Date:   2018-02-09T13:07:31Z

[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and introduces
a BackPressureStatsTracker interface. This will make testing easier because we
don't have to set up all the different components.
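The FLINK-8626 commit above extracts an interface so that callers can be tested without constructing the real tracker. A minimal sketch of that pattern, assuming hypothetical names (`StatsTracker`, `StatsTrackerImpl`, `ThreadSampler`, `FixedStatsTracker`, `render`); the real `BackPressureStatsTracker` interface has a different shape and is not reproduced here.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical illustration of extracting an interface so that consumers can be
 * tested with a lightweight stub. Names are illustrative, not Flink's API.
 */
public class StatsTrackerInterfaceSketch {

    /** The narrow contract that consumers depend on. */
    interface StatsTracker {
        Optional<Double> getBackPressureRatio(String vertexId);
    }

    /** Hypothetical heavyweight dependency of the real implementation. */
    @FunctionalInterface
    interface ThreadSampler {
        /** One entry per sampled thread: true if that thread was blocked. */
        boolean[] sampleBlocked(String vertexId);
    }

    /** Production-style implementation: cannot be built without a sampler. */
    static final class StatsTrackerImpl implements StatsTracker {
        private final ThreadSampler sampler;

        StatsTrackerImpl(ThreadSampler sampler) {
            this.sampler = sampler;
        }

        @Override
        public Optional<Double> getBackPressureRatio(String vertexId) {
            boolean[] samples = sampler.sampleBlocked(vertexId);
            if (samples.length == 0) {
                return Optional.empty();
            }
            int blocked = 0;
            for (boolean b : samples) {
                if (b) {
                    blocked++;
                }
            }
            return Optional.of((double) blocked / samples.length);
        }
    }

    /** Test double: canned answers, no other components to set up. */
    static final class FixedStatsTracker implements StatsTracker {
        private final Map<String, Double> ratios;

        FixedStatsTracker(Map<String, Double> ratios) {
            this.ratios = ratios;
        }

        @Override
        public Optional<Double> getBackPressureRatio(String vertexId) {
            return Optional.ofNullable(ratios.get(vertexId));
        }
    }

    /** A consumer that only knows about the interface. */
    static String render(StatsTracker tracker, String vertexId) {
        return tracker.getBackPressureRatio(vertexId)
                .map(ratio -> vertexId + ": ratio=" + ratio)
                .orElse(vertexId + ": no stats");
    }

    public static void main(String[] args) {
        StatsTracker stub = new FixedStatsTracker(Collections.singletonMap("v1", 0.5));
        System.out.println(render(stub, "v1")); // v1: ratio=0.5
        System.out.println(render(stub, "v2")); // v2: no stats
    }
}
```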

commit 358750d2ace5feab4d4c29c92167b0a8a663fe49
Author: Till Rohrmann 
Date:   2018-02-10T22:26:51Z

fixup! [FLINK-8626] Introduce BackPressureStatsTracker interface

commit d91b44af0f46c2b1fd6cb909244e0c5666cee43b
Author: Till Rohrmann 
Date:   2018-02-11T23:51:48Z

[FLINK-8637] [flip6] Use JobManagerSharedServices to pass in services to 
JobMaster

Pass a JobManagerSharedServices instance to the JobMaster instead of
each service individually.




---


[jira] [Commented] (FLINK-8637) Pass JobManagerSharedServices to JobMaster

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360180#comment-16360180
 ] 

ASF GitHub Bot commented on FLINK-8637:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5457

[FLINK-8637] [flip6] Use JobManagerSharedServices to pass in services to 
JobMaster

## What is the purpose of the change

Pass a JobManagerSharedServices instance to the JobMaster instead of
each service individually.

This PR is based on #5443.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink jobManagerSharedServices

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5457


commit 7ef623662d9fe78a3ba19fdad90b67e8b3ad848d
Author: Till Rohrmann 
Date:   2018-02-09T13:07:31Z

[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and introduces
a BackPressureStatsTracker interface. This makes testing easier because we don't
have to set up all the different components.

commit 358750d2ace5feab4d4c29c92167b0a8a663fe49
Author: Till Rohrmann 
Date:   2018-02-10T22:26:51Z

fixup! [FLINK-8626] Introduce BackPressureStatsTracker interface

commit d91b44af0f46c2b1fd6cb909244e0c5666cee43b
Author: Till Rohrmann 
Date:   2018-02-11T23:51:48Z

[FLINK-8637] [flip6] Use JobManagerSharedServices to pass in services to 
JobMaster

Pass a JobManagerSharedServices instance to the JobMaster instead of
each service individually.




> Pass JobManagerSharedServices to JobMaster
> --
>
> Key: FLINK-8637
> URL: https://issues.apache.org/jira/browse/FLINK-8637
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to pass in the {{JobMaster}} services we should use a 
> {{JobManagerServices}} instance. That way we don't have to pass all services 
> to the constructor individually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8637) Pass JobManagerSharedServices to JobMaster

2018-02-11 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8637:


 Summary: Pass JobManagerSharedServices to JobMaster
 Key: FLINK-8637
 URL: https://issues.apache.org/jira/browse/FLINK-8637
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to pass in the {{JobMaster}} services we should use a 
{{JobManagerServices}} instance. That way we don't have to pass all services to 
the constructor individually.
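A minimal sketch of the parameter-object refactoring described above. `SharedServices`, `Master` and the two executor fields are hypothetical placeholders, not the real `JobManagerSharedServices` or `JobMaster` signatures: the constructor receives one holder instead of one argument per service, so adding a service later does not change every call site.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical holder bundling services shared by all jobs.
final class SharedServices implements AutoCloseable {
    final ScheduledExecutorService scheduledExecutor;
    final ExecutorService ioExecutor;

    SharedServices(ScheduledExecutorService scheduledExecutor, ExecutorService ioExecutor) {
        this.scheduledExecutor = Objects.requireNonNull(scheduledExecutor);
        this.ioExecutor = Objects.requireNonNull(ioExecutor);
    }

    // Illustrative factory; real sizing would come from configuration.
    static SharedServices createDefault() {
        return new SharedServices(
            Executors.newSingleThreadScheduledExecutor(),
            Executors.newFixedThreadPool(2));
    }

    @Override
    public void close() {
        scheduledExecutor.shutdownNow();
        ioExecutor.shutdownNow();
    }
}

// Hypothetical master: one constructor argument instead of one per service.
final class Master {
    private final String jobName;
    private final SharedServices services;

    Master(String jobName, SharedServices services) {
        this.jobName = jobName;
        this.services = Objects.requireNonNull(services);
    }

    String jobName() {
        return jobName;
    }

    boolean servicesAvailable() {
        return !services.scheduledExecutor.isShutdown() && !services.ioExecutor.isShutdown();
    }
}
```

Since the holder is shared, its lifecycle (creation and `close()`) belongs to whoever creates the masters, not to an individual master.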





[jira] [Commented] (FLINK-8636) Pass TaskManagerServices to TaskExecutor

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360177#comment-16360177
 ] 

ASF GitHub Bot commented on FLINK-8636:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5456

[FLINK-8636] [flip6] Use TaskManagerServices to pass in services to 
TaskExecutor

## What is the purpose of the change

Pass in the TaskExecutor services via the TaskManagerServices instead
of individually.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink taskManagerServices

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5456


commit 22445a483b21e6f4a46442b11f6090a3f172c32a
Author: Till Rohrmann 
Date:   2018-02-11T22:18:06Z

[FLINK-8636] [flip6] Use TaskManagerServices to pass in services to 
TaskExecutor

Pass in the TaskExecutor services via the TaskManagerServices instead
of individually.




> Pass TaskManagerServices to TaskExecutor
> 
>
> Key: FLINK-8636
> URL: https://issues.apache.org/jira/browse/FLINK-8636
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> To keep the sheer number of {{TaskExecutor}} services manageable, we should 
> pass them to the {{TaskExecutor}} via the {{TaskManagerServices}} instead of 
> passing them individually.





[GitHub] flink pull request #5456: [FLINK-8636] [flip6] Use TaskManagerServices to pa...

2018-02-11 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5456

[FLINK-8636] [flip6] Use TaskManagerServices to pass in services to 
TaskExecutor

## What is the purpose of the change

Pass in the TaskExecutor services via the TaskManagerServices instead
of individually.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink taskManagerServices

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5456


commit 22445a483b21e6f4a46442b11f6090a3f172c32a
Author: Till Rohrmann 
Date:   2018-02-11T22:18:06Z

[FLINK-8636] [flip6] Use TaskManagerServices to pass in services to 
TaskExecutor

Pass in the TaskExecutor services via the TaskManagerServices instead
of individually.




---


[jira] [Created] (FLINK-8636) Pass TaskManagerServices to TaskExecutor

2018-02-11 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8636:


 Summary: Pass TaskManagerServices to TaskExecutor
 Key: FLINK-8636
 URL: https://issues.apache.org/jira/browse/FLINK-8636
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


To keep the sheer number of {{TaskExecutor}} services manageable, we should 
pass them to the {{TaskExecutor}} via the {{TaskManagerServices}} instead of 
passing them individually.





[jira] [Commented] (FLINK-8560) Access to the current key in ProcessFunction after keyBy()

2018-02-11 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360156#comment-16360156
 ] 

Bowen Li commented on FLINK-8560:
-

[~aljoscha] [~pnowojski] I have taken a look at this and am wondering whether it 
is worth it. If we take the approach that Piotr suggested, we need to keep the 
key for every timer explicitly in memory, regardless of whether users use it. 
Is this worth the effort and memory overhead?
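To make the trade-off in this comment concrete, here is a toy model, not Flink's internal timer service: if every registered timer carries its key, `onTimer` can hand the key to user code directly, at the cost of one stored key reference per timer. `KeyedTimerQueue`, `register` and `advanceTo` are hypothetical names.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

// Toy timer service where every timer stores the key it was registered under.
final class KeyedTimerQueue<K> {

    // One entry per timer; keeping the key here is the per-timer memory cost in question.
    private static final class Entry<T> {
        final T key;
        final long timestamp;

        Entry(T key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    private final PriorityQueue<Entry<K>> timers =
        new PriorityQueue<>(Comparator.comparingLong((Entry<K> e) -> e.timestamp));

    void register(K key, long timestamp) {
        timers.add(new Entry<>(key, timestamp));
    }

    int size() {
        return timers.size();
    }

    // Fires every timer with timestamp <= watermark, in timestamp order,
    // passing (key, timestamp) to the callback.
    void advanceTo(long watermark, BiConsumer<K, Long> onTimer) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Entry<K> e = timers.poll();
            onTimer.accept(e.key, e.timestamp);
        }
    }
}
```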

> Access to the current key in ProcessFunction after keyBy()
> --
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Wish
>  Components: DataStream API
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Minor
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not ideal, as you have to check in the processElement method for 
> every element whether the key is already stored, and set it if it is not.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Having it available in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html





[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360123#comment-16360123
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@aljoscha I (partly) reworked this PR as you suggested. There are still some 
unresolved questions though:
 1) I'm not 100% sure how to cleanly support migration between list 
state savepoints. Do you have any pointers on how I should address this?
 2) I haven't tested the new version on an actual Flink job yet; it just passes 
the tests.
I think some more modifications will be needed, so I will test this 
on real data once there is agreement on the actual implementation.
Thanks in advance for any comments!


> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>Priority: Major
>
> RocksDBListState currently keeps the whole list of data in a single RocksDB 
> key-value pair, which implies that the list must fit into memory. 
> Larger lists are not supported and end up with an OOME or other errors. The 
> RocksDBListState could be modified so that individual list items are 
> stored under separate keys in RocksDB and can then be iterated over. A simple 
> implementation could reuse the existing RocksDBMapState, with the list index as 
> the key, and a single RocksDBValueState keeping track of how many items have 
> already been added to the list. Because this implementation might be less 
> efficient in some cases, it would be good to make it opt-in via a construct 
> like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}
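The proposal above (one store entry per list element plus an element counter) can be modeled in plain Java, with a `TreeMap` standing in for RocksDB's sorted key space. `ChunkedListState` and the `<stateKey>/<zero-padded index>` key layout are assumptions for illustration; a real implementation would serialize keys to bytes and scan with a RocksDB iterator. The sketch also assumes state keys contain no `/`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model: one sorted-store entry per list element instead of one blob per list.
final class ChunkedListState {
    // Stand-in for RocksDB's sorted key space (plays the role of the map state).
    private final NavigableMap<String, String> store = new TreeMap<>();
    // Stand-in for the per-key value state that counts appended elements.
    private final Map<String, Long> counts = new HashMap<>();

    // Zero-padded index so lexicographic key order equals insertion order.
    private static String elementKey(String stateKey, long index) {
        return stateKey + "/" + String.format("%019d", index);
    }

    void add(String stateKey, String value) {
        long next = counts.getOrDefault(stateKey, 0L);
        store.put(elementKey(stateKey, next), value);
        counts.put(stateKey, next + 1);
    }

    // Range view over this key's entries; against RocksDB this would be an
    // iterator scan that reads one element at a time.
    Iterable<String> get(String stateKey) {
        long n = counts.getOrDefault(stateKey, 0L);
        if (n == 0) {
            return Collections.emptyList();
        }
        return store.subMap(elementKey(stateKey, 0), true, elementKey(stateKey, n - 1), true).values();
    }
}
```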





[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] optionally use RocksDBMapSta...

2018-02-11 Thread je-ik
Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@aljoscha I (partly) reworked this PR as you suggested. There are still some 
unresolved questions though:
 1) I'm not 100% sure how to cleanly support migration between list 
state savepoints. Do you have any pointers on how I should address this?
 2) I haven't tested the new version on an actual Flink job yet; it just passes 
the tests.
I think some more modifications will be needed, so I will test this 
on real data once there is agreement on the actual implementation.
Thanks in advance for any comments!


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359954#comment-16359954
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5448
  
Hi @StephanEwen, with the help of your `MemorySize` (a subtask of 
FLINK-6469), I finished the remaining work and replaced the old memory config (in 
code, config files and shell scripts) with memory sizes. 

For the config items in `conf/flink-conf.yaml`, I kept but deprecated the 
config keys `jobmanager.heap.mb` and `taskmanager.heap.mb`, and introduced 
`jobmanager.heap.size` and `taskmanager.heap.size` with the unit `m`. 

Could you please review this PR? Thanks!
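The fallback described in this comment (the new `*.heap.size` key with a unit, the deprecated `*.heap.mb` key still honoured as plain megabytes) could look roughly like the sketch below. It is not the PR's code: the `Map`-based config, the precedence rule (new key wins when both are set), the default value, and the choice to read a bare number under the new key as megabytes are all assumptions; unit handling is reduced to `m`/`mb`/`g`/`gb`.

```java
import java.util.Locale;
import java.util.Map;

final class HeapSizeConfig {

    // Resolves the heap size in megabytes: prefers the new key (with unit),
    // falls back to the deprecated key (plain megabytes), then to the default.
    static long heapMegabytes(Map<String, String> conf, String newKey,
                              String deprecatedMbKey, long defaultMb) {
        String sized = conf.get(newKey);
        if (sized != null) {
            return toMegabytes(sized);
        }
        String legacy = conf.get(deprecatedMbKey);
        if (legacy != null) {
            return Long.parseLong(legacy.trim());
        }
        return defaultMb;
    }

    // Minimal unit handling for illustration; a bare number is taken as megabytes here.
    private static long toMegabytes(String value) {
        String s = value.trim().toLowerCase(Locale.ROOT).replace(" ", "");
        if (s.endsWith("mb")) return Long.parseLong(s.substring(0, s.length() - 2));
        if (s.endsWith("m")) return Long.parseLong(s.substring(0, s.length() - 1));
        if (s.endsWith("gb")) return Long.parseLong(s.substring(0, s.length() - 2)) * 1024;
        if (s.endsWith("g")) return Long.parseLong(s.substring(0, s.length() - 1)) * 1024;
        return Long.parseLong(s);
    }
}
```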


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, and the interpretation 
> differs from parameter to parameter.
> For example, heap sizes are configured in megabytes, while network buffer memory 
> and alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap size similarly: {{Xmx5g}} or 
> {{Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}
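A minimal sketch of parsing the proposed unit notation into a byte count. It is not Flink's `MemorySize` class: the accepted suffixes (none/`b`/`bytes`, `k`/`kb`, `m`/`mb`, `g`/`gb`, `t`/`tb`, case-insensitive, optional whitespace) and the binary multipliers (1 kb = 1024 bytes) are assumptions.

```java
import java.util.Locale;

final class MemorySizeParser {

    // Returns the number of bytes described by text such as "64 mb", "2000m" or "1".
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int pos = 0;
        while (pos < s.length() && Character.isDigit(s.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, pos));
        String unit = s.substring(pos).trim();
        long multiplier;
        switch (unit) {
            case "":
            case "b":
            case "bytes":
                multiplier = 1L;
                break;
            case "k":
            case "kb":
                multiplier = 1L << 10;
                break;
            case "m":
            case "mb":
                multiplier = 1L << 20;
                break;
            case "g":
            case "gb":
                multiplier = 1L << 30;
                break;
            case "t":
            case "tb":
                multiplier = 1L << 40;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit '" + unit + "' in: " + text);
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(value, multiplier);
    }
}
```

For example, `parseBytes("10 kb")` yields 10240 and `parseBytes("2000m")` yields the same value as the JVM's `-Xmx2000m`.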





[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-02-11 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5448
  
Hi @StephanEwen, with the help of your `MemorySize` (a subtask of 
FLINK-6469), I finished the remaining work and replaced the old memory config (in 
code, config files and shell scripts) with memory sizes. 

For the config items in `conf/flink-conf.yaml`, I kept but deprecated the 
config keys `jobmanager.heap.mb` and `taskmanager.heap.mb`, and introduced 
`jobmanager.heap.size` and `taskmanager.heap.size` with the unit `m`. 

Could you please review this PR? Thanks!


---


[jira] [Commented] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359904#comment-16359904
 ] 

ASF GitHub Bot commented on FLINK-7711:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5455

[FLINK-7711][flip6] Port JarListHandler

## What is the purpose of the change

*Port JarListHandler.*

cc: @tillrohrmann 

PR is based on #5442 

## Brief change log

  - *Port JarListHandler.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Started cluster in flip6 mode and ran `curl -v  
http://127.0.0.1:9065/jars | jq .`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7711-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5455.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5455


commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao 
Date:   2018-02-09T14:46:22Z

[FLINK-7713][flip6] Implement JarUploadHandler

commit 5afb6ce29f2ead997008e21581a10d787c90680a
Author: gyao 
Date:   2018-02-11T11:16:47Z

[FLINK-7711][flip6] Implement JarListHandler




> Port JarListHandler to new REST endpoint
> 
>
> Key: FLINK-7711
> URL: https://issues.apache.org/jira/browse/FLINK-7711
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarListHandler}} to new REST endpoint





[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-11 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5455

[FLINK-7711][flip6] Port JarListHandler

## What is the purpose of the change

*Port JarListHandler.*

cc: @tillrohrmann 

PR is based on #5442 

## Brief change log

  - *Port JarListHandler.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Started cluster in flip6 mode and ran `curl -v  
http://127.0.0.1:9065/jars | jq .`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7711-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5455.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5455


commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao 
Date:   2018-02-09T14:46:22Z

[FLINK-7713][flip6] Implement JarUploadHandler

commit 5afb6ce29f2ead997008e21581a10d787c90680a
Author: gyao 
Date:   2018-02-11T11:16:47Z

[FLINK-7711][flip6] Implement JarListHandler




---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359894#comment-16359894
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167430276
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Should be `T extends DispatcherGateway`
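To illustrate the suggestion above: tightening a type parameter's bound to the dispatcher gateway type lets the method call dispatcher-specific methods on the retrieved gateway without casts. The interfaces `SimpleGateway` and `SimpleDispatcherGateway`, their methods, and `HandlerLoader.describe` are hypothetical stand-ins; the exact signature of the reviewed method is not recoverable from the quoted diff.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for a general gateway and a dispatcher-specific one.
interface SimpleGateway {
    String address();
}

interface SimpleDispatcherGateway extends SimpleGateway {
    int runningJobCount();
}

final class HandlerLoader {

    // With the bound "T extends SimpleDispatcherGateway", dispatcher-only methods
    // are available on the retrieved gateway; a looser bound would require a cast.
    static <T extends SimpleDispatcherGateway> String describe(Supplier<T> retriever) {
        T gateway = retriever.get();
        return gateway.address() + " jobs=" + gateway.runningJobCount();
    }
}
```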


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.





[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167430276
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Should be `T extends DispatcherGateway`


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359888#comment-16359888
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

tests are missing


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.





[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

tests are missing


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
--- End diff --

Should be a constant.
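A small sketch of what the review suggests: the class name quoted in the diff moved into a constant, plus a classpath check that loads the class without initializing it. `WebSubmissionSupport` and its methods are hypothetical; only `Class.forName(String, boolean, ClassLoader)` is a standard JDK call.

```java
// Hypothetical helper; the class name is the one quoted in the diff.
final class WebSubmissionSupport {

    // Extracted into a constant, as the review suggests.
    static final String WEB_RUNTIME_MONITOR_CLASS =
        "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";

    // Returns true if the class can be loaded; initialize=false avoids running static initializers.
    static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    static boolean webRuntimeAvailable() {
        return isOnClasspath(WEB_RUNTIME_MONITOR_CLASS, WebSubmissionSupport.class.getClassLoader());
    }
}
```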


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359886#comment-16359886
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
--- End diff --

Should be a constant.


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.





[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359885#comment-16359885
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429608
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 ---
@@ -100,6 +107,16 @@ public DispatcherRestEndpoint(
timeout,
responseHeaders);
 
+   final List> optJarHandlers =
--- End diff --

Should also check `WebOptions.SUBMIT_ENABLE`.
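The review asks that the optional JAR handlers be registered only when web submission is enabled. A plain-Java sketch of that gating, with handlers represented as strings for brevity. The key string `web.submit.enable` and its default of `true` are assumptions about what backs `WebOptions.SUBMIT_ENABLE`, and `JarHandlerRegistration` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class JarHandlerRegistration {

    // Assumed key name and default; see the note above.
    static final String SUBMIT_ENABLE_KEY = "web.submit.enable";

    static List<String> handlersToRegister(Map<String, String> conf,
                                           boolean webModuleOnClasspath,
                                           List<String> optionalJarHandlers) {
        boolean submitEnabled = Boolean.parseBoolean(conf.getOrDefault(SUBMIT_ENABLE_KEY, "true"));
        List<String> result = new ArrayList<>();
        // Both conditions must hold: the web module is present and submission is enabled.
        if (webModuleOnClasspath && submitEnabled) {
            result.addAll(optionalJarHandlers);
        }
        return Collections.unmodifiableList(result);
    }
}
```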


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.





[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429608
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 ---
@@ -100,6 +107,16 @@ public DispatcherRestEndpoint(
timeout,
responseHeaders);
 
+   final List> optJarHandlers =
--- End diff --

Should also check `WebOptions.SUBMIT_ENABLE`.


---


[jira] [Assigned] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-11 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-7711:
---

Assignee: Gary Yao  (was: Fang Yong)

> Port JarListHandler to new REST endpoint
> 
>
> Key: FLINK-7711
> URL: https://issues.apache.org/jira/browse/FLINK-7711
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarListHandler}} to new REST endpoint





[jira] [Commented] (FLINK-8212) Pull EnvironmentInformation out of TaskManagerServices

2018-02-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359861#comment-16359861
 ] 

mingleizhang commented on FLINK-8212:
-

I can move those methods directly to the {{TaskManagerServices}} class and then 
refactor {{NetworkBufferCalculationTest}}, but I am not sure whether that is the 
best way to do it. 

> Pull EnvironmentInformation out of TaskManagerServices
> --
>
> Key: FLINK-8212
> URL: https://issues.apache.org/jira/browse/FLINK-8212
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Network
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the {{EnvironmentInformation}} out of the 
> {{TaskManagerServices}}, where it is used to get access to the memory settings 
> of the executing JVM. This unnecessarily couples the former with the latter 
> and makes testing extremely hard (one has to use {{PowerMockRunner}} and mock 
> the static {{EnvironmentInformation}}).
> When addressing this issue, we should also refactor 
> {{NetworkBufferCalculationTest}}.
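One way to remove the static dependency, sketched under assumptions: the JVM's max heap is injected as a `LongSupplier` instead of being read from a static utility inside the calculation, so a test can supply any value without PowerMock. `NetworkMemoryCalculator` is hypothetical, and the fraction-with-bounds rule is a simplified illustration, not Flink's exact network buffer formula.

```java
import java.util.function.LongSupplier;

final class NetworkMemoryCalculator {

    private final LongSupplier maxHeapBytes;

    // Production code could pass () -> Runtime.getRuntime().maxMemory();
    // tests pass a constant.
    NetworkMemoryCalculator(LongSupplier maxHeapBytes) {
        this.maxHeapBytes = maxHeapBytes;
    }

    // Illustrative rule: a fraction of the heap, clamped to [minBytes, maxBytes].
    long networkMemoryBytes(double fraction, long minBytes, long maxBytes) {
        if (fraction <= 0.0 || fraction >= 1.0) {
            throw new IllegalArgumentException("fraction must be in (0, 1): " + fraction);
        }
        long candidate = (long) (maxHeapBytes.getAsLong() * fraction);
        return Math.max(minBytes, Math.min(maxBytes, candidate));
    }
}
```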





[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-11 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359852#comment-16359852
 ] 

chris snow commented on FLINK-8543:
---

I'm hoping that I can get access to an internal cluster that will give me root 
access and hence more debugging capabilities.  I’m thinking of adding some code 
to print out the stacktrace and the thread ID from the flush() and close() 
methods.

Are there any other areas that you would like me to investigate?
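The debugging idea in this comment (record which thread closed the stream and from where, then report it when a later `flush()` or `write()` fails) can be sketched as a wrapper stream. `TracingOutputStream` is hypothetical and not part of Hadoop's S3A; it only illustrates the instrumentation.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical wrapper that remembers who closed the stream first.
final class TracingOutputStream extends OutputStream {

    private final OutputStream delegate;
    private final String name;
    // Holds a Throwable capturing the stack of the first close() call, or null while open.
    private final AtomicReference<Throwable> closedBy = new AtomicReference<>();

    TracingOutputStream(OutputStream delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    private void checkOpen() throws IOException {
        Throwable closer = closedBy.get();
        if (closer != null) {
            // The close-site trace becomes the cause, so the log shows both call sites.
            throw new IOException("Stream '" + name + "' used after close, in thread "
                + Thread.currentThread().getName(), closer);
        }
    }

    @Override
    public void write(int b) throws IOException {
        checkOpen();
        delegate.write(b);
    }

    @Override
    public void flush() throws IOException {
        checkOpen();
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        Throwable trace = new Throwable("closed by thread " + Thread.currentThread().getName());
        // Only the first close() is recorded and forwarded to the delegate.
        if (closedBy.compareAndSet(null, trace)) {
            delegate.close();
        }
    }
}
```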

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>     public static final Logger LOG = S3AFileSystem.LOG;
>
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key,
>             Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
>                 (Object) key, (Object) this.backupFile);
>         this.backupStream = new BufferedOutputStream(
>                 new FileOutputStream(this.backupFile));
>     }
>
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vv-- Additional logging --vvv
>         LOG.error("OutputStream for key '{}' closed.", (Object) this.key);
>         throw new IOException("Output Stream closed");
>     }
>
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload",
>                 (Object) this.key);
> 

[jira] [Commented] (FLINK-8477) Add api to support for user to skip the first incomplete window data

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359841#comment-16359841
 ] 

ASF GitHub Bot commented on FLINK-8477:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
Hi @aljoscha, you mentioned two points:
1. In event-time processing, events may arrive out of order.
2. We can use a WindowFunction or ProcessWindowFunction to filter out several
windows by specifying the start time and end time of the window.
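The second point amounts to a predicate on each window's bounds. A minimal sketch in plain Java, assuming windows are identified by `[start, end)` millisecond timestamps, as Flink's `TimeWindow` exposes through `getStart()`/`getEnd()`. In a real job, such a check would presumably sit inside a `ProcessWindowFunction`, reading the bounds from `context.window()`. The class `WindowBoundsFilter` and its `emitFrom`/`emitUntil` parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides from a window's bounds alone whether its
// result should be emitted (the "filter by start/end time" approach).
public class WindowBoundsFilter {
    private final long emitFrom;  // windows starting earlier are dropped
    private final long emitUntil; // windows ending later are dropped

    public WindowBoundsFilter(long emitFrom, long emitUntil) {
        this.emitFrom = emitFrom;
        this.emitUntil = emitUntil;
    }

    /** Emit only windows [start, end) that lie fully inside [emitFrom, emitUntil). */
    public boolean shouldEmit(long start, long end) {
        return start >= emitFrom && end <= emitUntil;
    }

    public static void main(String[] args) {
        // Drop everything before t = 60s; no upper bound.
        WindowBoundsFilter filter = new WindowBoundsFilter(60_000L, Long.MAX_VALUE);
        List<Long> emitted = new ArrayList<>();
        // Tumbling 30s windows starting at 0s, 30s, 60s, 90s.
        for (long start = 0; start < 120_000L; start += 30_000L) {
            if (filter.shouldEmit(start, start + 30_000L)) {
                emitted.add(start);
            }
        }
        System.out.println(emitted); // [60000, 90000]
    }
}
```

The limitation raised in the reply below is visible here: the cutoff must be known before the job runs, whereas after a restart it is not known in advance which windows are incomplete.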

I have some different ideas:
1. When we deal with an out-of-order event-time stream, we may specify
maxOutOfOrder so that not too many late elements are skipped. So when the job
starts or restarts, the maximum number of windows to skip (maxNumOfWindow) can
be set to maxOutOfOrder / (the length of the tumbling window), so that late
elements will not produce incorrect results. The number of windows that need
to be skipped depends on the degree of out-of-orderness.
2. We need to skip several broken windows, and we don't know which windows are
broken. We can just detect which window fires first; the several windows after
it are broken too. The number will vary from job to job in production
(according to maxOutOfOrder and the length of the window).
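A minimal sketch of the proposed rule in plain Java, for a single key. Two details are assumptions, since the comment does not specify them. First, `maxOutOfOrder / windowLength` is rounded up. Second, the first fired window counts as the first skipped window. The class `SkipFirstWindows` is hypothetical and is not part of the DataStream API or the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: after a (re)start, drop the first fired tumbling window
// and the following ones that may still lack late data.
public class SkipFirstWindows {
    private final long windowSize;
    private final long windowsToSkip;
    private long firstFiredStart = Long.MIN_VALUE; // unset until the first firing

    public SkipFirstWindows(long maxOutOfOrder, long windowSize) {
        this.windowSize = windowSize;
        this.windowsToSkip = numWindowsToSkip(maxOutOfOrder, windowSize);
    }

    // maxOutOfOrder / windowSize, rounded up (the rounding is an assumption).
    static long numWindowsToSkip(long maxOutOfOrder, long windowSize) {
        return (maxOutOfOrder + windowSize - 1) / windowSize;
    }

    /** Returns true if the window starting at 'start' should be emitted. */
    public boolean shouldEmit(long start) {
        if (firstFiredStart == Long.MIN_VALUE) {
            firstFiredStart = start; // remember the first window fired
        }
        // Position of this window relative to the first fired one (0-based).
        long index = (start - firstFiredStart) / windowSize;
        return index >= windowsToSkip;
    }

    public static void main(String[] args) {
        // maxOutOfOrder = 25s, 10s tumbling windows -> ceil(2.5) = 3 windows skipped.
        SkipFirstWindows skipper = new SkipFirstWindows(25_000L, 10_000L);
        List<Long> emitted = new ArrayList<>();
        for (long start = 40_000L; start < 100_000L; start += 10_000L) {
            if (skipper.shouldEmit(start)) {
                emitted.add(start);
            }
        }
        System.out.println(emitted); // [70000, 80000, 90000]
    }
}
```

Unlike a fixed start-time cutoff, this rule needs no timestamp up front: it anchors on whichever window fires first after the job starts. In a keyed job, the `firstFiredStart` marker would presumably have to be tracked per key.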


> Add api to support for user to skip the first incomplete window data
> 
>
> Key: FLINK-8477
> URL: https://issues.apache.org/jira/browse/FLINK-8477
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.4.2
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...

2018-02-11 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  


---