[GitHub] [flink] flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce 
ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#issuecomment-536938289
 
 
   
   ## CI report:
   
   * 53c41b006ae2155206a7843b67721b598b3591d1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129823434)
   * 6039a68165fc94b780c8129bd01844ae7028e97a : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/130034403)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on issue #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on issue #9501: [FLINK-12697] [State Backends] Support 
on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#issuecomment-537509801
 
 
   > About the performance impact, please refer to [the analysis of JDK CSLM 
implementation](https://docs.google.com/document/d/16VIY7o-18sM-pIlIYkbTuhKPmwfnqabCt_nlOARAzdg/edit#)
 and a compacted data structure we introduced for HBase to reduce GC pressure. 
Search for `Key space schema` and `Value space schema` in `SkipListUtils` and 
we could find a similar design here.
   
   Where exactly do I see the performance comparison?
   
   > About reusable object, it will add a lot of efforts/complexity making sure 
to prevent concurrent manipulation on it.
   
   Why would this be the case? The only accessing threads should be the Task's 
main thread and the asynchronous checkpointing thread, right? Couldn't the 
asynchronous checkpointing create a single instance and reuse it for the whole 
checkpointing procedure? For an easy solution, one could even make it a 
thread-local variable. So I'm not sure where the argument comes from that a 
thin wrapper object around a pointer will necessarily decrease performance.
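
   To make the thread-local suggestion concrete, here is a minimal sketch. 
The names `NodePointer` and `ReusablePointers` are hypothetical and do not 
appear in the PR; the sketch only assumes that a node is identified by a 
`long` address. Each thread reuses its own wrapper, so the main thread and the 
checkpointing thread never touch the same instance.

```java
/** Hypothetical thin, mutable wrapper around a raw node address. */
final class NodePointer {
    private long address;

    /** Re-points this wrapper at another address and returns it for chaining. */
    NodePointer set(long address) {
        this.address = address;
        return this;
    }

    long address() {
        return address;
    }
}

/** Hypothetical holder that hands out one reusable wrapper per thread. */
final class ReusablePointers {
    // Each thread lazily gets its own NodePointer, so no synchronization
    // is needed and no wrapper is allocated per access.
    private static final ThreadLocal<NodePointer> POINTER =
        ThreadLocal.withInitial(NodePointer::new);

    private ReusablePointers() {
    }

    /** Returns the calling thread's wrapper, re-pointed at the given address. */
    static NodePointer pointerTo(long address) {
        return POINTER.get().set(address);
    }
}
```

   The trade-off is that a caller must not hold on to the returned wrapper 
across another call to `pointerTo` on the same thread, because the second call 
overwrites the address.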




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330567573
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+       int maxAllocateSize = 256;
+       spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+       IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+    * Test initialization of state map.
+    */
+   @Test
+   public void testInitStateMap() {
+       TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+       TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+       TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+       CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+           keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+       assertTrue(stateMap.isEmpty());
+       assertEquals(0, stateMap.size());
+       assertEquals(0, stateMap.totalSize());
+       assertEquals(0, stateMap.getRequestCount());
+       assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+       assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+       assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+       assertTrue(stateMap.getSnapshotVersions().isEmpty());
+       assertTrue(stateMap.getPruningValueNodes().isEmpty());
+       assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+       assertFalse(stateMap.getResourceGuard().isClosed());
+       assertFalse(stateMap.isClosed());
+
+       assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330566832
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330565842
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330565096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS;
+import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK;
+
+/**
+ * Utils.
+ */
+public class SpaceUtils {
+
+   public static int getChunkIdByAddress(long offset) {
+       return (int) ((offset >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK);
+   }
+
+   public static int getChunkOffsetByAddress(long offset) {
+       return (int) (offset & FOUR_BYTES_MARK);
+   }
+}
 
 Review comment:
   I would really be interested in learning what performance gain we actually 
achieve with it.
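
   For readers following along, the two getters above decode a `long` address 
that appears to hold the chunk id in its upper four bytes and the in-chunk 
offset in its lower four bytes. The sketch below illustrates that layout. The 
constant values (`32` and `0xFFFFFFFFL`) and the `toAddress` helper are 
assumptions for illustration; the quoted diff only shows the constant names and 
the two getters.

```java
/** Sketch of packing a chunk id and an in-chunk offset into one long address. */
final class AddressCodec {
    // Assumed values: the quoted diff imports these names but not their values.
    static final int FOUR_BYTES_BITS = 32;
    static final long FOUR_BYTES_MARK = 0xFFFFFFFFL;

    private AddressCodec() {
    }

    /** Hypothetical inverse of the getters: id in the high 32 bits, offset in the low 32. */
    static long toAddress(int chunkId, int chunkOffset) {
        return ((chunkId & FOUR_BYTES_MARK) << FOUR_BYTES_BITS)
            | (chunkOffset & FOUR_BYTES_MARK);
    }

    /** Same expression as the quoted getChunkIdByAddress. */
    static int getChunkIdByAddress(long address) {
        return (int) ((address >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK);
    }

    /** Same expression as the quoted getChunkOffsetByAddress. */
    static int getChunkOffsetByAddress(long address) {
        return (int) (address & FOUR_BYTES_MARK);
    }
}
```

   Under these assumptions, decoding is a shift and a mask on a primitive, 
which is presumably the gain being claimed over a small object holding two 
`int` fields.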




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330564637
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.io.Closeable;
+
+/**
+ * Implementations are responsible for allocate space.
+ */
+public interface Allocator extends Closeable {
+
+   /**
+    * Allocate space with the given size.
+    *
+    * @param size size of space to allocate.
+    * @return address of the allocated space, or -1 when allocation is failed.
+    */
+   long allocate(int size);
 
 Review comment:
   I don't understand it. If the space allocation fails, then we should also 
fail the operation, right? How exactly will this be caught? It looks as if we 
pass the return value of `allocate` unconditionally to other methods. Are you 
saying that these methods will simply swallow this value or fail at a different 
place?
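
   One way to make the failure explicit is to check the sentinel at the call 
site, as in the sketch below. The `Allocator` interface here is a local copy 
of the signature quoted above; `CheckedAllocation`, `allocateOrFail` and 
`BumpAllocator` are hypothetical names introduced only for illustration and 
are not part of the PR.

```java
import java.io.Closeable;

/** Local copy of the quoted interface: returns -1 when allocation fails. */
interface Allocator extends Closeable {
    long allocate(int size);
}

/** Hypothetical helper that turns the -1 sentinel into an exception. */
final class CheckedAllocation {
    static final long NO_SPACE = -1L; // sentinel taken from the quoted Javadoc

    private CheckedAllocation() {
    }

    static long allocateOrFail(Allocator allocator, int size) {
        long address = allocator.allocate(size);
        if (address == NO_SPACE) {
            // Fail here instead of passing -1 on to code that treats it as an address.
            throw new IllegalStateException("Failed to allocate " + size + " bytes");
        }
        return address;
    }
}

/** Hypothetical fixed-capacity allocator, used only to exercise the check. */
final class BumpAllocator implements Allocator {
    private final int capacity;
    private int used;

    BumpAllocator(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public long allocate(int size) {
        if (size < 0 || size > capacity - used) {
            return CheckedAllocation.NO_SPACE;
        }
        long address = used;
        used += size;
        return address;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

   With a check like this, an exhausted allocator fails the operation at the 
point of allocation rather than at some later use of the invalid address.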




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330561678
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapSnapshot.java
 ##
 @@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.util.ResourceGuard;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteSkipListStateMap}.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMapSnapshot<K, N, S>
+   extends StateMapSnapshot<K, N, S, CopyOnWriteSkipListStateMap<K, N, S>> {
+
+   /**
+* Version of the {@link CopyOnWriteSkipListStateMap} when this 
snapshot was created. This can be used to release the snapshot.
+*/
+   private final int snapshotVersion;
+
+   /** The number of (non-null) entries in snapshotData. */
+   @Nonnegative
+   private final int numberOfEntriesInSnapshotData;
+
+   /**
+* This lease protects the state map resources.
+*/
+   private final ResourceGuard.Lease lease;
+
+   /**
+* Creates a new {@link CopyOnWriteSkipListStateMap}.
+*
+* @param owningStateMap the {@link CopyOnWriteSkipListStateMap} for 
which this object represents a snapshot.
+* @param lease the lease protects the state map resources.
+*/
+   CopyOnWriteSkipListStateMapSnapshot(
+   CopyOnWriteSkipListStateMap owningStateMap,
+   ResourceGuard.Lease lease) {
+   super(owningStateMap);
+
+   this.snapshotVersion = owningStateMap.getStateMapVersion();
+   this.numberOfEntriesInSnapshotData = owningStateMap.size();
+   this.lease = lease;
+   }
+
+   /**
+* Returns the internal version of the when this snapshot was created.
+*/
+   int getSnapshotVersion() {
+   return snapshotVersion;
+   }
+
+   @Override
+   public void release() {
+   owningStateMap.releaseSnapshot(this);
+   lease.close();
+   }
+
+   @Override
+   public void writeState(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   TypeSerializer stateSerializer,
+   @Nonnull DataOutputView dov,
+   @Nullable StateSnapshotTransformer stateSnapshotTransformer) 
throws IOException {
+   if (stateSnapshotTransformer == null) {
+   writeStateWithNoTransform(dov);
+   } else {
+   writeStateWithTransform(stateSerializer, dov, 
stateSnapshotTransformer);
+   }
+   }
+
+   private void writeStateWithNoTransform(@Nonnull DataOutputView dov) 
throws IOException {
+   dov.writeInt(numberOfEntriesInSnapshotData);
+   SnapshotNodeIterator nodeIterator = new 
SnapshotNodeIterator(true);
+   while (nodeIterator.hasNext()) {
+   Tuple2 tuple = nodeIterator.next();
+   writeKeyAndNamespace(tuple.f0, dov);
+   writeValue(tuple.f1, dov);
+   }
+   }
+
+   private void writeStateWithTransform(
+   TypeSerializer stateSerializer,
+   @Nonnull 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330561193
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write support. States will
+ * be serialized to bytes and stored in the space allocated with the given allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S> implements AutoCloseable {
+
+   private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete at one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when taking a snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots with a version no greater than this must have finished, but some
+* snapshots with a greater version may still be running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are 
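
The version fields documented above (`stateMapVersion`, `highestRequiredSnapshotVersion`, and an ordered set of snapshot versions) suggest the usual copy-on-write bookkeeping. The sketch below is only an assumption about how such fields typically interact, not the PR's implementation: the class `SnapshotVersionSketch` and all of its methods are hypothetical, and only the two field names are taken from the quoted code.

```java
import java.util.TreeSet;

// Hypothetical illustration of version bookkeeping for copy-on-write snapshots.
class SnapshotVersionSketch {

    /** Current version of the map; bumped on every snapshot. */
    private int stateMapVersion = 0;

    /** Highest version still needed by an unreleased snapshot (0 if none). */
    private int highestRequiredSnapshotVersion = 0;

    /** Ordered set of versions that belong to unreleased snapshots. */
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();

    int currentVersion() {
        return stateMapVersion;
    }

    /** Starts a snapshot: everything written before now must be preserved for it. */
    int takeSnapshot() {
        stateMapVersion++;
        snapshotVersions.add(stateMapVersion);
        highestRequiredSnapshotVersion = stateMapVersion;
        return stateMapVersion;
    }

    /** Releases a snapshot and recomputes the highest version still required. */
    void releaseSnapshot(int version) {
        snapshotVersions.remove(version);
        highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
    }

    /** An entry older than the highest required version must be copied before mutation. */
    boolean mustCopyBeforeWrite(int entryVersion) {
        return entryVersion < highestRequiredSnapshotVersion;
    }
}
```

Under this scheme an entry last written before a snapshot started is copied on its next write, while entries written after the snapshot, or after all snapshots are released, are updated in place.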

[GitHub] [flink] flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing 
synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#issuecomment-533068819
 
 
   
   ## CI report:
   
   * 9cfda801891969ac460f45ea639d636b519f22db : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128317194)
   * af7f6be848f0bd90a049d5ee9f7a38b1c3e2b972 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128336908)
   * 43157eb165d9409fbdf4a2f773ef7d52dd74e759 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128365390)
   * ff400d149ed36c63a90d2f3fcd517bde738e5af1 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128402339)
   * 02dbf61e50e6c913610df7f586d7eb0f20529c13 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128406923)
   * 68cf941c7e165a5c6b2a27f7ac716b29f76d1918 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128472867)
   * 54aa0043ec73b03580eda2e0310b44bdee352a55 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128495989)
   * e081ea5c536d6ebb828eb8b68c404379d23f5aef : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128502020)
   * 911bdd80f4b1cbb6cee2ffb517cafc5e3c83ee0b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128622324)
   * 40fd94b291968062eec85a9436a3a755914d282a : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128904653)
   * e60ea55f913143611d9162fd79a75cfa575aa9f0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128907420)
   * c2c7b2cc91961bc2c8707f9077c4d66ea6535b8e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129102072)
   * 1048cb10486f94983a52582953379afa02a3b350 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129122517)
   * 773eb07af5ac848ec70cd32663ebf657b0419f3e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129823477)
   * 7c4191061dbacaf142f2b3e6fa8dec30d7e16595 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/130030060)
   * d8cee5bd77379a05ba6a501444f7285e73a4be43 : UNKNOWN
   




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330560285
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@

[GitHub] [flink] flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell 
variable should be escaped
URL: https://github.com/apache/flink/pull/9834#issuecomment-537478603
 
 
   
   ## CI report:
   
   * 65dff9edaba9515d520fdefa5f4b9a7361d3e92f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130026021)
   




[GitHub] [flink] flinkbot edited a comment on issue #9827: [FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9827: [FLINK-14303][metrics] Replace 
JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager
URL: https://github.com/apache/flink/pull/9827#issuecomment-536948437
 
 
   
   ## CI report:
   
   * fa367aaeb2089366de1d9497f1bbec9b11d75e72 : UNKNOWN
   * f865d19c9a7c1530d06d194652582ebe7257176a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129826911)
   * 53201b462b67f199c628367918e19c475204765d : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9828: [FLINK-14305] Transfer ownership of JobManagerMetricGroup to Dispatcher

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9828: [FLINK-14305] Transfer ownership of 
JobManagerMetricGroup to Dispatcher
URL: https://github.com/apache/flink/pull/9828#issuecomment-536958155
 
 
   
   ## CI report:
   
   * 582fcdf7a275b9416d2e5f947352bf1310ff5d88 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129830601)
   * 12bfe6e05cc2cf1e92d4f99ac9d41964026388ee : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129854671)
   * 7529618b93035b1238efb7795813a2f550cda04a : UNKNOWN
   




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330557424
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@

[GitHub] [flink] flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce 
ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#issuecomment-536938289
 
 
   
   ## CI report:
   
   * 53c41b006ae2155206a7843b67721b598b3591d1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129823434)
   * 6039a68165fc94b780c8129bd01844ae7028e97a : UNKNOWN
   




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r33054
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330554852
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330554289
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S> 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330552051
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S> 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are 

[GitHub] [flink] 1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit 
timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330551710
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test to verify that timer triggers are run according to operator precedence 
(combined with yield() at operator level).
+ */
+public class StreamTaskOperatorTimerTest extends TestLogger {
+   private static List events;
 
 Review comment:
   I don't like using static (mutable) variables myself (including in 
tests), but I don't see another way to get "feedback" from the test operators 
here.
   I'm open to suggestions on how to avoid it here.
   
   The mailbox is in a different situation, as it's provided by normal 
(production) code in the runtime.
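
A minimal sketch of the pattern discussed in this comment, assuming the test cannot reach the operator instances that the runtime creates. `RecordingOperator`, `EVENTS`, and `RecordingOperatorDemo` are hypothetical names for illustration and are not part of Flink or of this PR. The static list is the only channel back to the test, so each scenario clears it before running to keep state from leaking between tests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a test operator; not a Flink class.
final class RecordingOperator {
    // Static and mutable on purpose: the test has no handle on the operator
    // instance, so events are reported through this shared, synchronized list.
    static final List<String> EVENTS = Collections.synchronizedList(new ArrayList<>());

    private final String name;

    RecordingOperator(String name) {
        this.name = name;
    }

    // Called when a (simulated) timer fires; records which operator saw it.
    void onTimer(long timestamp) {
        EVENTS.add(name + ":timer@" + timestamp);
    }
}

final class RecordingOperatorDemo {
    // Runs one scenario and returns a snapshot of the recorded events.
    static List<String> runScenario() {
        RecordingOperator.EVENTS.clear(); // reset shared state before each run
        new RecordingOperator("op-1").onTimer(10L);
        new RecordingOperator("op-2").onTimer(20L);
        return new ArrayList<>(RecordingOperator.EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

Clearing the list at the start of every scenario (for example in a JUnit `@Before` method) limits the usual downside of static state, although tests that run in parallel in the same JVM would still interfere with each other.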


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330551180
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S> 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330550525
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S> 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are 

[GitHub] [flink] 1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit 
timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330549295
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 ##
 @@ -250,37 +250,36 @@ int getNumTasksScheduled() {
// 

 
/**
-* A context to which {@link ProcessingTimeCallback} would be passed to 
be invoked when a timer is up.
+* An exception handler, called when {@link ProcessingTimeCallback} 
throws an exception.
 */
-   interface ScheduledCallbackExecutionContext {
-
-   void invoke(ProcessingTimeCallback callback, long timestamp);
+   interface ExceptionHandler {
 
 Review comment:
   I would not use `org.apache.flink.runtime.operators.sort.ExceptionHandler` 
or `AsyncExceptionHandler` here, as each of them serves its own use case 
(and was probably introduced separately to keep those concerns apart). Reusing 
them here would be false code reuse, imo.
   
   I see value in having a generic interface defined somewhere in 
`org.apache.flink.util` (in `flink-core`?).
   But even for a generic interface, the question is what the API should be 
(currently, the three interfaces mentioned differ in small details).
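
To make the open API question concrete, here is one possible shape for such a generic interface. This is a sketch under stated assumptions: `GenericExceptionHandler`, `TimerCallbackRunner`, and its nested `Callback` are hypothetical names that exist neither in Flink nor in this PR, and `Callback` only stands in for a callback such as `ProcessingTimeCallback`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic handler; name and signature are illustrative only.
@FunctionalInterface
interface GenericExceptionHandler<E extends Throwable> {
    void handleException(E exception);
}

// Hypothetical runner that invokes a timer callback and forwards failures.
final class TimerCallbackRunner {
    // Stand-in for a timer callback; assumed for this sketch.
    interface Callback {
        void onProcessingTime(long timestamp) throws Exception;
    }

    private final GenericExceptionHandler<Exception> handler;

    TimerCallbackRunner(GenericExceptionHandler<Exception> handler) {
        this.handler = handler;
    }

    // Runs the callback; any exception it throws goes to the handler
    // instead of propagating to the caller.
    void run(Callback callback, long timestamp) {
        try {
            callback.onProcessingTime(timestamp);
        } catch (Exception e) {
            handler.handleException(e);
        }
    }
}

final class GenericExceptionHandlerDemo {
    // Returns the messages of all exceptions that reached the handler.
    static List<String> runScenario() {
        List<String> seen = new ArrayList<>();
        TimerCallbackRunner runner = new TimerCallbackRunner(e -> seen.add(e.getMessage()));
        runner.run(ts -> { }, 1L); // succeeds, nothing is reported
        runner.run(ts -> { throw new IllegalStateException("boom@" + ts); }, 2L);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

The type parameter is one way to absorb the small differences between the existing interfaces (for instance, whether a handler accepts any `Throwable` or only `Exception`). Whether a message argument or a checked `throws` clause belongs in the signature is the kind of detail the comment leaves open.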


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330548923
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S> 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are 

[GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r330547315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +252,138 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture<?> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
 Review comment:
   Fixed in hotfix.
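
The Javadoc in the diff above describes an insert that, between two attempts, yields to the mailbox so that pending results can be processed. A minimal single-threaded sketch of that pattern follows. All names here (`YieldingInsertSketch`, `completeOldestLater`, the `String` elements) are hypothetical and not taken from the PR, and unlike a real mailbox executor the sketch throws instead of blocking when no mail is pending.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical, single-threaded illustration of "try to insert, otherwise yield to the mailbox". */
final class YieldingInsertSketch {
    private final Queue<String> elements = new ArrayDeque<>();
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final int capacity;

    YieldingInsertSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns the stored entry, or an empty Optional if the bounded queue is full. */
    Optional<String> tryPut(String element) {
        if (elements.size() >= capacity) {
            return Optional.empty();
        }
        elements.add(element);
        return Optional.of(element);
    }

    /** Enqueues a mail that, once processed, removes the oldest element and so frees capacity. */
    void completeOldestLater() {
        mailbox.add(elements::poll);
    }

    /** Retries the insert, processing one pending mail between two attempts. */
    String addToWorkQueue(String element) {
        Optional<String> entry;
        while (!(entry = tryPut(element)).isPresent()) {
            Runnable mail = mailbox.poll();
            if (mail == null) {
                // Assumption of this sketch: a real mailbox would block here until mail arrives.
                throw new IllegalStateException("Queue is full and no mail is pending.");
            }
            mail.run();
        }
        return entry.get();
    }

    int size() {
        return elements.size();
    }
}
```

With a capacity of 1, a second insert only succeeds after the pending mail has removed the first element, which is the behaviour the Javadoc describes for a full queue.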




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330546245
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Tests for {@link ByteBufferUtils}.
+ */
+public class ByteBufferUtilsTest extends TestLogger {
+
+   @Test
+   public void testDirectBBWriteAndRead() {
+   testWithDifferentOffset(true);
+   }
+
+   @Test
+   public void testHeapBBWriteAndRead() {
+   testWithDifferentOffset(false);
+   }
+
+   @Test
+   public void testCompareDirectBBToArray() {
+   testCompareTo(true, false, false);
+   }
+
+   @Test
+   public void testCompareDirectBBToDirectBB() {
+   testCompareTo(true, true, true);
+   }
+
+   @Test
+   public void testCompareDirectBBToHeapBB() {
+   testCompareTo(true, true, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToArray() {
+   testCompareTo(false, false, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToDirectBB() {
+   testCompareTo(false, true, true);
+   }
+
+   @Test
+   public void testCompareHeapBBToHeapBB() {
+   testCompareTo(false, true, false);
+   }
+
+   private void testCompareTo(boolean isLeftBBDirect, boolean 
isRightBuffer, boolean isRightDirect) {
+   testEquals(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   }
+
+   private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, 
boolean isRightDirect) {
+   byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'};
+   ByteBuffer left = isLeftBBDirect
+   ? 
ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes)
+   : ByteBuffer.wrap(leftBufferBytes);
+   ByteBuffer right = null;
+   if (isRightBuffer) {
+   right = isRightDirect
+   ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes)
+   : ByteBuffer.wrap(rightBufferBytes);
+   }
+   if (right != null) {
+   Assert.assertThat(
+   ByteBufferUtils.compareTo(left, 1, 4, right, 0, 
4),
 
 Review comment:
   I think we should simply have a dedicated test for this instead of 
complicating another test.




[GitHub] [flink] flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing 
synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#issuecomment-533068819
 
 
   
   ## CI report:
   
   * 9cfda801891969ac460f45ea639d636b519f22db : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128317194)
   * af7f6be848f0bd90a049d5ee9f7a38b1c3e2b972 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128336908)
   * 43157eb165d9409fbdf4a2f773ef7d52dd74e759 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128365390)
   * ff400d149ed36c63a90d2f3fcd517bde738e5af1 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128402339)
   * 02dbf61e50e6c913610df7f586d7eb0f20529c13 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128406923)
   * 68cf941c7e165a5c6b2a27f7ac716b29f76d1918 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128472867)
   * 54aa0043ec73b03580eda2e0310b44bdee352a55 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128495989)
   * e081ea5c536d6ebb828eb8b68c404379d23f5aef : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128502020)
   * 911bdd80f4b1cbb6cee2ffb517cafc5e3c83ee0b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128622324)
   * 40fd94b291968062eec85a9436a3a755914d282a : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128904653)
   * e60ea55f913143611d9162fd79a75cfa575aa9f0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128907420)
   * c2c7b2cc91961bc2c8707f9077c4d66ea6535b8e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129102072)
   * 1048cb10486f94983a52582953379afa02a3b350 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129122517)
   * 773eb07af5ac848ec70cd32663ebf657b0419f3e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129823477)
   * 7c4191061dbacaf142f2b3e6fa8dec30d7e16595 : UNKNOWN
   




[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r330546245
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Tests for {@link ByteBufferUtils}.
+ */
+public class ByteBufferUtilsTest extends TestLogger {
+
+   @Test
+   public void testDirectBBWriteAndRead() {
+   testWithDifferentOffset(true);
+   }
+
+   @Test
+   public void testHeapBBWriteAndRead() {
+   testWithDifferentOffset(false);
+   }
+
+   @Test
+   public void testCompareDirectBBToArray() {
+   testCompareTo(true, false, false);
+   }
+
+   @Test
+   public void testCompareDirectBBToDirectBB() {
+   testCompareTo(true, true, true);
+   }
+
+   @Test
+   public void testCompareDirectBBToHeapBB() {
+   testCompareTo(true, true, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToArray() {
+   testCompareTo(false, false, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToDirectBB() {
+   testCompareTo(false, true, true);
+   }
+
+   @Test
+   public void testCompareHeapBBToHeapBB() {
+   testCompareTo(false, true, false);
+   }
+
+   private void testCompareTo(boolean isLeftBBDirect, boolean 
isRightBuffer, boolean isRightDirect) {
+   testEquals(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   }
+
+   private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, 
boolean isRightDirect) {
+   byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'};
+   ByteBuffer left = isLeftBBDirect
+   ? 
ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes)
+   : ByteBuffer.wrap(leftBufferBytes);
+   ByteBuffer right = null;
+   if (isRightBuffer) {
+   right = isRightDirect
+   ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes)
+   : ByteBuffer.wrap(rightBufferBytes);
+   }
+   if (right != null) {
+   Assert.assertThat(
+   ByteBufferUtils.compareTo(left, 1, 4, right, 0, 
4),
 
 Review comment:
   I think we should simply have a dedicated test for this.




[GitHub] [flink] TisonKun commented on issue #9682: [FLINK-14041][tests] Refactor LeaderRetrievalServiceHostnameResolutionTest and remove StandaloneUtils

2019-10-02 Thread GitBox
TisonKun commented on issue #9682: [FLINK-14041][tests] Refactor 
LeaderRetrievalServiceHostnameResolutionTest and remove StandaloneUtils
URL: https://github.com/apache/flink/pull/9682#issuecomment-537489792
 
 
   Thanks for your review, @zentol! I can see your concern here.
   
   Actually, the original tests exercise `AkkaRpcServicesUtils#getRpcUrl`, and
since `StandaloneUtils` is never used outside these tests, it seems we test
nothing about the abstract leader retrieval service. In fact, the retrieval of
the WebMonitor address is a bit different from `AkkaRpcServicesUtils#getRpcUrl`
(it goes through `HighAvailabilityServicesUtils#getWebMonitorAddress`). I think
we can even guard this with `AkkaUtilsTest`, maybe add a dedicated test for
`HighAvailabilityServicesUtils#getWebMonitorAddress`, and delete
`LeaderRetrievalServiceHostnameResolutionTest`, because it doesn't test
anything at that abstraction level.




[GitHub] [flink] GJL commented on issue #8760: [FLINK-12869] Add yarn acls capability to flink containers

2019-10-02 Thread GitBox
GJL commented on issue #8760: [FLINK-12869] Add yarn acls capability to flink 
containers
URL: https://github.com/apache/flink/pull/8760#issuecomment-537489889
 
 
   Thanks for updating the PR. I'll have another look




[GitHub] [flink] 1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit 
timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330543800
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerService.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ *
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ *     <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
+ *     <li>After calling {@link #quiesce()}, further calls to
+ *         {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further timers, and will
+ *         return a "dummy" future as a result. This is used for clean shutdown, where currently firing
+ *         timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, ProcessingTimeCallback)}
+ *         will result in a hard exception.</li>
+ * </ol>
+ */
+@Internal
+public interface TimerService extends ProcessingTimeService {
 
 Review comment:
   Although the commit message says that the lifecycle methods were extracted 
into a separate interface, my main motivation was to separate the 
"application-level API" (`ProcessingTimeService`, which is exposed to 
operators) from the system timer provider (the implementation of 
`TimerService`).
   It just happens that, currently in this PR, `TimerService` piggybacks on the 
`ProcessingTimeService` interface (and `ProcessingTimeCallback`) to define the 
system timer API.
   
   I can introduce a new, separate timer API for `TimerService` to separate 
them explicitly. It should not require big changes in the runtime code; only 
some tests would need to become more explicit about what they are testing 
against. I'd like to hear from others whether you are OK with this direction.
   
   Regarding the naming, `ProcessingTimeService` is simply the preserved name 
that operators see, and `TimerService` is a common name in the Flink code base 
for classes with similar functionality.
   
   Introducing a common "life-cycle" interface may be a premature 
generalization on my side.
   Also, unifying/consolidating the `TimerService` classes may be a separate 
effort of its own.
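
To make the three-phase life cycle from the Javadoc above concrete, here is a minimal, self-contained sketch. `SketchTimerService` and its `Phase` enum are hypothetical names, the sketch takes a plain `Runnable` instead of `ProcessingTimeCallback`, and it is not the implementation in this PR; it only illustrates the contract of registering, quiescing, and shutting down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a system timer provider with a three-phase life cycle. */
final class SketchTimerService {
    private enum Phase { RUNNING, QUIESCED, SHUTDOWN }

    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private volatile Phase phase = Phase.RUNNING;

    /**
     * Registers a callback to run after the given delay. The outcome depends on the phase.
     * Note: the phase check and the scheduling are not atomic; this sketch ignores that race.
     */
    Future<?> registerTimer(long delayMillis, Runnable callback) {
        switch (phase) {
            case RUNNING:
                // Phase 1: accept the registration and trigger when the time is reached.
                return executor.schedule(callback, delayMillis, TimeUnit.MILLISECONDS);
            case QUIESCED:
                // Phase 2: ignore the registration and hand back an already completed "dummy" future.
                return CompletableFuture.completedFuture(null);
            default:
                // Phase 3: any further registration is a hard error.
                throw new IllegalStateException("Timer service is shut down.");
        }
    }

    /** Stops accepting new timers without failing callers. */
    void quiesce() {
        if (phase == Phase.RUNNING) {
            phase = Phase.QUIESCED;
        }
    }

    /** Shuts the service down; pending timers are discarded. */
    void shutdownService() {
        phase = Phase.SHUTDOWN;
        executor.shutdownNow();
    }
}
```

In this sketch an operator-facing API would only need `registerTimer`, while `quiesce` and `shutdownService` stay on the system side, which mirrors the separation discussed above.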




[GitHub] [flink] flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell 
variable should be escaped
URL: https://github.com/apache/flink/pull/9834#issuecomment-537478603
 
 
   
   ## CI report:
   
   * 65dff9edaba9515d520fdefa5f4b9a7361d3e92f : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/130026021)
   




[GitHub] [flink] tillrohrmann commented on issue #9827: [FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager

2019-10-02 Thread GitBox
tillrohrmann commented on issue #9827: [FLINK-14303][metrics] Replace 
JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager
URL: https://github.com/apache/flink/pull/9827#issuecomment-537488117
 
 
   Thanks for the review @zentol. I've rebased and force pushed to let CI run 
another time.




[GitHub] [flink] 1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-10-02 Thread GitBox
1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r330540787
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +245,136 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
-
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture<?> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional<ResultFuture<?>> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   return queueEntry.get();
+   }
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   if (exception != null) {
-   throw exception;
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and 
the executor to run
-* the queue's complete operation.
+* Outputs one completed element. Watermarks are always completed if 
it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-   emitter.stop();
-   emitterThread.interrupt();
-
-   executor.shutdown();
-
-   if (waitForShutdown) {
-   try 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9833: 
[FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11
URL: https://github.com/apache/flink/pull/9833#discussion_r330541171
 
 

 ##
 File path: 
flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
 ##
 @@ -171,6 +171,11 @@ under the License.



+   
+   
+   -nobootcp
 
 Review comment:
   I guess it is not so easy to only activate this flag if Java 11 is used, 
right?




[jira] [Commented] (FLINK-14225) Travis is unable to parse one of the secure environment variables

2019-10-02 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942797#comment-16942797
 ] 

Gary Yao commented on FLINK-14225:
--

[~StephanEwen] Can you help with this?

> Travis is unable to parse one of the secure environment variables
> -
>
> Key: FLINK-14225
> URL: https://issues.apache.org/jira/browse/FLINK-14225
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Priority: Blocker
>
> Example: https://travis-ci.org/apache/flink/jobs/589531009
> {noformat}
> We were unable to parse one of your secure environment variables.
> Please make sure to escape special characters such as ' ' (white space) and $ 
> (dollar symbol) with \ (backslash) .
> For example, thi$isanexample would be typed as thi\$isanexample. See 
> https://docs.travis-ci.com/user/encryption-keys.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
tillrohrmann commented on issue #9825: [FLINK-14299] Introduce 
ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#issuecomment-537485842
 
 
   Thanks for the review @zentol. I've addressed your comments and pushed a 
fixup.




[GitHub] [flink] tillrohrmann commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9825: [FLINK-14299] 
Introduce ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#discussion_r330539646
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 ##
 @@ -100,17 +98,22 @@ public static JobManagerMetricGroup 
instantiateJobManagerMetricGroup(
hostName,
resourceID.toString());
 
-   MetricGroup statusGroup = 
taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
-
-   // Initialize the TM metrics
-   instantiateStatusMetrics(statusGroup);
+   MetricGroup statusGroup = 
createAndInitializeStatusMetricGroup(taskManagerMetricGroup);
 
if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(taskManagerMetricGroup, 
systemResourceProbeInterval.get());
}
return Tuple2.of(taskManagerMetricGroup, statusGroup);
}
 
+   private static MetricGroup 
createAndInitializeStatusMetricGroup(AbstractMetricGroup parentMetricGroup) {
+   MetricGroup statusGroup = 
parentMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
+   // Initialize the TM metrics
 
 Review comment:
   true, I will remove it.




[GitHub] [flink] tillrohrmann commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9825: [FLINK-14299] 
Introduce ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#discussion_r330539760
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 ##
 @@ -62,7 +64,7 @@
  */
 public class MetricUtils {
private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
-   private static final String METRIC_GROUP_STATUS_NAME = "Status";
+   public static final String METRIC_GROUP_STATUS_NAME = "Status";
 
 Review comment:
   This is wrong. Will correct it.




[GitHub] [flink] GJL commented on issue #9794: [FLINK-14247][runtime] Use NoResourceAvailableException to wrap TimeoutException on slot allocation timeout

2019-10-02 Thread GitBox
GJL commented on issue #9794: [FLINK-14247][runtime] Use 
NoResourceAvailableException to wrap TimeoutException on slot allocation timeout
URL: https://github.com/apache/flink/pull/9794#issuecomment-537485003
 
 
   Merging as soon as build is green.




[GitHub] [flink] tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement 
memory calculation logics
URL: https://github.com/apache/flink/pull/9760#issuecomment-537483004
 
 
   One way to solve the problem could be
   
   ```
   final int numVariables = 8;
   final double[] c = new double[numVariables];
   Arrays.fill(c, 1.0);
   
   LinearObjectiveFunction f = new LinearObjectiveFunction(c, 0);
   Collection<LinearConstraint> constraints = new ArrayList<>();
   
   // JVM_overhead, jvm_metaspace, framework, task_heap, task_off_heap, shuffle, managed_memory, total process memory
   final double[] overallMemory = {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0};
   final double[] jvmOverhead = {1.0, 0, 0, 0, 0, 0, 0, 0};
   final double[] jvmMetaspace = {0, 1.0, 0, 0, 0, 0, 0, 0};
   final double[] framework = {0, 0, 1.0, 0, 0, 0, 0, 0};
   final double[] taskOffHeap = {0, 0, 0, 0, 1.0, 0, 0, 0};
   final double minOverhead = 10;
   final double maxOverhead = 100;
   final double[] shuffle = {0, 0, 0, 0, 0, 1.0, 0, 0};
   final double minShuffle = 10;
   final double maxShuffle = 100;
   final double overheadFraction = 0.1;
   final double[] jvmOverheadRelationship = {-1.0, overheadFraction, overheadFraction, overheadFraction, overheadFraction, overheadFraction, overheadFraction, 0};
   final double shuffleFraction = 0.2;
   final double[] shuffleRelationship = {0, 0, shuffleFraction, shuffleFraction, shuffleFraction, -1, overheadFraction, 0};
   final double jvmMetaspaceValue = 192;
   final double frameworkValue = 192;
   final double taskOffHeapValue = 0;
   constraints.add(new LinearConstraint(overallMemory, Relationship.EQ, 0));
   constraints.add(new LinearConstraint(jvmOverhead, Relationship.GEQ, minOverhead));
   constraints.add(new LinearConstraint(jvmOverhead, Relationship.LEQ, maxOverhead));
   constraints.add(new LinearConstraint(shuffle, Relationship.GEQ, minShuffle));
   constraints.add(new LinearConstraint(shuffle, Relationship.LEQ, maxShuffle));
   constraints.add(new LinearConstraint(jvmOverheadRelationship, Relationship.EQ, 0));
   constraints.add(new LinearConstraint(shuffleRelationship, Relationship.EQ, 0));
   constraints.add(new LinearConstraint(jvmMetaspace, Relationship.EQ, jvmMetaspaceValue));
   constraints.add(new LinearConstraint(framework, Relationship.EQ, frameworkValue));
   constraints.add(new LinearConstraint(taskOffHeap, Relationship.EQ, taskOffHeapValue));
   
   SimplexSolver solver = new SimplexSolver();
   PointValuePair optSolution = solver.optimize(
       new MaxIter(100),
       f,
       new LinearConstraintSet(constraints),
       GoalType.MAXIMIZE,
       new NonNegativeConstraint(true));
   ```




[GitHub] [flink] tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory 
calculation logics
URL: https://github.com/apache/flink/pull/9760#issuecomment-537483004
 
 
   One way to solve the problem could be
   
   ```
   final int numVariables = 8;
   final double[] c = new double[numVariables];
   Arrays.fill(c, 1.0);
   
   LinearObjectiveFunction f = new LinearObjectiveFunction(c, 0);
   Collection<LinearConstraint> constraints = new ArrayList<>();
   
   // Variable order: JVM_overhead, jvm_metaspace, framework, task_heap, task_off_heap,
   // shuffle, managed_memory, total process memory
   final double[] overallMemory = {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0};
   final double[] jvmOverhead = {1.0, 0, 0, 0, 0, 0, 0, 0};
   final double[] jvmMetaspace = {0, 1.0, 0, 0, 0, 0, 0, 0};
   final double[] framework = {0, 0, 1.0, 0, 0, 0, 0, 0};
   final double[] taskOffHeap = {0, 0, 0, 0, 1.0, 0, 0, 0};
   final double minOverhead = 10;
   final double maxOverhead = 100;
   final double[] shuffle = {0, 0, 0, 0, 0, 1.0, 0, 0};
   final double minShuffle = 10;
   final double maxShuffle = 100;
   final double overheadFraction = 0.1;
   final double[] jvmOverheadRelationship = {
       -1.0, overheadFraction, overheadFraction, overheadFraction,
       overheadFraction, overheadFraction, overheadFraction, 0};
   final double shuffleFraction = 0.2;
   final double[] shuffleRelationship = {
       0, 0, shuffleFraction, shuffleFraction, shuffleFraction, -1, overheadFraction, 0};
   final double jvmMetaspaceValue = 192;
   final double frameworkValue = 192;
   final double taskOffHeapValue = 0;
   constraints.add(new LinearConstraint(overallMemory, Relationship.EQ, 0));
   constraints.add(new LinearConstraint(jvmOverhead, Relationship.GEQ, minOverhead));
   constraints.add(new LinearConstraint(jvmOverhead, Relationship.LEQ, maxOverhead));
   constraints.add(new LinearConstraint(shuffle, Relationship.GEQ, minShuffle));
   constraints.add(new LinearConstraint(shuffle, Relationship.LEQ, maxShuffle));
   constraints.add(new LinearConstraint(jvmOverheadRelationship, Relationship.EQ, 0));
   constraints.add(new LinearConstraint(shuffleRelationship, Relationship.EQ, 0));
   constraints.add(new LinearConstraint(jvmMetaspace, Relationship.EQ, jvmMetaspaceValue));
   constraints.add(new LinearConstraint(framework, Relationship.EQ, frameworkValue));
   constraints.add(new LinearConstraint(taskOffHeap, Relationship.EQ, taskOffHeapValue));
   
   SimplexSolver solver = new SimplexSolver();
   PointValuePair optSolution = solver.optimize(
       new MaxIter(100),
       f,
       new LinearConstraintSet(constraints),
       GoalType.MAXIMIZE,
       new NonNegativeConstraint(true));
   ```




[GitHub] [flink] flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable 
should be escaped
URL: https://github.com/apache/flink/pull/9834#issuecomment-537478603
 
 
   
   ## CI report:
   
   * 65dff9edaba9515d520fdefa5f4b9a7361d3e92f : UNKNOWN
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] 
Let LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#discussion_r330530781
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -78,19 +81,22 @@ public void startScheduling() {
deploymentOptions.put(schedulingVertex.getId(), option);
}
 
-   allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology());
+   allocateSlotsAndDeployExecutionVertices(
+   getSchedulingExecutionVertices(getAllVerticesFromTopology()),
+   IS_IN_CREATED_STATE);
}
 
@Override
-   public void restartTasks(Set verticesToRestart) {
+   public void restartTasks(final Set verticesToRestart) {
+   final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart);
+
// increase counter of the dataset first
-   verticesToRestart
+   verticesToSchedule
.stream()
-   .map(schedulingTopology::getVertexOrThrow)
.flatMap(vertex -> vertex.getProducedResultPartitions().stream())
.forEach(inputConstraintChecker::resetSchedulingResultPartition);
 
-   allocateSlotsAndDeployExecutionVertexIds(verticesToRestart);
+   allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE);
 
 Review comment:
   I also think it's fine not to **restart** `CREATED` vertices. Some 
later event should be responsible for triggering their scheduling. 
Otherwise something might be wrong.
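
To make the idea of a state-guarded deployment concrete, here is a minimal sketch in plain Java. It is not Flink's implementation: the `Vertex` and `ExecutionState` types and the `selectForDeployment` helper are hypothetical simplifications, and only the predicate names mirror the constants in the diff above. The assumption is that a caller passes the state it expects, and vertices in any other state are silently skipped.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class StateGuardedSchedulingSketch {

    // Hypothetical, reduced set of execution states.
    enum ExecutionState {
        CREATED, RUNNING, FINISHED, CANCELED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    // Hypothetical stand-in for a scheduling vertex.
    static final class Vertex {
        final String id;
        final ExecutionState state;

        Vertex(String id, ExecutionState state) {
            this.id = id;
            this.state = state;
        }
    }

    static final Predicate<Vertex> IS_IN_CREATED_STATE =
        v -> v.state == ExecutionState.CREATED;
    static final Predicate<Vertex> IS_IN_TERMINAL_STATE =
        v -> v.state.isTerminal();

    // Returns the ids of the candidates that are in the expected state;
    // all other candidates are left for a later event to schedule.
    static List<String> selectForDeployment(
            Collection<Vertex> candidates, Predicate<Vertex> expectedState) {
        return candidates.stream()
            .filter(expectedState)
            .map(v -> v.id)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Vertex> vertices = Arrays.asList(
            new Vertex("a", ExecutionState.CREATED),
            new Vertex("b", ExecutionState.FAILED),
            new Vertex("c", ExecutionState.RUNNING));
        // On restart, only terminated vertices are picked up.
        System.out.println(selectForDeployment(vertices, IS_IN_TERMINAL_STATE));
    }
}
```

With this shape, a restart call that receives a `CREATED` vertex simply ignores it, which matches the behaviour argued for in the comment.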




[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] 
Let LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#discussion_r330529529
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -78,19 +81,22 @@ public void startScheduling() {
deploymentOptions.put(schedulingVertex.getId(), option);
}
 
-   allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology());
+   allocateSlotsAndDeployExecutionVertices(
+   getSchedulingExecutionVertices(getAllVerticesFromTopology()),
+   IS_IN_CREATED_STATE);
}
 
@Override
-   public void restartTasks(Set verticesToRestart) {
+   public void restartTasks(final Set verticesToRestart) {
+   final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart);
+
// increase counter of the dataset first
-   verticesToRestart
+   verticesToSchedule
.stream()
-   .map(schedulingTopology::getVertexOrThrow)
.flatMap(vertex -> vertex.getProducedResultPartitions().stream())
.forEach(inputConstraintChecker::resetSchedulingResultPartition);
 
-   allocateSlotsAndDeployExecutionVertexIds(verticesToRestart);
+   allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE);
 
 Review comment:
   A vertex is transitioned to `CREATED` only when it is reset. This only 
happens in `allocateSlotsAndDeploy` in `DefaultScheduler`.
   The vertex versions will change in this case. 
   And these vertices will be filtered out to restart.




[GitHub] [flink] tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement 
memory calculation logics
URL: https://github.com/apache/flink/pull/9760#issuecomment-537474186
 
 
   Yes, all explicitly configured values should be respected.
   
   I agree, the default values of
   
   * Framework Heap Memory
   * Task Off-Heap Memory
   * Min/Max Shuffle Memory
   * JVM Metaspace
   * Min/Max JVM Overhead
   
   should be respected.
   
   In the end, we are trying to solve a linear programming problem, for which 
well-known solution methods exist. The problem should have the following 
form:
   
   ```
   0 = jvm_overhead + jvm_metaspace + framework + task_heap + task_off_heap + 
shuffle + managed_memory - process_memory
   min_overhead <= jvm_overhead
   max_overhead >= jvm_overhead
   min_shuffle <= shuffle
   max_shuffle >= shuffle
   0 = fraction_overhead * (jvm_overhead + jvm_metaspace + framework + 
task_heap + task_off_heap + shuffle + managed_memory) - jvm_overhead
   ```
   
   If `shuffle` has not been set, then we have additionally `0 = 
fraction_shuffle * (framework + task_heap + task_off_heap + shuffle + 
managed_memory) - shuffle` 
   
   There might be some shortcuts given the particulars of this problem, but I'm 
not sure whether it is exactly as you've described. I would suggest relying on 
known approaches such as the simplex algorithm (but please do not reimplement 
it).
   
   Concerning the backwards compatibility plan, I agree that considering the 
legacy `fraction` turns the linear problem into a non-linear one. Hence, I 
would be ok with dropping support for it. I guess we should add a prominent 
release note and log a warning when the legacy `fraction` option is used.
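
As an illustration of the kind of shortcut mentioned above, here is a minimal sketch in plain Java. It is an assumption-laden simplification, not the approach proposed in this PR: it derives `jvm_overhead` and `shuffle` directly from the fractions in the formulas above and clamps them into their min/max ranges instead of enforcing the equality constraints. The class name, method name, and parameter order are made up for this example.

```java
public class MemoryBudgetSketch {

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    /**
     * Returns {jvmOverhead, shuffle, remainder}, where remainder is what is
     * left to split between task heap and managed memory.
     * Assumption: fractions act as targets that are clamped to [min, max].
     */
    static double[] derive(
            double processMemory, double jvmMetaspace, double framework, double taskOffHeap,
            double overheadFraction, double minOverhead, double maxOverhead,
            double shuffleFraction, double minShuffle, double maxShuffle) {
        // jvm_overhead = fraction_overhead * (sum of all components) = fraction * process memory
        double jvmOverhead = clamp(overheadFraction * processMemory, minOverhead, maxOverhead);
        // framework + task_heap + task_off_heap + shuffle + managed_memory
        double flinkMemory = processMemory - jvmOverhead - jvmMetaspace;
        double shuffle = clamp(shuffleFraction * flinkMemory, minShuffle, maxShuffle);
        double remainder = flinkMemory - framework - taskOffHeap - shuffle;
        if (remainder < 0) {
            throw new IllegalArgumentException("Configured components exceed the process memory");
        }
        return new double[] {jvmOverhead, shuffle, remainder};
    }

    public static void main(String[] args) {
        // Same illustrative values as in the simplex snippet, with 1000 as process memory.
        double[] result = derive(1000, 192, 192, 0, 0.1, 10, 100, 0.2, 10, 100);
        System.out.printf("overhead=%.1f shuffle=%.1f remainder=%.1f%n",
            result[0], result[1], result[2]);
    }
}
```

Whether clamping is acceptable when a fraction and a min/max bound conflict is exactly the kind of question a general LP solver would answer differently (it would report the problem as infeasible), so this sketch should be read only as a way to see how the variables relate.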




[jira] [Commented] (FLINK-14271) Deprecate legacy RestartPipelinedRegionStrategy

2019-10-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942781#comment-16942781
 ] 

Zhu Zhu commented on FLINK-14271:
-

Thanks [~trohrmann]! Let's wait for [~stevenz3wu]'s feedback.

> Deprecate legacy RestartPipelinedRegionStrategy
> ---
>
> Key: FLINK-14271
> URL: https://issues.apache.org/jira/browse/FLINK-14271
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Minor
> Fix For: 1.10.0
>
>
> The legacy {{RestartPipelinedRegionStrategy}} has been superseded by 
> {{AdaptedRestartPipelinedRegionStrategyNG}} in Flink 1.9.
> It heavily depends on ExecutionGraph components and becomes a blocker for a 
> clean scheduler re-architecture.
> We should deprecate it so that it can be removed later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory 
calculation logics
URL: https://github.com/apache/flink/pull/9760#issuecomment-537474186
 
 
   Yes, all explicitly configured values should be respected.
   
   I agree, the default values of
   
   * Framework Heap Memory
   * Task Off-Heap Memory
   * Min/Max Shuffle Memory
   * JVM Metaspace
   * Min/Max JVM Overhead
   
   should be respected.
   
   In the end, we are trying to solve a linear programming problem, for which 
well-known solution methods exist. The problem should have the following 
form:
   
   ```
   0 = jvm_overhead + jvm_metaspace + framework + task_heap + task_off_heap + 
shuffle + managed_memory - process_memory
   min_overhead <= jvm_overhead
   max_overhead >= jvm_overhead
   min_shuffle <= shuffle
   max_shuffle >= shuffle
   0 = fraction_overhead * (jvm_overhead + jvm_metaspace + framework + 
task_heap + task_off_heap + shuffle + managed_memory) - jvm_overhead
   ```
   
   If `shuffle` has not been set, then we have additionally `0 = 
fraction_shuffle * (framework + task_heap + task_off_heap + shuffle + 
managed_memory) - shuffle` 
   
   There might be some shortcuts given the particulars of this problem, but I'm 
not sure whether it is exactly as you've described. I would suggest relying on 
known approaches.
   
   Concerning the backwards compatibility plan, I agree that considering the 
legacy `fraction` turns the linear problem into a non-linear one. Hence, I 
would be ok with dropping support for it. I guess we should add a prominent 
release note and log a warning when the legacy `fraction` option is used.




[GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r330522645
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +252,138 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async computation for the given element.
+*/
+   private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional<ResultFuture<OUT>> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, exception);
-   }
+   return queueEntry.get();
+   }
 
-   if (exception != null) {
-   throw exception;
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
+
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] GJL commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
GJL commented on a change in pull request #9791: [FLINK-14248][runtime] Let 
LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#discussion_r330522635
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -78,19 +81,22 @@ public void startScheduling() {
deploymentOptions.put(schedulingVertex.getId(), option);
}
 
-   allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology());
+   allocateSlotsAndDeployExecutionVertices(
+   getSchedulingExecutionVertices(getAllVerticesFromTopology()),
+   IS_IN_CREATED_STATE);
}
 
@Override
-   public void restartTasks(Set verticesToRestart) {
+   public void restartTasks(final Set verticesToRestart) {
+   final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart);
+
// increase counter of the dataset first
-   verticesToRestart
+   verticesToSchedule
.stream()
-   .map(schedulingTopology::getVertexOrThrow)
.flatMap(vertex -> vertex.getProducedResultPartitions().stream())
.forEach(inputConstraintChecker::resetSchedulingResultPartition);
 
-   allocateSlotsAndDeployExecutionVertexIds(verticesToRestart);
+   allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE);
 
 Review comment:
   Is there anything that speaks against transitioning the _"vertices to 
restart"_ to `CREATED` already in the scheduler (before invoking 
`SchedulingStrategy#restartTasks()`)? 




[GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r330521780
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +252,138 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async computation for the given element.
+*/
+   private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional<ResultFuture<OUT>> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, exception);
-   }
+   return queueEntry.get();
+   }
 
-   if (exception != null) {
-   throw exception;
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
+
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330461900
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
 ##
 @@ -19,7 +19,7 @@
 
 
 Review comment:
   +1 for pulling lifecycle stuff out of user-facing interfaces.




[GitHub] [flink] zentol merged pull request #9740: [hotfix][docs] URL should not point to release-1.8 docs

2019-10-02 Thread GitBox
zentol merged pull request #9740: [hotfix][docs] URL should not point to 
release-1.8 docs
URL: https://github.com/apache/flink/pull/9740
 
 
   




[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330513099
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
 ##
 @@ -75,7 +75,7 @@ public MockStreamTask(
this.closableRegistry = closableRegistry;
this.streamStatusMaintainer = streamStatusMaintainer;
this.checkpointStorage = checkpointStorage;
-   this.processingTimeService = processingTimeService;
+   this.processingTimeService = timerService;
 
 Review comment:
   rename field?




[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330464428
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerService.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ *
+ * The registration of timers follows a life cycle of three phases:
+ * 
+ * In the initial state, it accepts timer registrations and triggers when the time is reached.
+ * After calling {@link #quiesce()}, further calls to
+ * {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further timers, and will
+ * return a "dummy" future as a result. This is used for clean shutdown, where currently firing
+ * timers are waited for and no future timers can be scheduled, without causing hard exceptions.
+ * After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, ProcessingTimeCallback)}
+ * will result in a hard exception.
+ * 
+ */
+@Internal
+public interface TimerService extends ProcessingTimeService {
 
 Review comment:
   The naming is not really clear:
   * What's the difference between TimerService and TimeService?
   * Usually, names get rather longer than shorter in a hierarchy ;)
   
   I guess TimerService is more plausible but you wanted to keep the 
user-facing interface unchanged.
   
   For me, it feels as if these life-cycle methods keep reappearing at 
different points (Mailbox, Timer, ...). How about we make this interface more 
general purpose? Just call it Lifecycle and don't extend ProcessingTimeService. 
Then we can add this internal interface to different classes later. That's 
quite common in logger and network frameworks.
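
A minimal sketch of what such a general-purpose interface could look like, in plain Java. Everything here is hypothetical: the `Lifecycle` name comes from the suggestion above, the method names are borrowed from the Javadoc in the diff, and `SimpleTimerRegistry` is an invented stand-in used only to show the three phases (accepting, quiesced, shut down).

```java
import java.util.concurrent.atomic.AtomicReference;

public class LifecycleSketch {

    // Hypothetical internal interface that carries only the life-cycle methods.
    interface Lifecycle {
        void quiesce();
        void shutdownService();
    }

    enum Phase { RUNNING, QUIESCED, SHUTDOWN }

    // Invented example component; not a Flink class.
    static final class SimpleTimerRegistry implements Lifecycle {
        private final AtomicReference<Phase> phase = new AtomicReference<>(Phase.RUNNING);
        private int registered;

        /** Returns true if the timer was registered, false if it was dropped while quiesced. */
        boolean registerTimer(long timestamp) {
            switch (phase.get()) {
                case RUNNING:
                    registered++;
                    return true;
                case QUIESCED:
                    return false; // dropped without a hard failure, like the "dummy" future
                default:
                    throw new IllegalStateException("Timer service is shut down");
            }
        }

        @Override
        public void quiesce() {
            // Only moves forward from RUNNING; a shut-down service stays shut down.
            phase.compareAndSet(Phase.RUNNING, Phase.QUIESCED);
        }

        @Override
        public void shutdownService() {
            phase.set(Phase.SHUTDOWN);
        }

        int registeredCount() {
            return registered;
        }
    }

    public static void main(String[] args) {
        SimpleTimerRegistry registry = new SimpleTimerRegistry();
        System.out.println(registry.registerTimer(1L));
        registry.quiesce();
        System.out.println(registry.registerTimer(2L));
    }
}
```

Because the interface does not extend `ProcessingTimeService`, other components (a mailbox, for example) could implement it without inheriting timer-related methods.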




[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330516878
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level).
+ */
+public class StreamTaskOperatorTimerTest extends TestLogger {
+   private static List events;
+
+   @Test
+   public void testOperatorYieldExecutesSelectedTimers() throws Exception {
+   events = new ArrayList<>();
+   final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(
+   OneInputStreamTask::new,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setupOperatorChain(new OperatorID(), new TestOperatorFactory<>())
+   .chain(new OperatorID(), new TestOperatorFactory<>(), IntSerializer.INSTANCE)
+   .finish();
+
+   testHarness.invoke();
+   testHarness.waitForTaskRunning();
+
+   testHarness.processElement(new StreamRecord<>(42));
+
+   testHarness.endInput();
+   testHarness.waitForTaskCompletion();
+
+   assertThat(events, is(Arrays.asList("Timer:1:1", "Timer:0:1")));
+   }
+
+   private static class TestOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory {
+   private MailboxExecutor mailboxExecutor;
+
+   @Override
+   public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+   this.mailboxExecutor = mailboxExecutor;
+   }
+
+   @Override
+   public > Operator createStreamOperator(
+   StreamTask containingTask,
+   StreamConfig config,
+   Output> output) {
+   TestOperator operator = new TestOperator<>(config.getChainIndex(), mailboxExecutor);
+   operator.setup(containingTask, config, output);
+   return (Operator) operator;
+   }
+
+   @Override
+   public void setChainingStrategy(ChainingStrategy strategy) {
+   }
+
+   @Override
+   public 

[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330517177
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level).
+ */
+public class StreamTaskOperatorTimerTest extends TestLogger {
+   private static List events;
+
+   @Test
+   public void testOperatorYieldExecutesSelectedTimers() throws Exception {
 
 Review comment:
   I would love to see another test case in which AWOP1.yield executes 
AWOP2.timer.



[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330516661
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@

[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330478001
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@

[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330515584
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ##
 @@ -331,19 +320,10 @@ public CheckpointStateOutputStream 
createCheckpointStateOutputStream(Checkpointe
blockerCheckpointStreamFactory.setWaiterLatch(new 
OneShotLatch());
 
testHarness.invoke(mockEnv);
+   testHarness.waitForTaskRunning();
 
final OneInputStreamTask task = 
testHarness.getTask();
 
-   // wait for the task to be running
 
 Review comment:
   omg. thanks for that hotfix.



[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330476481
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level).
+ */
+public class StreamTaskOperatorTimerTest extends TestLogger {
+   private static List events;
 
 Review comment:
   Please try to avoid adding even more static fields to the tests. Can you 
please inject a local event list in the same way you inject the mailbox?
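
   The suggestion above could look roughly like the following. This is a minimal sketch without any Flink dependencies: `RecordingOperator`, `RecordingOperatorFactory` and `runScenario` are hypothetical stand-ins, not the classes from the PR, and the list is passed through the constructor for brevity (the mailbox in the PR is injected through a setter). Whether an injected list survives the way the test harness handles the factory is not verified here.

```java
import java.util.ArrayList;
import java.util.List;

public class InjectedEventListSketch {

    /** Hypothetical stand-in for the operator under test; it records into an injected list. */
    static final class RecordingOperator {
        private final int chainIndex;
        private final List<String> events;

        RecordingOperator(int chainIndex, List<String> events) {
            this.chainIndex = chainIndex;
            this.events = events;
        }

        /** Records a timer event in the same "Timer:<chainIndex>:<timestamp>" shape the test asserts on. */
        void onTimer(long timestamp) {
            events.add("Timer:" + chainIndex + ":" + timestamp);
        }
    }

    /** Hypothetical stand-in for the operator factory; it hands the shared list to every operator it creates. */
    static final class RecordingOperatorFactory {
        private final List<String> events;

        RecordingOperatorFactory(List<String> events) {
            this.events = events;
        }

        RecordingOperator create(int chainIndex) {
            return new RecordingOperator(chainIndex, events);
        }
    }

    /** Plays the role of the test method: the event list is a local variable, not a static field. */
    static List<String> runScenario() {
        List<String> events = new ArrayList<>();
        RecordingOperatorFactory factory = new RecordingOperatorFactory(events);

        RecordingOperator head = factory.create(0);
        RecordingOperator tail = factory.create(1);

        // Fire the timers in the order the original assertion expects.
        tail.onTimer(1);
        head.onTimer(1);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

   Because the list is created inside the test method, each test run starts from a fresh list and no state leaks between tests.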



[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330475591
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 ##
 @@ -250,37 +250,36 @@ int getNumTasksScheduled() {
// 

 
/**
-* A context to which {@link ProcessingTimeCallback} would be passed to 
be invoked when a timer is up.
+* An exception handler, called when {@link ProcessingTimeCallback} 
throws an exception.
 */
-   interface ScheduledCallbackExecutionContext {
-
-   void invoke(ProcessingTimeCallback callback, long timestamp);
+   interface ExceptionHandler {
 
 Review comment:
   We already have one in 
`org.apache.flink.runtime.operators.sort.ExceptionHandler`. Can we unify them? 
Or can we use `AsyncExceptionHandler`?



[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330515293
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java
 ##
 @@ -0,0 +1,144 @@

[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-10-02 Thread GitBox
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] 
Submit timer trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#discussion_r330474001
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 ##
 @@ -210,7 +210,7 @@ private void testLatencyMarkEmission(int 
numberLatencyMarkers, OperatorSetupOper
StreamSource operator,
ExecutionConfig executionConfig,
Environment env,
-   ProcessingTimeService timeProvider) {
+   TimerService timeProvider) {
 
 Review comment:
   nit: `timeProvider` is not really a good name.



[GitHub] [flink] zentol commented on issue #9798: [FLINK-14176][web]Web Ui add log url for taskmanager of vertex

2019-10-02 Thread GitBox
zentol commented on issue #9798: [FLINK-14176][web]Web Ui add log url for 
taskmanager of vertex
URL: https://github.com/apache/flink/pull/9798#issuecomment-537467031
 
 
   _As usual_, please include a screenshot of the WebUI when modifying it.



[GitHub] [flink] flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable 
should be escaped
URL: https://github.com/apache/flink/pull/9834#issuecomment-537466604
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 65dff9edaba9515d520fdefa5f4b9a7361d3e92f (Wed Oct 02 
12:20:06 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



[GitHub] [flink] TisonKun opened a new pull request #9834: [FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
TisonKun opened a new pull request #9834: [FLINK-13827][script] shell variable 
should be escaped
URL: https://github.com/apache/flink/pull/9834
 
 
   See also #9800



[jira] [Commented] (FLINK-13827) shell variable should be escaped in start-scala-shell.sh

2019-10-02 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942754#comment-16942754
 ] 

Zili Chen commented on FLINK-13827:
---

master via 865cc4c7a39f7aa610a02cc4a0f41424edcd6279

> shell variable should be escaped in start-scala-shell.sh
> 
>
> Key: FLINK-13827
> URL: https://issues.apache.org/jira/browse/FLINK-13827
> Project: Flink
> Issue Type: Bug
> Components: Scala Shell
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Zili Chen
> Assignee: Zili Chen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0, 1.9.2
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> {code:java}
> diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh
> index b6da81af72..65b9045584 100644
> --- a/flink-scala-shell/start-script/start-scala-shell.sh
> +++ b/flink-scala-shell/start-script/start-scala-shell.sh
> @@ -97,9 +97,9 @@ log_setting="-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/$LOG
>  
>  if ${EXTERNAL_LIB_FOUND}
>  then
> -$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH"
> +$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" "$log_setting" org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH"
>  else
> -$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@
> +$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" "$log_setting" org.apache.flink.api.scala.FlinkShell $@
>  fi
>  
>  #restore echo
> {code}
> Otherwise it is error-prone when {{$log_setting}} contains arbitrary content. 
> For example, if the parent directory name contains whitespace, say {{flink-1.9.0 2}}, 
> then {{bin/start-scala-shell.sh local}} will fail with 
> {{Error: Could not find or load main class 
> 2.log.flink\-\*\-scala\-shell\-local\-\*.log}}
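
The failure mode described above can be sketched in Java. This is a simplified illustration under stated assumptions, not the shell's or the Java launcher's actual logic: `unquoted` imitates the whitespace splitting applied to an unquoted `$log_setting`, `quoted` imitates `"$log_setting"`, and `firstNonOption` is a hypothetical stand-in for how a launcher might pick the main class. The path `/opt/flink-1.9.0 2/log/shell.log` is made up for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WordSplitSketch {

    // Imitates an unquoted expansion ($log_setting): the value is split on
    // whitespace and each piece becomes its own argument.
    static List<String> unquoted(String value) {
        return Arrays.asList(value.trim().split("\\s+"));
    }

    // Imitates a quoted expansion ("$log_setting"): one argument, spaces kept.
    static List<String> quoted(String value) {
        return Collections.singletonList(value);
    }

    // Hypothetical stand-in for a launcher: the first argument that does not
    // start with '-' is treated as the main class. Returns null if none exists.
    static String firstNonOption(List<String> args) {
        for (String arg : args) {
            if (!arg.startsWith("-")) {
                return arg;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up log setting whose directory name contains a space.
        String logSetting = "-Dlog.file=/opt/flink-1.9.0 2/log/shell.log";
        System.out.println(firstNonOption(unquoted(logSetting)));
        System.out.println(firstNonOption(quoted(logSetting)));
    }
}
```

With the unquoted form, the fragment after the space (`2/log/shell.log`) is the first argument that does not look like an option, which is consistent with the "Could not find or load main class" error quoted in the issue. With the quoted form no such stray argument exists.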



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14300) StreamTask#invoke leaks threads if OperatorChain fails to be constructed

2019-10-02 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942753#comment-16942753
 ] 

Piotr Nowojski edited comment on FLINK-14300 at 10/2/19 12:11 PM:
--

Hi [~mklein0]. Thank you for reporting and proposing a fix for this bug.

Could you open a pull request for this change on the 
[github|https://github.com/apache/flink/]? If you do so, please notify me so I 
can quickly review it.


was (Author: pnowojski):
Hi [~mklein0]. Thank you for reporting and proposing a fix for this bug.

Could you open a pull request for this change on the 
[github|https://github.com/apache/flink/]?

> StreamTask#invoke leaks threads if OperatorChain fails to be constructed
> 
>
> Key: FLINK-14300
> URL: https://issues.apache.org/jira/browse/FLINK-14300
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.8.1, 1.8.2, 1.9.0
> Reporter: Marcos Klein
> Priority: Minor
> Fix For: 1.10.0
>
> Attachments: thread-leak-patch.diff
>
>
> In the *StreamTask#invoke* method, if an exception occurs during the 
> allocation of the 
> [operatorChain|https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L370]
> class, the [exception 
> handling|https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L485-L491]
> fails to clean up the threads allocated as *StreamTask#recordWriters*. This 
> causes threads to leak as Flink continually attempts to restart and fails for 
> the same cause.
>  
> An example cause is a deserialization issue on a custom operator from a 
> checkpoint.
>  
> Attached is a suggested fix for the master branch.
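
The general pattern behind this kind of leak can be sketched as follows. This is a minimal illustration, not the attached patch (its contents are not shown in this thread) and not Flink code: `Writer`, `buildChain` and `invoke` are hypothetical stand-ins for the record writers, the operator chain construction and the task entry point.

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupOnFailedInitSketch {

    // Hypothetical stand-in for a resource that owns a thread.
    static final class Writer implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Hypothetical stand-in for constructing the operator chain; it always
    // fails here, like a deserialization error on restore would.
    static void buildChain() {
        throw new IllegalStateException("cannot deserialize operator");
    }

    // The writers already exist when the chain is built, so they must be
    // released even if construction throws.
    static void invoke(List<Writer> writers) {
        try {
            buildChain();
            // ... the task would run here ...
        } finally {
            // Runs on success and on failure, so no writer outlives the task.
            for (Writer writer : writers) {
                writer.close();
            }
        }
    }

    public static void main(String[] args) {
        List<Writer> writers = new ArrayList<>();
        writers.add(new Writer());
        writers.add(new Writer());
        try {
            invoke(writers);
        } catch (IllegalStateException expected) {
            System.out.println("all closed: " + writers.stream().allMatch(w -> w.closed));
        }
    }
}
```

The point of the sketch is only that cleanup of resources allocated before the failing step has to sit on a path that is reached when that step throws.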





[jira] [Updated] (FLINK-14300) StreamTask#invoke leaks threads if OperatorChain fails to be constructed

2019-10-02 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-14300:
---
Fix Version/s: 1.10.0

> StreamTask#invoke leaks threads if OperatorChain fails to be constructed
> 
>
> Key: FLINK-14300
> URL: https://issues.apache.org/jira/browse/FLINK-14300
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.8.1, 1.8.2, 1.9.0
> Reporter: Marcos Klein
> Priority: Minor
> Fix For: 1.10.0
>
> Attachments: thread-leak-patch.diff
>
>
> In the *StreamTask#invoke* method, if an exception occurs during the 
> allocation of the 
> [operatorChain|https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L370]
> class, the [exception 
> handling|https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L485-L491]
> fails to clean up the threads allocated as *StreamTask#recordWriters*. This 
> causes threads to leak as Flink continually attempts to restart and fails for 
> the same cause.
>  
> An example cause is a deserialization issue on a custom operator from a 
> checkpoint.
>  
> Attached is a suggested fix for the master branch.





[GitHub] [flink] TisonKun closed pull request #9800: [FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
TisonKun closed pull request #9800: [FLINK-13827][script] shell variable should 
be escaped
URL: https://github.com/apache/flink/pull/9800
 
 
   




[GitHub] [flink] TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should 
be escaped
URL: https://github.com/apache/flink/pull/9800#issuecomment-537463641
 
 
   master via 865cc4c7a39f7aa610a02cc4a0f41424edcd6279




[jira] [Commented] (FLINK-14300) StreamTask#invoke leaks threads if OperatorChain fails to be constructed

2019-10-02 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942753#comment-16942753
 ] 

Piotr Nowojski commented on FLINK-14300:


Hi [~mklein0]. Thank you for reporting and proposing a fix for this bug.

Could you open a pull request for this change on the 
[github|https://github.com/apache/flink/]?

> StreamTask#invoke leaks threads if OperatorChain fails to be constructed
> 
>
> Key: FLINK-14300
> URL: https://issues.apache.org/jira/browse/FLINK-14300
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.8.1, 1.8.2, 1.9.0
> Reporter: Marcos Klein
> Priority: Minor
> Attachments: thread-leak-patch.diff
>
>
> In the *StreamTask#invoke* method, if an exception occurs during the 
> allocation of the 
> [operatorChain|https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L370]
> class, the [exception 
> handling|https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L485-L491]
> fails to clean up the threads allocated as *StreamTask#recordWriters*. This 
> causes threads to leak as Flink continually attempts to restart and fails for 
> the same cause.
>  
> An example cause is a deserialization issue on a custom operator from a 
> checkpoint.
>  
> Attached is a suggested fix for the master branch.





[GitHub] [flink] flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate 
changes on index page to Chinese
URL: https://github.com/apache/flink/pull/9815#issuecomment-536343016
 
 
   
   ## CI report:
   
   * 615acdb2511760c55f8831934f710678a8962acc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129610971)
   * 38894aa30a82d5763ad8137e176ac7d78ee13178 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129652562)
   * 4b35c7d8ce5d86f030037b924a943f857d7f8a01 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129684998)
   * 9099b3a2a220c0d289899f05cea6714fc281d800 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129802867)
   * 90d7fce7f9b27fe52e84c65545c701609b9a3ea9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129858575)
   * 7d25238fda6dbf1029d3d315d3b1077227ca05ce : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130010236)
   




[GitHub] [flink] flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let 
LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#issuecomment-536005650
 
 
   
   ## CI report:
   
   * 757ae54b3f31a82334fae36d6e714c6aa9ac6581 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129477385)
   * fa00ce849c0c110b76207c99ffed9e29b26d597e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130013179)
   




[GitHub] [flink] TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should be escaped

2019-10-02 Thread GitBox
TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should 
be escaped
URL: https://github.com/apache/flink/pull/9800#issuecomment-537457189
 
 
   Thanks for your review @xintongsong & @zentol. Merging to master...
   
   @wuchong given that a release is in progress on branch-1.9, it seems the 
back port to 1.9 should be deferred. The JIRA fix version contains 1.9.2, so 
I'd like to do the back port once 1.9.1 is released and branch-1.9 is unlocked. 
Let me know if I misunderstand anything.




[jira] [Closed] (FLINK-14214) Performance regression in TwoInputStreamOperator

2019-10-02 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-14214.
--
Resolution: Cannot Reproduce

> Performance regression in TwoInputStreamOperator
> 
>
> Key: FLINK-14214
> URL: https://issues.apache.org/jira/browse/FLINK-14214
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.10.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/FLINK-13051 introduced a performance 
> regression visible for example 
> [here|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=twoInputMapSink=2=200=off=off=off]
>  as a drop on August 30th (note that the later performance improvement is 
> caused by a serialisation improvement, which is unrelated to the previous 
> slowdown). 
> The probable suspect is the following {{.isDone()}} check executed inside 
> {{StreamTwoInputProcessor}} once per record:
> {code:java}
> // to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL,
> // always try to check and set the availability of another input
> // TODO: because this can be a costly operation (checking volatile inside CompletableFuture)
> //  this might be optimized to only check once per processed NetworkBuffer
> if (inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
>   checkAndSetAvailable(1 - readingInputIndex);
> }
> {code}
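
One way to make such a per-record check cheaper can be sketched as below. This is an illustration of the general idea only, assuming a shared, already-completed future is handed out whenever an input has data; the names `AVAILABLE` and `isAvailable` are chosen for the example and are not taken from the change that was eventually merged.

```java
import java.util.concurrent.CompletableFuture;

public class AvailabilityShortcutSketch {

    // Assumed convention: inputs return this one shared, completed future
    // whenever they already have data.
    static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    // Fast path: a plain reference comparison. Only futures other than the
    // shared constant fall back to isDone(), which reads the future's
    // volatile state.
    static boolean isAvailable(CompletableFuture<?> future) {
        return future == AVAILABLE || future.isDone();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(isAvailable(AVAILABLE));
        System.out.println(isAvailable(pending));
        pending.complete(null);
        System.out.println(isAvailable(pending));
    }
}
```

The alternative mentioned in the TODO, checking once per processed buffer instead of once per record, would reduce how often the check runs rather than how much each check costs.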





[GitHub] [flink] pnowojski merged pull request #9779: [FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso

2019-10-02 Thread GitBox
pnowojski merged pull request #9779: [FLINK-14214][runtime] Shortcut 
isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso
URL: https://github.com/apache/flink/pull/9779
 
 
   




[GitHub] [flink] zentol commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
zentol commented on a change in pull request #9825: [FLINK-14299] Introduce 
ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#discussion_r330501737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 ##
 @@ -62,7 +64,7 @@
  */
 public class MetricUtils {
private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
-   private static final String METRIC_GROUP_STATUS_NAME = "Status";
+   public static final String METRIC_GROUP_STATUS_NAME = "Status";
 
 Review comment:
   where is this used outside of this class?




[GitHub] [flink] zentol commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup

2019-10-02 Thread GitBox
zentol commented on a change in pull request #9825: [FLINK-14299] Introduce 
ProcessMetricGroup
URL: https://github.com/apache/flink/pull/9825#discussion_r330499346
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 ##
 @@ -100,17 +98,22 @@ public static JobManagerMetricGroup 
instantiateJobManagerMetricGroup(
hostName,
resourceID.toString());
 
-   MetricGroup statusGroup = 
taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
-
-   // Initialize the TM metrics
-   instantiateStatusMetrics(statusGroup);
+   MetricGroup statusGroup = 
createAndInitializeStatusMetricGroup(taskManagerMetricGroup);
 
if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(taskManagerMetricGroup, 
systemResourceProbeInterval.get());
}
return Tuple2.of(taskManagerMetricGroup, statusGroup);
}
 
+   private static MetricGroup 
createAndInitializeStatusMetricGroup(AbstractMetricGroup parentMetricGroup) {
+   MetricGroup statusGroup = 
parentMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
+   // Initialize the TM metrics
 
 Review comment:
   this comment isn't accurate anymore; let's just remove it




[GitHub] [flink] flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let 
LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#issuecomment-536005650
 
 
   
   ## CI report:
   
   * 757ae54b3f31a82334fae36d6e714c6aa9ac6581 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129477385)
   * fa00ce849c0c110b76207c99ffed9e29b26d597e : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/130013179)
   




[GitHub] [flink] zentol merged pull request #9668: [hotfix][docs] Fix the wrong characters in the document(building.md, building.zh.md)

2019-10-02 Thread GitBox
zentol merged pull request #9668: [hotfix][docs] Fix the wrong characters in 
the document(building.md,building.zh.md)
URL: https://github.com/apache/flink/pull/9668
 
 
   




[jira] [Closed] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11

2019-10-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-13516.

Resolution: Fixed

master: d35f6990eb402a2137826b62bca11437728da9c1

> YARNSessionFIFOSecuredITCase fails on Java 11
> -
>
> Key: FLINK-13516
> URL: https://issues.apache.org/jira/browse/FLINK-13516
> Project: Flink
> Issue Type: Sub-task
> Components: Deployment / YARN, Tests
> Reporter: Chesnay Schepler
> Assignee: Haibo Sun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java 
> 11. This may be related to security changes in Java 11.





[GitHub] [flink] zentol merged pull request #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11

2019-10-02 Thread GitBox
zentol merged pull request #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to 
fix the failure of YARNSessionFIFOSecuredITCase on Java 11
URL: https://github.com/apache/flink/pull/9622
 
 
   




[GitHub] [flink] zentol commented on issue #9504: [FLINK-13799][web]: fix error message web submit is disables in the c…

2019-10-02 Thread GitBox
zentol commented on issue #9504: [FLINK-13799][web]: fix error message web 
submit is disables in the c…
URL: https://github.com/apache/flink/pull/9504#issuecomment-537446272
 
 
   @vthinkxie FLINK-13817 exposes whether submissions are enabled in a more 
reliable way. The configuration is unreliable since it only lists explicitly 
configured keys; if the key is not explicitly configured, you'd have to make 
assumptions about the default.
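
The problem with reading the listed configuration can be sketched like this. It is a simplified illustration, not the web UI's or the REST API's code: the key name `web.submit.enable` and the default `true` are assumptions made for the example, and the `Map` stands in for the list of explicitly configured keys.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitConfigSketch {

    // Assumed key name and default, for illustration only.
    static final String KEY = "web.submit.enable";
    static final boolean ASSUMED_DEFAULT = true;

    // Returns null when the key was never set explicitly: the listing alone
    // cannot say whether the feature is on or off.
    static Boolean explicitValue(Map<String, String> listedConfig) {
        String raw = listedConfig.get(KEY);
        return raw == null ? null : Boolean.valueOf(raw);
    }

    // A client that relies on the listing has to hard-code a default, which
    // silently breaks if the server-side default ever differs.
    static boolean guessEnabled(Map<String, String> listedConfig) {
        Boolean explicit = explicitValue(listedConfig);
        return explicit != null ? explicit : ASSUMED_DEFAULT;
    }

    public static void main(String[] args) {
        Map<String, String> listed = new HashMap<>();
        System.out.println(explicitValue(listed));
        System.out.println(guessEnabled(listed));
        listed.put(KEY, "false");
        System.out.println(guessEnabled(listed));
    }
}
```

A dedicated field reported by the server avoids the guess, because the server resolves the default itself.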




[GitHub] [flink] flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala 
quickstart compiles on JDK 11
URL: https://github.com/apache/flink/pull/9833#issuecomment-537431999
 
 
   
   ## CI report:
   
   * 5077c53b7d163f49e5d4a8414c6c0598d4fdb060 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130007248)
   




[GitHub] [flink] flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate 
changes on index page to Chinese
URL: https://github.com/apache/flink/pull/9815#issuecomment-536343016
 
 
   
   ## CI report:
   
   * 615acdb2511760c55f8831934f710678a8962acc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129610971)
   * 38894aa30a82d5763ad8137e176ac7d78ee13178 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129652562)
   * 4b35c7d8ce5d86f030037b924a943f857d7f8a01 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129684998)
   * 9099b3a2a220c0d289899f05cea6714fc281d800 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129802867)
   * 90d7fce7f9b27fe52e84c65545c701609b9a3ea9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129858575)
   * 7d25238fda6dbf1029d3d315d3b1077227ca05ce : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/130010236)
   




[GitHub] [flink] flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let 
LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#issuecomment-536005650
 
 
   
   ## CI report:
   
   * 757ae54b3f31a82334fae36d6e714c6aa9ac6581 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129477385)
   * fa00ce849c0c110b76207c99ffed9e29b26d597e : UNKNOWN
   




[GitHub] [flink] flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala 
quickstart compiles on JDK 11
URL: https://github.com/apache/flink/pull/9833#issuecomment-537431999
 
 
   
   ## CI report:
   
   * 5077c53b7d163f49e5d4a8414c6c0598d4fdb060 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/130007248)
   




[GitHub] [flink] flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate 
changes on index page to Chinese
URL: https://github.com/apache/flink/pull/9815#issuecomment-536343016
 
 
   
   ## CI report:
   
   * 615acdb2511760c55f8831934f710678a8962acc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129610971)
   * 38894aa30a82d5763ad8137e176ac7d78ee13178 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129652562)
   * 4b35c7d8ce5d86f030037b924a943f857d7f8a01 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129684998)
   * 9099b3a2a220c0d289899f05cea6714fc281d800 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129802867)
   * 90d7fce7f9b27fe52e84c65545c701609b9a3ea9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129858575)
   * 7d25238fda6dbf1029d3d315d3b1077227ca05ce : UNKNOWN
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] 
Let LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#discussion_r330483271
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -102,11 +108,12 @@ public void onExecutionStateChange(ExecutionVertexID 
executionVertexId, Executio
final Set verticesToSchedule = 
schedulingTopology.getVertexOrThrow(executionVertexId)
.getProducedResultPartitions()
.stream()
+   .filter(partition -> partition.getPartitionType() == 
ResultPartitionType.BLOCKING)
 
 Review comment:
   Done.




[GitHub] [flink] flinkbot edited a comment on issue #9779: [FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso

2019-10-02 Thread GitBox
flinkbot edited a comment on issue #9779: [FLINK-14214][runtime] Shortcut 
isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso
URL: https://github.com/apache/flink/pull/9779#issuecomment-535521325
 
 
   
   ## CI report:
   
   * 42064500d00956c47a8b37b678c4c9b43170ac4a : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129282821)
   * 10728ce9a96eb13e6b2586eedadfe53161ff85b2 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/13514)
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] 
Let LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#discussion_r330480845
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -78,19 +81,22 @@ public void startScheduling() {
deploymentOptions.put(schedulingVertex.getId(), option);
}
 
-   
allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology());
+   allocateSlotsAndDeployExecutionVertices(
+   
getSchedulingExecutionVertices(getAllVerticesFromTopology()),
+   IS_IN_CREATED_STATE);
}
 
@Override
-   public void restartTasks(Set verticesToRestart) {
+   public void restartTasks(final Set 
verticesToRestart) {
+   final Set verticesToSchedule = 
getSchedulingExecutionVertices(verticesToRestart);
+
// increase counter of the dataset first
-   verticesToRestart
+   verticesToSchedule
.stream()
-   .map(schedulingTopology::getVertexOrThrow)
.flatMap(vertex -> 
vertex.getProducedResultPartitions().stream())

.forEach(inputConstraintChecker::resetSchedulingResultPartition);
 
-   allocateSlotsAndDeployExecutionVertexIds(verticesToRestart);
+   allocateSlotsAndDeployExecutionVertices(verticesToSchedule, 
IS_IN_TERMINAL_STATE);
 
 Review comment:
   This should not happen because `restartTasks` is only invoked when the 
`releaseFuture`s of all the `verticesToRestart` are completed (valid states 
include `FINISHED`/`CANCELED`/`FAILED`).
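
The invariant described here can be sketched with a small state predicate. This is a simplified illustration, not the code under review: the `State` enum lists only a few states, the predicates operate on states rather than on scheduling vertices, and `requireAll` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class TerminalStateSketch {

    // Reduced set of execution states, for illustration only.
    enum State { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

    static final Set<State> TERMINAL =
            EnumSet.of(State.FINISHED, State.CANCELED, State.FAILED);

    // Expected on restart: every vertex has already reached a terminal state.
    static final Predicate<State> IS_IN_TERMINAL_STATE = TERMINAL::contains;

    // Expected on initial scheduling: nothing has been deployed yet.
    static final Predicate<State> IS_IN_CREATED_STATE = s -> s == State.CREATED;

    // Hypothetical helper: fails fast if any state violates the expectation.
    static void requireAll(List<State> states, Predicate<State> expected) {
        for (State state : states) {
            if (!expected.test(state)) {
                throw new IllegalStateException("unexpected state " + state);
            }
        }
    }

    public static void main(String[] args) {
        requireAll(Arrays.asList(State.FINISHED, State.FAILED), IS_IN_TERMINAL_STATE);
        requireAll(Arrays.asList(State.CREATED), IS_IN_CREATED_STATE);
        System.out.println("expectations hold");
    }
}
```

Passing the expectation as a predicate lets the same deployment path serve both the initial scheduling and the restart case.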




[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks

2019-10-02 Thread GitBox
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] 
Let LazyFromSourcesSchedulingStrategy restart terminated tasks
URL: https://github.com/apache/flink/pull/9791#discussion_r330480920
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -102,11 +108,12 @@ public void onExecutionStateChange(ExecutionVertexID 
executionVertexId, Executio
final Set verticesToSchedule = 
schedulingTopology.getVertexOrThrow(executionVertexId)
.getProducedResultPartitions()
.stream()
+   .filter(partition -> partition.getPartitionType() == 
ResultPartitionType.BLOCKING)
 
 Review comment:
   You are right!




[GitHub] [flink] flinkbot commented on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11

2019-10-02 Thread GitBox
flinkbot commented on issue #9833: [FLINK-14276][quickstarts] Scala quickstart 
compiles on JDK 11
URL: https://github.com/apache/flink/pull/9833#issuecomment-537431999
 
 
   
   ## CI report:
   
   * 5077c53b7d163f49e5d4a8414c6c0598d4fdb060 : UNKNOWN
   




[jira] [Commented] (FLINK-14295) Nightly flink-runtime failed with java 11

2019-10-02 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942676#comment-16942676
 ] 

Chesnay Schepler commented on FLINK-14295:
--

Let's split this into 2 tickets since there are 2 test failures. From a quick 
glance I doubt this is exclusive to Java 11.

> Nightly flink-runtime failed with java 11
> -
>
> Key: FLINK-14295
> URL: https://issues.apache.org/jira/browse/FLINK-14295
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
>
> The core-jdk11 part of nightly test failed with below error:
> {noformat}
> 22:09:38.176 [ERROR] Failures: 
> 22:09:38.180 [ERROR]   TaskExecutorSubmissionTest.testRequestStackTraceSample:637 expected:<[updateTaskExecutionState]> but was:<[lambda$updateTaskExecutionState$0]>
> 22:09:38.185 [ERROR] Errors: 
> 22:09:38.185 [ERROR]   RecordWriterTest.testClearBuffersAfterInterruptDuringBlockingBufferRequest:165 » NoSuchElement
> 22:09:38.185 [INFO] 
> 22:09:38.186 [ERROR] Tests run: 3936, Failures: 1, Errors: 1, Skipped: 40
> {noformat}
> Link of the build: https://api.travis-ci.org/v3/job/591086968/log.txt
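
One plausible reading of the first failure, not confirmed anywhere in this thread: the sampled top frame sits inside a lambda, so the frame carries a compiler-generated name rather than the name of the enclosing method. The sketch below only illustrates that general JVM behaviour. The class `LambdaFrameNameDemo` and both of its methods are hypothetical names chosen for illustration and are not Flink code; the exact synthetic name depends on the compiler.

```java
import java.util.function.Supplier;

public class LambdaFrameNameDemo {

    // Samples the top stack frame directly in the method body.
    // The frame reports the method's own name.
    static String updateTaskExecutionState() {
        return new Throwable().getStackTrace()[0].getMethodName();
    }

    // Samples the top stack frame from inside a lambda.
    // The frame belongs to the synthetic method generated for the lambda,
    // so its name differs from the enclosing method's name.
    static String updateTaskExecutionStateViaLambda() {
        Supplier<String> sampler = () -> new Throwable().getStackTrace()[0].getMethodName();
        return sampler.get();
    }

    public static void main(String[] args) {
        System.out.println("direct: " + updateTaskExecutionState());
        System.out.println("lambda: " + updateTaskExecutionStateViaLambda());
    }
}
```

If this is what happens in the test, an assertion on the exact method name of the top frame is sensitive to whether the production code wraps the call in a lambda, independent of the Java version.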



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r330469316
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
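+   // ------------------------------------------------------------------------
+   //  Memory Configuration Calculations
+   // ------------------------------------------------------------------------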
+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final Configuration config) throws ConfigurationException {
+       if (isTaskHeapMemorySizeExplicitlyConfigured(config) && isManagedMemorySizeExplicitlyConfigured(config)) {
+           // both task heap memory and managed memory is configured, use these to derive total flink memory
+           return deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+       } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+           // total flink memory is configured, but not task heap and managed memory, derive from total flink memory
+           return deriveResourceSpecWithTotalFlinkMemory(config);
+       } else if (isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+           return deriveResourceSpecWithTotalProcessMemory(config);
+       } else {
+           throw new ConfigurationException("Either Task Heap Memory size and Managed Memory size, or Total Flink"
+               + " Memory size, or Total Process Memory size need to be configured explicitly.");
+       }
+   }
+
+   private static TaskExecutorResourceSpec deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+       // derive total flink internal memory sizes from explicitly configure task heap memory size and managed memory size
+
+       final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
+       final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config);
+       final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);
+
+       final MemorySize managedMemorySize = getManagedMemorySize(config);
+       final Tuple2<MemorySize, MemorySize> managedMemorySizeTuple2 = deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, managedMemorySize);
+       final MemorySize onHeapManagedMemorySize = managedMemorySizeTuple2.f0;
+       final MemorySize offHeapManagedMemorySize = managedMemorySizeTuple2.f1;
+
+       final MemorySize shuffleMemorySize = deriveShuffleMemoryWithInverseFraction(config,
+           frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+       // derive total flink external memory sizes from derived total flink memory size
+
+       final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+           .add(taskHeapMemorySize)
+           .add(taskOffHeapMemorySize)
+           .add(shuffleMemorySize)
+           .add(managedMemorySize);
+
+       final Tuple2 totalFlinkExternalMemorySizeTuple2 = deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+       return new TaskExecutorResourceSpec(
+   
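
The quoted diff breaks off here. For readers following the arithmetic, the summation in `deriveResourceSpecWithExplicitTaskAndManagedMemory` can be sketched with plain byte counts. This is a minimal sketch under stated assumptions, not the code from the pull request: the class `TaskExecutorMemorySketch` and its methods are hypothetical, and the body of `deriveShuffleMemoryWithInverseFraction` does not appear in this message, so the formula below is only a guess from the method name (shuffle memory as a fixed fraction of total Flink memory, solved for the shuffle part). Any minimum or maximum bounds the real method may apply are ignored.

```java
public class TaskExecutorMemorySketch {

    // ASSUMPTION: if shuffle = fraction * total and total = base + shuffle,
    // then shuffle = base * fraction / (1 - fraction).
    static long deriveShuffleWithInverseFraction(long baseBytes, double shuffleFraction) {
        if (baseBytes < 0 || shuffleFraction < 0.0 || shuffleFraction >= 1.0) {
            throw new IllegalArgumentException("base must be >= 0 and fraction in [0, 1)");
        }
        return (long) (baseBytes * shuffleFraction / (1.0 - shuffleFraction));
    }

    // Mirrors the sum in the diff: framework heap + task heap + task off-heap
    // + shuffle + managed memory, with shuffle derived from the other four.
    static long totalFlinkMemory(
            long frameworkHeap, long taskHeap, long taskOffHeap, long managed, double shuffleFraction) {
        long base = frameworkHeap + taskHeap + taskOffHeap + managed;
        long shuffle = deriveShuffleWithInverseFraction(base, shuffleFraction);
        return base + shuffle;
    }

    public static void main(String[] args) {
        // Arbitrary illustrative sizes, not Flink defaults.
        long total = totalFlinkMemory(100, 200, 0, 100, 0.5);
        System.out.println("total flink memory (bytes): " + total);
    }
}
```

With a fraction of 0.5 the shuffle part equals the sum of the other four components, which gives a quick way to sanity-check the formula by hand.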
