[GitHub] [flink] flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup
flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#issuecomment-536938289 ## CI report: * 53c41b006ae2155206a7843b67721b598b3591d1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129823434) * 6039a68165fc94b780c8129bd01844ae7028e97a : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/130034403) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on issue #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on issue #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#issuecomment-537509801 > About the performance impact, lease refer to [the analysis of JDK CSLM implementation](https://docs.google.com/document/d/16VIY7o-18sM-pIlIYkbTuhKPmwfnqabCt_nlOARAzdg/edit#) and a compacted data structure we introduced for HBase to reduce GC pressure. Search for `Key space schema` and `Value space schema` in `SkipListUtils` and we could find a similar design here. Where exactly do I see the performance comparison? > About reusable object, it will add a lot of efforts/complexity making sure to prevent concurrent manipulation on it. Why would this be the case? The only accessing threads should be the Task's main thread and the asynchronous checkpointing, right? Couldn't we say that the asynchronous checkpointing creates one single instance and reuses this instance for the whole checkpointing procedure? One could make it even a thread local variable if one wants to have an easy solution. So I'm not sure where the argument comes from that using a thin wrapping object around a pointer will necessarily decrease performance. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330567573 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. + */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330566832 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. + */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330565842 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. + */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330565842 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. + */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330565096 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS; +import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK; + +/** + * Utils. + */ +public class SpaceUtils { + + public static int getChunkIdByAddress(long offset) { + return (int) ((offset >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK); + } + + public static int getChunkOffsetByAddress(long offset) { + return (int) (offset & FOUR_BYTES_MARK); + } +} Review comment: I would really be interested in learning what performance gain we actually achieve with it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330564637 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.io.Closeable; + +/** + * Implementations are responsible for allocate space. + */ +public interface Allocator extends Closeable { + + /** +* Allocate space with the given size. +* +* @param size size of space to allocate. +* @return address of the allocated space, or -1 when allocation is failed. +*/ + long allocate(int size); Review comment: I don't understand it. If the space allocation fails, then we should also fail the operation right? How exactly will this be caught? It looks as if we pass the return value of `allocate` unconditionally to other methods. Are you saying that these methods will simply swallow this value or fail at a different place? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330561678 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapSnapshot.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.util.ResourceGuard; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * This class represents the snapshot of a {@link CopyOnWriteSkipListStateMap}. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMapSnapshot + extends StateMapSnapshot> { + + /** +* Version of the {@link CopyOnWriteSkipListStateMap} when this snapshot was created. This can be used to release the snapshot. +*/ + private final int snapshotVersion; + + /** The number of (non-null) entries in snapshotData. */ + @Nonnegative + private final int numberOfEntriesInSnapshotData; + + /** +* This lease protects the state map resources. +*/ + private final ResourceGuard.Lease lease; + + /** +* Creates a new {@link CopyOnWriteSkipListStateMap}. +* +* @param owningStateMap the {@link CopyOnWriteSkipListStateMap} for which this object represents a snapshot. +* @param lease the lease protects the state map resources. +*/ + CopyOnWriteSkipListStateMapSnapshot( + CopyOnWriteSkipListStateMap owningStateMap, + ResourceGuard.Lease lease) { + super(owningStateMap); + + this.snapshotVersion = owningStateMap.getStateMapVersion(); + this.numberOfEntriesInSnapshotData = owningStateMap.size(); + this.lease = lease; + } + + /** +* Returns the internal version of the when this snapshot was created. +*/ + int getSnapshotVersion() { + return snapshotVersion; + } + + @Override + public void release() { + owningStateMap.releaseSnapshot(this); + lease.close(); + } + + @Override + public void writeState( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + TypeSerializer stateSerializer, + @Nonnull DataOutputView dov, + @Nullable StateSnapshotTransformer stateSnapshotTransformer) throws IOException { + if (stateSnapshotTransformer == null) { + writeStateWithNoTransform(dov); + } else { + writeStateWithTransform(stateSerializer, dov, stateSnapshotTransformer); + } + } + + private void writeStateWithNoTransform(@Nonnull DataOutputView dov) throws IOException { + dov.writeInt(numberOfEntriesInSnapshotData); + SnapshotNodeIterator nodeIterator = new SnapshotNodeIterator(true); + while (nodeIterator.hasNext()) { + Tuple2 tuple = nodeIterator.next(); + writeKeyAndNamespace(tuple.f0, dov); + writeValue(tuple.f1, dov); + } + } + + private void writeStateWithTransform( + TypeSerializer stateSerializer, + @Nonnull
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330561193 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#issuecomment-533068819 ## CI report: * 9cfda801891969ac460f45ea639d636b519f22db : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128317194) * af7f6be848f0bd90a049d5ee9f7a38b1c3e2b972 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128336908) * 43157eb165d9409fbdf4a2f773ef7d52dd74e759 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128365390) * ff400d149ed36c63a90d2f3fcd517bde738e5af1 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128402339) * 02dbf61e50e6c913610df7f586d7eb0f20529c13 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128406923) * 68cf941c7e165a5c6b2a27f7ac716b29f76d1918 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128472867) * 54aa0043ec73b03580eda2e0310b44bdee352a55 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128495989) * e081ea5c536d6ebb828eb8b68c404379d23f5aef : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128502020) * 911bdd80f4b1cbb6cee2ffb517cafc5e3c83ee0b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128622324) * 40fd94b291968062eec85a9436a3a755914d282a : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128904653) * e60ea55f913143611d9162fd79a75cfa575aa9f0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128907420) * c2c7b2cc91961bc2c8707f9077c4d66ea6535b8e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129102072) * 1048cb10486f94983a52582953379afa02a3b350 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129122517) * 773eb07af5ac848ec70cd32663ebf657b0419f3e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129823477) * 7c4191061dbacaf142f2b3e6fa8dec30d7e16595 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/130030060) * d8cee5bd77379a05ba6a501444f7285e73a4be43 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330560285 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped
flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9834#issuecomment-537478603 ## CI report: * 65dff9edaba9515d520fdefa5f4b9a7361d3e92f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130026021) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9827: [FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager
flinkbot edited a comment on issue #9827: [FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager URL: https://github.com/apache/flink/pull/9827#issuecomment-536948437 ## CI report: * fa367aaeb2089366de1d9497f1bbec9b11d75e72 : UNKNOWN * f865d19c9a7c1530d06d194652582ebe7257176a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129826911) * 53201b462b67f199c628367918e19c475204765d : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9828: [FLINK-14305] Transfer ownership of JobManagerMetricGroup to Dispatcher
flinkbot edited a comment on issue #9828: [FLINK-14305] Transfer ownership of JobManagerMetricGroup to Dispatcher URL: https://github.com/apache/flink/pull/9828#issuecomment-536958155 ## CI report: * 582fcdf7a275b9416d2e5f947352bf1310ff5d88 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129830601) * 12bfe6e05cc2cf1e92d4f99ac9d41964026388ee : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129854671) * 7529618b93035b1238efb7795813a2f550cda04a : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330557424 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup
flinkbot edited a comment on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#issuecomment-536938289 ## CI report: * 53c41b006ae2155206a7843b67721b598b3591d1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129823434) * 6039a68165fc94b780c8129bd01844ae7028e97a : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r33054 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330554852 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330554289 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330552051 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330552051 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] 1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330551710 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; Review comment: I don't like usage of static (mutable) variables myself (including in tests), but I don't see other way how to get "feedback" from the test operators here. I'm open if you have a suggestion how to avoid it here. The mailbox is in a different situation, as it's provided by normal (production) code by the runtime. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330551180 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330550525 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] 1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330549295 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ## @@ -250,37 +250,36 @@ int getNumTasksScheduled() { // /** -* A context to which {@link ProcessingTimeCallback} would be passed to be invoked when a timer is up. +* An exception handler, called when {@link ProcessingTimeCallback} throws an exception. */ - interface ScheduledCallbackExecutionContext { - - void invoke(ProcessingTimeCallback callback, long timestamp); + interface ExceptionHandler { Review comment: I would not use `org.apache.flink.runtime.operators.sort.ExceptionHandler` or `AsyncExceptionHandler` here, as they are both used in their own use case (and were probably introduced on their own to have separation). Reusing them here would be a false code reuse, imo. I see a value to have a generic interface defined somewhere in `org.apache.flink.util` (in `flink-core`?). But even for the generic interface, the question is what should be the api (currently, all three mentioned interfaces have some small different details). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330548923 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are
[GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r330547315 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +252,138 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. +*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; Review comment: Fixed in hotfix. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330546245 ## File path: flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java ## @@ -0,0 +1,195 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.core.memory; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for {@link ByteBufferUtils}. + */ +public class ByteBufferUtilsTest extends TestLogger { + + @Test + public void testDirectBBWriteAndRead() { + testWithDifferentOffset(true); + } + + @Test + public void testHeapBBWriteAndRead() { + testWithDifferentOffset(false); + } + + @Test + public void testCompareDirectBBToArray() { + testCompareTo(true, false, false); + } + + @Test + public void testCompareDirectBBToDirectBB() { + testCompareTo(true, true, true); + } + + @Test + public void testCompareDirectBBToHeapBB() { + testCompareTo(true, true, false); + } + + @Test + public void testCompareHeapBBToArray() { + testCompareTo(false, false, false); + } + + @Test + public void testCompareHeapBBToDirectBB() { + testCompareTo(false, true, true); + } + + @Test + public void testCompareHeapBBToHeapBB() { + testCompareTo(false, true, false); + } + + private void testCompareTo(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + testEquals(isLeftBBDirect, isRightBuffer, isRightDirect); + testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect); + testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect); + } + + private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'}; + ByteBuffer left = isLeftBBDirect + ? ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes) + : ByteBuffer.wrap(leftBufferBytes); + ByteBuffer right = null; + if (isRightBuffer) { + right = isRightDirect + ? ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes) + : ByteBuffer.wrap(rightBufferBytes); + } + if (right != null) { + Assert.assertThat( + ByteBufferUtils.compareTo(left, 1, 4, right, 0, 4), Review comment: I think we should simply have a dedicated test for this instead of complicating another test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#issuecomment-533068819 ## CI report: * 9cfda801891969ac460f45ea639d636b519f22db : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128317194) * af7f6be848f0bd90a049d5ee9f7a38b1c3e2b972 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128336908) * 43157eb165d9409fbdf4a2f773ef7d52dd74e759 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128365390) * ff400d149ed36c63a90d2f3fcd517bde738e5af1 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128402339) * 02dbf61e50e6c913610df7f586d7eb0f20529c13 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128406923) * 68cf941c7e165a5c6b2a27f7ac716b29f76d1918 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128472867) * 54aa0043ec73b03580eda2e0310b44bdee352a55 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128495989) * e081ea5c536d6ebb828eb8b68c404379d23f5aef : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128502020) * 911bdd80f4b1cbb6cee2ffb517cafc5e3c83ee0b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128622324) * 40fd94b291968062eec85a9436a3a755914d282a : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/128904653) * e60ea55f913143611d9162fd79a75cfa575aa9f0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128907420) * c2c7b2cc91961bc2c8707f9077c4d66ea6535b8e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129102072) * 1048cb10486f94983a52582953379afa02a3b350 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129122517) * 773eb07af5ac848ec70cd32663ebf657b0419f3e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129823477) * 7c4191061dbacaf142f2b3e6fa8dec30d7e16595 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r330546245 ## File path: flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java ## @@ -0,0 +1,195 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.core.memory; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for {@link ByteBufferUtils}. + */ +public class ByteBufferUtilsTest extends TestLogger { + + @Test + public void testDirectBBWriteAndRead() { + testWithDifferentOffset(true); + } + + @Test + public void testHeapBBWriteAndRead() { + testWithDifferentOffset(false); + } + + @Test + public void testCompareDirectBBToArray() { + testCompareTo(true, false, false); + } + + @Test + public void testCompareDirectBBToDirectBB() { + testCompareTo(true, true, true); + } + + @Test + public void testCompareDirectBBToHeapBB() { + testCompareTo(true, true, false); + } + + @Test + public void testCompareHeapBBToArray() { + testCompareTo(false, false, false); + } + + @Test + public void testCompareHeapBBToDirectBB() { + testCompareTo(false, true, true); + } + + @Test + public void testCompareHeapBBToHeapBB() { + testCompareTo(false, true, false); + } + + private void testCompareTo(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + testEquals(isLeftBBDirect, isRightBuffer, isRightDirect); + testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect); + testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect); + } + + private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'}; + ByteBuffer left = isLeftBBDirect + ? ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes) + : ByteBuffer.wrap(leftBufferBytes); + ByteBuffer right = null; + if (isRightBuffer) { + right = isRightDirect + ? ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes) + : ByteBuffer.wrap(rightBufferBytes); + } + if (right != null) { + Assert.assertThat( + ByteBufferUtils.compareTo(left, 1, 4, right, 0, 4), Review comment: I think should simply have a dedicated test for this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on issue #9682: [FLINK-14041][tests] Refactor LeaderRetrievalServiceHostnameResolutionTest and remove StandaloneUtils
TisonKun commented on issue #9682: [FLINK-14041][tests] Refactor LeaderRetrievalServiceHostnameResolutionTest and remove StandaloneUtils URL: https://github.com/apache/flink/pull/9682#issuecomment-537489792 Thanks for your review @zentol ! I can see your concern here. Actually the original tests test `AkkaRpcServicesUtils#getRpcUrl` and since that `StandaloneUtils` is never used outside these tests it seems we tests nothing about abstract leader retrieval service. In fact, the retrieval of WebMonitor is a bit different from `AkkaRpcServicesUtils#getRpcUrl`(said it is `HighAvailabilityServicesUtils#getWebMonitorAddress`). I think we can even guard by `AkkaUtilsTest` and maybe add a dedicate test for `HighAvailabilityServicesUtils#getWebMonitorAddress` and delete `LeaderRetrievalServiceHostnameResolutionTest` because it doesn't test something at that abstraction level. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on issue #8760: [FLINK-12869] Add yarn acls capability to flink containers
GJL commented on issue #8760: [FLINK-12869] Add yarn acls capability to flink containers URL: https://github.com/apache/flink/pull/8760#issuecomment-537489889 Thanks for updating the PR. I'll have another look This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] 1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
1u0 commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330543800 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerService.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.Internal; + +/** + * + * The registration of timers follows a life cycle of three phases: + * + * In the initial state, it accepts timer registrations and triggers when the time is reached. + * After calling {@link #quiesce()}, further calls to + * {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further timers, and will + * return a "dummy" future as a result. This is used for clean shutdown, where currently firing + * timers are waited for and no future timers can be scheduled, without causing hard exceptions. + * After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, ProcessingTimeCallback)} + * will result in a hard exception. + * + */ +@Internal +public interface TimerService extends ProcessingTimeService { Review comment: Although, the commit message says that the lifecycle methods were extracted into a separate interface, my main motivation was to separate "application level api" (`ProcessingTimeService` that exposed to operators) from the system timer provider (implementation of `TimerService`). It just happens that currently in this PR `TimerService` piggybacks on `ProcessingTimeService` interface (and `ProcessingTimeCallback`) to define the system timer api. I can introduce a new, separate timer api for `TimerService` to explicitly separate them. It should not make big changes in the runtime code, only some tests should be affected to be more explicit what they are testing against. I'd like to hear from others if you are ok with this direction. Regarding the naming, `ProcessingTimeService` is simply the preserved name that operators see and `TimerService` is a common name in Flink code base for classes with similar functionality. Introducing a common "life-cycle" interface may be too early generalization on my side. Also, unifying/consolidating the `TimerService` classes may be a separate effort on it's own. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped
flinkbot edited a comment on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9834#issuecomment-537478603 ## CI report: * 65dff9edaba9515d520fdefa5f4b9a7361d3e92f : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/130026021) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on issue #9827: [FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager
tillrohrmann commented on issue #9827: [FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceManagerMetricGroup in ResourceManager URL: https://github.com/apache/flink/pull/9827#issuecomment-537488117 Thanks for the review @zentol. I've rebased and force pushed to let CI run another time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] 1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r330540787 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +245,136 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; - - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. +*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + return queueEntry.get(); + } - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - if (exception != null) { - throw exception; + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Outputs one completed element. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); - emitterThread.interrupt(); - - executor.shutdown(); - - if (waitForShutdown) { - try
[GitHub] [flink] tillrohrmann commented on a change in pull request #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11
tillrohrmann commented on a change in pull request #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11 URL: https://github.com/apache/flink/pull/9833#discussion_r330541171 ## File path: flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml ## @@ -171,6 +171,11 @@ under the License. + + + -nobootcp Review comment: I guess it is not so easy to only activate this flag if Java 11 is used, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14225) Travis is unable to parse one of the secure environment variables
[ https://issues.apache.org/jira/browse/FLINK-14225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942797#comment-16942797 ] Gary Yao commented on FLINK-14225: -- [~StephanEwen] Can you help with this? > Travis is unable to parse one of the secure environment variables > - > > Key: FLINK-14225 > URL: https://issues.apache.org/jira/browse/FLINK-14225 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.10.0 >Reporter: Gary Yao >Priority: Blocker > > Example: https://travis-ci.org/apache/flink/jobs/589531009 > {noformat} > We were unable to parse one of your secure environment variables. > Please make sure to escape special characters such as ' ' (white space) and $ > (dollar symbol) with \ (backslash) . > For example, thi$isanexample would be typed as thi\$isanexample. See > https://docs.travis-ci.com/user/encryption-keys. > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] tillrohrmann commented on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup
tillrohrmann commented on issue #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#issuecomment-537485842 Thanks for the review @zentol. I've addressed your comments and pushed a fixup. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup
tillrohrmann commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#discussion_r330539646 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ## @@ -100,17 +98,22 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup( hostName, resourceID.toString()); - MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); - - // Initialize the TM metrics - instantiateStatusMetrics(statusGroup); + MetricGroup statusGroup = createAndInitializeStatusMetricGroup(taskManagerMetricGroup); if (systemResourceProbeInterval.isPresent()) { instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get()); } return Tuple2.of(taskManagerMetricGroup, statusGroup); } + private static MetricGroup createAndInitializeStatusMetricGroup(AbstractMetricGroup parentMetricGroup) { + MetricGroup statusGroup = parentMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); + + // Initialize the TM metrics Review comment: true, I will remove it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup
tillrohrmann commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#discussion_r330539760 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ## @@ -62,7 +64,7 @@ */ public class MetricUtils { private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); - private static final String METRIC_GROUP_STATUS_NAME = "Status"; + public static final String METRIC_GROUP_STATUS_NAME = "Status"; Review comment: This is wrong. Will correct it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on issue #9794: [FLINK-14247][runtime] Use NoResourceAvailableException to wrap TimeoutException on slot allocation timeout
GJL commented on issue #9794: [FLINK-14247][runtime] Use NoResourceAvailableException to wrap TimeoutException on slot allocation timeout URL: https://github.com/apache/flink/pull/9794#issuecomment-537485003 Merging as soon as build is green. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics
tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#issuecomment-537483004 One way to solve the problem could be ``` final int numVariables = 8; final double[] c = new double[numVariables]; Arrays.fill(c, 1.0); LinearObjectiveFunction f = new LinearObjectiveFunction(c, 0); Collection constraints = new ArrayList<>(); // JVM_overhead, jvm_metaspace, framework, task_heap, task_off_heap, shuffle, managed_memory, total process memory final double[] overallMemory = {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0}; final double[] jvmOverhead = {1.0, 0, 0, 0, 0, 0, 0, 0}; final double[] jvmMetaspace = {0, 1.0, 0, 0, 0, 0, 0, 0}; final double[] framework = {0, 0, 1.0, 0, 0, 0, 0, 0}; final double[] taskOffHeap = {0, 0, 0, 0, 1.0, 0, 0, 0}; final double minOverhead = 10; final double maxOverhead = 100; final double[] shuffle = {0, 0, 0, 0, 0, 1.0, 0, 0}; final double minShuffle = 10; final double maxShuffle = 100; final double overheadFraction = 0.1; final double[] jvmOverheadRelationship = {-1.0, overheadFraction, overheadFraction, overheadFraction, overheadFraction, overheadFraction, overheadFraction, 0}; final double shuffleFraction = 0.2; final double[] shuffleRelationship = {0, 0, shuffleFraction, shuffleFraction, shuffleFraction, -1, overheadFraction, 0}; final double jvmMetaspaceValue = 192; final double frameworkValue = 192; final double taskOffHeapValue = 0; constraints.add(new LinearConstraint(overallMemory, Relationship.EQ, 0)); constraints.add(new LinearConstraint(jvmOverhead, Relationship.GEQ, minOverhead)); constraints.add(new LinearConstraint(jvmOverhead, Relationship.LEQ, maxOverhead)); constraints.add(new LinearConstraint(shuffle, Relationship.GEQ, minShuffle)); constraints.add(new LinearConstraint(shuffle, Relationship.LEQ, maxShuffle)); constraints.add(new LinearConstraint(jvmOverheadRelationship, Relationship.EQ, 0)); constraints.add(new LinearConstraint(shuffleRelationship, Relationship.EQ, 0)); constraints.add(new LinearConstraint(jvmMetaspace, Relationship.EQ, jvmMetaspaceValue)); constraints.add(new LinearConstraint(framework, Relationship.EQ, frameworkValue)); constraints.add(new LinearConstraint(taskOffHeap, Relationship.EQ, taskOffHeapValue)); SimplexSolver solver = new SimplexSolver(); PointValuePair optSolution = solver.optimize( new MaxIter(100), f, new LinearConstraintSet(constraints), GoalType.MAXIMIZE, new NonNegativeConstraint(true)); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics
tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#issuecomment-537483004 One way to solve the problem could be ``` final int numVariables = 8; final double[] c = new double[numVariables]; Arrays.fill(c, 1.0); LinearObjectiveFunction f = new LinearObjectiveFunction(c, 0); Collection constraints = new ArrayList<>(); // JVM_overhead, jvm_metaspace, framework, task_heap, task_off_heap, shuffle, managed_memory, total process memory final double[] overallMemory = {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0}; final double[] jvmOverhead = {1.0, 0, 0, 0, 0, 0, 0, 0}; final double[] jvmMetaspace = {0, 1.0, 0, 0, 0, 0, 0, 0}; final double[] framework = {0, 0, 1.0, 0, 0, 0, 0, 0}; final double[] taskOffHeap = {0, 0, 0, 0, 1.0, 0, 0, 0}; final double minOverhead = 10; final double maxOverhead = 100; final double[] shuffle = {0, 0, 0, 0, 0, 1.0, 0, 0}; final double minShuffle = 10; final double maxShuffle = 100; final double overheadFraction = 0.1; final double[] jvmOverheadRelationship = {-1.0, overheadFraction, overheadFraction, overheadFraction, overheadFraction, overheadFraction, overheadFraction, 0}; final double shuffleFraction = 0.2; final double[] shuffleRelationship = {0, 0, shuffleFraction, shuffleFraction, shuffleFraction, -1, overheadFraction, 0}; final double jvmMetaspaceValue = 192; final double frameworkValue = 192; final double taskOffHeapValue = 0; constraints.add(new LinearConstraint(overallMemory, Relationship.EQ, 0)); constraints.add(new LinearConstraint(jvmOverhead, Relationship.GEQ, minOverhead)); constraints.add(new LinearConstraint(jvmOverhead, Relationship.LEQ, maxOverhead)); constraints.add(new LinearConstraint(shuffle, Relationship.GEQ, minShuffle)); constraints.add(new LinearConstraint(shuffle, Relationship.LEQ, maxShuffle)); constraints.add(new LinearConstraint(jvmOverheadRelationship, Relationship.EQ, 0)); constraints.add(new LinearConstraint(shuffleRelationship, Relationship.EQ, 0)); constraints.add(new LinearConstraint(jvmMetaspace, Relationship.EQ, jvmMetaspaceValue)); constraints.add(new LinearConstraint(framework, Relationship.EQ, frameworkValue)); constraints.add(new LinearConstraint(taskOffHeap, Relationship.EQ, taskOffHeapValue)); SimplexSolver solver = new SimplexSolver(); PointValuePair optSolution = solver.optimize( new MaxIter(100), f, new LinearConstraintSet(constraints), GoalType.MAXIMIZE, new NonNegativeConstraint(true)); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped
flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9834#issuecomment-537478603 ## CI report: * 65dff9edaba9515d520fdefa5f4b9a7361d3e92f : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#discussion_r330530781 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -78,19 +81,22 @@ public void startScheduling() { deploymentOptions.put(schedulingVertex.getId(), option); } - allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology()); + allocateSlotsAndDeployExecutionVertices( + getSchedulingExecutionVertices(getAllVerticesFromTopology()), + IS_IN_CREATED_STATE); } @Override - public void restartTasks(Set verticesToRestart) { + public void restartTasks(final Set verticesToRestart) { + final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart); + // increase counter of the dataset first - verticesToRestart + verticesToSchedule .stream() - .map(schedulingTopology::getVertexOrThrow) .flatMap(vertex -> vertex.getProducedResultPartitions().stream()) .forEach(inputConstraintChecker::resetSchedulingResultPartition); - allocateSlotsAndDeployExecutionVertexIds(verticesToRestart); + allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE); Review comment: And I also think it's fine to not **restart** `CREATED` vertices. Some events later should be responsible to trigger the scheduling for them. Otherwise something might be wrong. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#discussion_r330529529 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -78,19 +81,22 @@ public void startScheduling() { deploymentOptions.put(schedulingVertex.getId(), option); } - allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology()); + allocateSlotsAndDeployExecutionVertices( + getSchedulingExecutionVertices(getAllVerticesFromTopology()), + IS_IN_CREATED_STATE); } @Override - public void restartTasks(Set verticesToRestart) { + public void restartTasks(final Set verticesToRestart) { + final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart); + // increase counter of the dataset first - verticesToRestart + verticesToSchedule .stream() - .map(schedulingTopology::getVertexOrThrow) .flatMap(vertex -> vertex.getProducedResultPartitions().stream()) .forEach(inputConstraintChecker::resetSchedulingResultPartition); - allocateSlotsAndDeployExecutionVertexIds(verticesToRestart); + allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE); Review comment: A vertex is transitioned to `CREATED` only when it is reset. This only happens in `allocateSlotsAndDeploy` in `DefaultScheduler`. The vertex versions will change in this case. And these vertices will be filtered out to restart. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics
tillrohrmann edited a comment on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#issuecomment-537474186 Yes, all explicitly configured values should be respected. I agree, the default values of * Framework Heap Memory * Task Off-Heap Memory * Min/Max Shuffle Memory * JVM Metaspace * Min/Max JVM Overhead should be respected. In the end, we are trying to solve a linear programming problem for which there are known methods how to solve it. The problem should have the following form ``` 0 = jvm_overhead + jvm_metaspace + framework + task_heap + task_off_heap + shuffle + managed_memory - process_memory min_overhead <= jvm_overhead max_overhead >= jvm_overhead min_shuffle <= shuffle max_shuffle >= shuffle 0 = fraction_overhead * (jvm_overhead + jvm_metaspace + framework + task_heap + task_off_heap + shuffle + managed_memory) - jvm_overhead ``` If `shuffle` has not been set, then we have additionally `0 = fraction_shuffle * (framework + task_heap + task_off_heap + shuffle + managed_memory) - shuffle` There might be some shortcuts given the particulars of this problem. But I'm not sure whether it is exactly as you've described. I would suggest to rely on known approaches such as the simplex algorithm (but please do not reimplement it). Concerning the backwards compatibility plan, I agree that considering the legacy `fraction`, turns the linear problem into a non-linear problem. Hence, I would be ok with dropping support for it. I guess we should add a big release note and log a warning when using the legacy `fraction` option. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14271) Deprecate legacy RestartPipelinedRegionStrategy
[ https://issues.apache.org/jira/browse/FLINK-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942781#comment-16942781 ] Zhu Zhu commented on FLINK-14271: - Thanks [~trohrmann]! Let's wait for [~stevenz3wu]'s feedback. > Deprecate legacy RestartPipelinedRegionStrategy > --- > > Key: FLINK-14271 > URL: https://issues.apache.org/jira/browse/FLINK-14271 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.10.0 >Reporter: Zhu Zhu >Priority: Minor > Fix For: 1.10.0 > > > The legacy {{RestartPipelinedRegionStrategy}} has been superseded by > {{AdaptedRestartPipelinedRegionStrategyNG}} in Flink 1.9. > It heavily depends on ExecutionGraph components and becomes a blocker for a > clean scheduler re-architecture. > We should deprecate it for further removal. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics
tillrohrmann commented on issue #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#issuecomment-537474186 Yes, all explicitly configured values should be respected. I agree, the default values of * Framework Heap Memory * Task Off-Heap Memory * Min/Max Shuffle Memory * JVM Metaspace * Min/Max JVM Overhead should be respected. In the end, we are trying to solve a linear programming problem for which there are known methods how to solve it. The problem should have the following form ``` 0 = jvm_overhead + jvm_metaspace + framework + task_heap + task_off_heap + shuffle + managed_memory - process_memory min_overhead <= jvm_overhead max_overhead >= jvm_overhead min_shuffle <= shuffle max_shuffle >= shuffle 0 = fraction_overhead * (jvm_overhead + jvm_metaspace + framework + task_heap + task_off_heap + shuffle + managed_memory) - jvm_overhead ``` If `shuffle` has not been set, then we have additionally `0 = fraction_shuffle * (framework + task_heap + task_off_heap + shuffle + managed_memory) - shuffle` There might be some shortcuts given the particulars of this problem. But I'm not sure whether it is exactly as you've described. I would suggest to rely on known approaches. Concerning the backwards compatibility plan, I agree that considering the legacy `fraction`, turns the linear problem into a non-linear problem. Hence, I would be ok with dropping support for it. I guess we should add a big release note and log a warning when using the legacy `fraction` option. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r330522645 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +252,138 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. +*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } - if (exception != null) { - throw exception; + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); + + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); -
[GitHub] [flink] GJL commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
GJL commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#discussion_r330522635 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -78,19 +81,22 @@ public void startScheduling() { deploymentOptions.put(schedulingVertex.getId(), option); } - allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology()); + allocateSlotsAndDeployExecutionVertices( + getSchedulingExecutionVertices(getAllVerticesFromTopology()), + IS_IN_CREATED_STATE); } @Override - public void restartTasks(Set verticesToRestart) { + public void restartTasks(final Set verticesToRestart) { + final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart); + // increase counter of the dataset first - verticesToRestart + verticesToSchedule .stream() - .map(schedulingTopology::getVertexOrThrow) .flatMap(vertex -> vertex.getProducedResultPartitions().stream()) .forEach(inputConstraintChecker::resetSchedulingResultPartition); - allocateSlotsAndDeployExecutionVertexIds(verticesToRestart); + allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE); Review comment: Would something speak against transitioning the _"vertices to restart"_ to `CREATED` already in the scheduler (before invoking `SchedulingStrategy#restartTasks()`)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r330521780 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +252,138 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. +*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } - if (exception != null) { - throw exception; + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); + + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); -
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330461900 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java ## @@ -19,7 +19,7 @@ Review comment: for pulling out lifecycle stuff outside of user-facing interfaces. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol merged pull request #9740: [hotfix][docs] URL should not point to release-1.8 docs
zentol merged pull request #9740: [hotfix][docs] URL should not point to release-1.8 docs URL: https://github.com/apache/flink/pull/9740 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330513099 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java ## @@ -75,7 +75,7 @@ public MockStreamTask( this.closableRegistry = closableRegistry; this.streamStatusMaintainer = streamStatusMaintainer; this.checkpointStorage = checkpointStorage; - this.processingTimeService = processingTimeService; + this.processingTimeService = timerService; Review comment: rename field? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330464428 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerService.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.Internal; + +/** + * + * The registration of timers follows a life cycle of three phases: + * + * In the initial state, it accepts timer registrations and triggers when the time is reached. + * After calling {@link #quiesce()}, further calls to + * {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further timers, and will + * return a "dummy" future as a result. This is used for clean shutdown, where currently firing + * timers are waited for and no future timers can be scheduled, without causing hard exceptions. + * After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, ProcessingTimeCallback)} + * will result in a hard exception. + * + */ +@Internal +public interface TimerService extends ProcessingTimeService { Review comment: The naming is not really clear: * What's the difference between TimerService and TimeService? * Usually, names get rather longer than shorter in a hierarchy ;) I guess TimerService is more plausible but you wanted to keep the user-facing interface unchanged. For me, it feels as if these life-cycle methods keep reappearing at different points (Mailbox, Timer, ...). How about we make this interface more general purpose? Just call it Lifecycle and don't extend ProcessingTimeService. Then we can add this internal interface to different classes later. That's quite common in logger and network frameworks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330516878 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; + + @Test + public void testOperatorYieldExecutesSelectedTimers() throws Exception { + events = new ArrayList<>(); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setupOperatorChain(new OperatorID(), new TestOperatorFactory<>()) + .chain(new OperatorID(), new TestOperatorFactory<>(), IntSerializer.INSTANCE) + .finish(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + testHarness.processElement(new StreamRecord<>(42)); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + + assertThat(events, is(Arrays.asList("Timer:1:1", "Timer:0:1"))); + } + + private static class TestOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private MailboxExecutor mailboxExecutor; + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @Override + public > Operator createStreamOperator( + StreamTask containingTask, + StreamConfig config, + Output> output) { + TestOperator operator = new TestOperator<>(config.getChainIndex(), mailboxExecutor); + operator.setup(containingTask, config, output); + return (Operator) operator; + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + } + + @Override + public
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330517177 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; + + @Test + public void testOperatorYieldExecutesSelectedTimers() throws Exception { Review comment: I would love to see another test case, where AWOP1.yield will execute AWOP2.timer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330516661 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; + + @Test + public void testOperatorYieldExecutesSelectedTimers() throws Exception { + events = new ArrayList<>(); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setupOperatorChain(new OperatorID(), new TestOperatorFactory<>()) + .chain(new OperatorID(), new TestOperatorFactory<>(), IntSerializer.INSTANCE) + .finish(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + testHarness.processElement(new StreamRecord<>(42)); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + + assertThat(events, is(Arrays.asList("Timer:1:1", "Timer:0:1"))); + } + + private static class TestOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private MailboxExecutor mailboxExecutor; + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @Override + public > Operator createStreamOperator( + StreamTask containingTask, + StreamConfig config, + Output> output) { + TestOperator operator = new TestOperator<>(config.getChainIndex(), mailboxExecutor); + operator.setup(containingTask, config, output); + return (Operator) operator; + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + } + + @Override + public
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330478001 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; + + @Test + public void testOperatorYieldExecutesSelectedTimers() throws Exception { + events = new ArrayList<>(); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setupOperatorChain(new OperatorID(), new TestOperatorFactory<>()) + .chain(new OperatorID(), new TestOperatorFactory<>(), IntSerializer.INSTANCE) + .finish(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + testHarness.processElement(new StreamRecord<>(42)); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + + assertThat(events, is(Arrays.asList("Timer:1:1", "Timer:0:1"))); + } + + private static class TestOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private MailboxExecutor mailboxExecutor; + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @Override + public > Operator createStreamOperator( + StreamTask containingTask, + StreamConfig config, + Output> output) { + TestOperator operator = new TestOperator<>(config.getChainIndex(), mailboxExecutor); + operator.setup(containingTask, config, output); + return (Operator) operator; + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + } + + @Override + public
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330515584 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ## @@ -331,19 +320,10 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(Checkpointe blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch()); testHarness.invoke(mockEnv); + testHarness.waitForTaskRunning(); final OneInputStreamTask task = testHarness.getTask(); - // wait for the task to be running Review comment: omg. thanks for that hotfix. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330476481 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; Review comment: Please try to avoid adding even more static fields to the tests. Can you please inject a local event list in the same way you inject the mailbox? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330475591 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ## @@ -250,37 +250,36 @@ int getNumTasksScheduled() { // /** -* A context to which {@link ProcessingTimeCallback} would be passed to be invoked when a timer is up. +* An exception handler, called when {@link ProcessingTimeCallback} throws an exception. */ - interface ScheduledCallbackExecutionContext { - - void invoke(ProcessingTimeCallback callback, long timestamp); + interface ExceptionHandler { Review comment: We already have some in `org.apache.flink.runtime.operators.sort.ExceptionHandler` . Can we unify them? Or can we use `AsyncExceptionHandler` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330515293 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test to verify that timer triggers are run according to operator precedence (combined with yield() at operator level). + */ +public class StreamTaskOperatorTimerTest extends TestLogger { + private static List events; + + @Test + public void testOperatorYieldExecutesSelectedTimers() throws Exception { + events = new ArrayList<>(); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setupOperatorChain(new OperatorID(), new TestOperatorFactory<>()) + .chain(new OperatorID(), new TestOperatorFactory<>(), IntSerializer.INSTANCE) + .finish(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + testHarness.processElement(new StreamRecord<>(42)); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + + assertThat(events, is(Arrays.asList("Timer:1:1", "Timer:0:1"))); + } + + private static class TestOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private MailboxExecutor mailboxExecutor; + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @Override + public > Operator createStreamOperator( + StreamTask containingTask, + StreamConfig config, + Output> output) { + TestOperator operator = new TestOperator<>(config.getChainIndex(), mailboxExecutor); + operator.setup(containingTask, config, output); + return (Operator) operator; + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + } + + @Override + public
[GitHub] [flink] AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence
AHeise commented on a change in pull request #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence URL: https://github.com/apache/flink/pull/9735#discussion_r330474001 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java ## @@ -210,7 +210,7 @@ private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOper StreamSource operator, ExecutionConfig executionConfig, Environment env, - ProcessingTimeService timeProvider) { + TimerService timeProvider) { Review comment: nit: timeProvider is not a really good name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on issue #9798: [FLINK-14176][web]Web Ui add log url for taskmanager of vertex
zentol commented on issue #9798: [FLINK-14176][web]Web Ui add log url for taskmanager of vertex URL: https://github.com/apache/flink/pull/9798#issuecomment-537467031 _As usual_, please include a screenshot of the WebUI when modifying it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped
flinkbot commented on issue #9834: [bp-1.9][FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9834#issuecomment-537466604 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 65dff9edaba9515d520fdefa5f4b9a7361d3e92f (Wed Oct 02 12:20:06 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun opened a new pull request #9834: [FLINK-13827][script] shell variable should be escaped
TisonKun opened a new pull request #9834: [FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9834 See also #9800 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-13827) shell variable should be escaped in start-scala-shell.sh
[ https://issues.apache.org/jira/browse/FLINK-13827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942754#comment-16942754 ] Zili Chen commented on FLINK-13827: --- master via 865cc4c7a39f7aa610a02cc4a0f41424edcd6279 > shell variable should be escaped in start-scala-shell.sh > > > Key: FLINK-13827 > URL: https://issues.apache.org/jira/browse/FLINK-13827 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.9.0, 1.10.0 >Reporter: Zili Chen >Assignee: Zili Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0, 1.9.2 > > Time Spent: 20m > Remaining Estimate: 0h > > {code:java} > diff --git a/flink-scala-shell/start-script/start-scala-shell.sh > b/flink-scala-shell/start-script/start-scala-shell.sh > index b6da81af72..65b9045584 100644 > --- a/flink-scala-shell/start-script/start-scala-shell.sh > +++ b/flink-scala-shell/start-script/start-scala-shell.sh > @@ -97,9 +97,9 @@ log_setting="-Dlog.file="$LOG" > -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/$LOG > > if ${EXTERNAL_LIB_FOUND} > then > -$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting > org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" > +$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" "$log_setting" > org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" > else > -$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting > org.apache.flink.api.scala.FlinkShell $@ > +$JAVA_RUN -Dscala.color -cp "$FLINK_CLASSPATH" "$log_setting" > org.apache.flink.api.scala.FlinkShell $@ > fi > > #restore echo > {code} > otherwise it is error prone when {{$log_setting}} contain arbitrary content. > For example, if the parent dir contain whitespace, said {{flink-1.9.0 2}}, > then {{bin/start-scala-shell.sh local}} will fail with > {{Error: Could not find or load main class > 2.log.flink\-\*\-scala\-shell\-local\-\*.log}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14300) StreamTask#invoke leaks threads if OperatorChain fails to be constructed
[ https://issues.apache.org/jira/browse/FLINK-14300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942753#comment-16942753 ] Piotr Nowojski edited comment on FLINK-14300 at 10/2/19 12:11 PM: -- Hi [~mklein0]. Thank you for reporting and proposing a fix for this bug. Could you open a pull request for this change on the [github|https://github.com/apache/flink/]? If you do so, please notify me so I can quickly review it. was (Author: pnowojski): Hi [~mklein0]. Thank you for reporting and proposing a fix for this bug. Could you open a pull request for this change on the [github|https://github.com/apache/flink/]? > StreamTask#invoke leaks threads if OperatorChain fails to be constructed > > > Key: FLINK-14300 > URL: https://issues.apache.org/jira/browse/FLINK-14300 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.8.1, 1.8.2, 1.9.0 >Reporter: Marcos Klein >Priority: Minor > Fix For: 1.10.0 > > Attachments: thread-leak-patch.diff > > > In the *StreamTask#invoke* method if an exception occurs during the > allocation of the > [operatorChain|[https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L370]] > class, the [exception > handling|[https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L485-L491]] > fails to cleanup the threads allocated as *StreamTask#recordWriters*. This > causes threads to leak as flink attempts to continually restart and fail for > the same cause. > > An example cause is a deserialization issue on a custom operator from a > checkpoint. > > Attached is a suggested fix for the master branch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14300) StreamTask#invoke leaks threads if OperatorChain fails to be constructed
[ https://issues.apache.org/jira/browse/FLINK-14300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-14300: --- Fix Version/s: 1.10.0 > StreamTask#invoke leaks threads if OperatorChain fails to be constructed > > > Key: FLINK-14300 > URL: https://issues.apache.org/jira/browse/FLINK-14300 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.8.1, 1.8.2, 1.9.0 >Reporter: Marcos Klein >Priority: Minor > Fix For: 1.10.0 > > Attachments: thread-leak-patch.diff > > > In the *StreamTask#invoke* method if an exception occurs during the > allocation of the > [operatorChain|[https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L370]] > class, the [exception > handling|[https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L485-L491]] > fails to cleanup the threads allocated as *StreamTask#recordWriters*. This > causes threads to leak as flink attempts to continually restart and fail for > the same cause. > > An example cause is a deserialization issue on a custom operator from a > checkpoint. > > Attached is a suggested fix for the master branch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] TisonKun closed pull request #9800: [FLINK-13827][script] shell variable should be escaped
TisonKun closed pull request #9800: [FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9800 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should be escaped
TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9800#issuecomment-537463641 master via 865cc4c7a39f7aa610a02cc4a0f41424edcd6279 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14300) StreamTask#invoke leaks threads if OperatorChain fails to be constructed
[ https://issues.apache.org/jira/browse/FLINK-14300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942753#comment-16942753 ] Piotr Nowojski commented on FLINK-14300: Hi [~mklein0]. Thank you for reporting and proposing a fix for this bug. Could you open a pull request for this change on the [github|https://github.com/apache/flink/]? > StreamTask#invoke leaks threads if OperatorChain fails to be constructed > > > Key: FLINK-14300 > URL: https://issues.apache.org/jira/browse/FLINK-14300 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.8.1, 1.8.2, 1.9.0 >Reporter: Marcos Klein >Priority: Minor > Attachments: thread-leak-patch.diff > > > In the *StreamTask#invoke* method if an exception occurs during the > allocation of the > [operatorChain|[https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L370]] > class, the [exception > handling|[https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L485-L491]] > fails to cleanup the threads allocated as *StreamTask#recordWriters*. This > causes threads to leak as flink attempts to continually restart and fail for > the same cause. > > An example cause is a deserialization issue on a custom operator from a > checkpoint. > > Attached is a suggested fix for the master branch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese
flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese URL: https://github.com/apache/flink/pull/9815#issuecomment-536343016 ## CI report: * 615acdb2511760c55f8831934f710678a8962acc : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129610971) * 38894aa30a82d5763ad8137e176ac7d78ee13178 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129652562) * 4b35c7d8ce5d86f030037b924a943f857d7f8a01 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129684998) * 9099b3a2a220c0d289899f05cea6714fc281d800 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129802867) * 90d7fce7f9b27fe52e84c65545c701609b9a3ea9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129858575) * 7d25238fda6dbf1029d3d315d3b1077227ca05ce : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130010236) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#issuecomment-536005650 ## CI report: * 757ae54b3f31a82334fae36d6e714c6aa9ac6581 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129477385) * fa00ce849c0c110b76207c99ffed9e29b26d597e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130013179) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should be escaped
TisonKun commented on issue #9800: [FLINK-13827][script] shell variable should be escaped URL: https://github.com/apache/flink/pull/9800#issuecomment-537457189 Thanks for your review @xintongsong & @zentol . Merging to master... @wuchong given there is a release progress under branch-1.9 it seems the back port to 1.9 should be deferred. The JIRA fixed version contains 1.9.2 so that I'd like to do the back port once 1.9.1 released and unlock branch-1.9. Let me know if I misunderstand anything. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-14214) Performance regression in TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-14214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-14214. -- Resolution: Cannot Reproduce > Performance regression in TwoInputStreamOperator > > > Key: FLINK-14214 > URL: https://issues.apache.org/jira/browse/FLINK-14214 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.10.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > https://issues.apache.org/jira/browse/FLINK-13051 introduced a performance > regression visible for example > [here|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=twoInputMapSink=2=200=off=off=off] > as a drop on August 30th (note that the later performance improvement is > caused by serialisation improvement, which is unrelated to the previous slow > down). > Probable suspect is the following {{.isDone()}} check executed inside > {{StreamTwoInputProcessor}} once per record: > {code:java} > // to avoid starvation, if the input selection is ALL and availableInputsMask > is not ALL, > // always try to check and set the availability of another input > // TODO: because this can be a costly operation (checking volatile inside > CompletableFuture` > // this might be optimized to only check once per processed NetworkBuffer > if (inputSelectionHandler.shouldSetAvailableForAnotherInput()) { > checkAndSetAvailable(1 - readingInputIndex); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] pnowojski merged pull request #9779: [FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso
pnowojski merged pull request #9779: [FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso URL: https://github.com/apache/flink/pull/9779 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup
zentol commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#discussion_r330501737 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ## @@ -62,7 +64,7 @@ */ public class MetricUtils { private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); - private static final String METRIC_GROUP_STATUS_NAME = "Status"; + public static final String METRIC_GROUP_STATUS_NAME = "Status"; Review comment: where is this used outside of this class? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup
zentol commented on a change in pull request #9825: [FLINK-14299] Introduce ProcessMetricGroup URL: https://github.com/apache/flink/pull/9825#discussion_r330499346 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ## @@ -100,17 +98,22 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup( hostName, resourceID.toString()); - MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); - - // Initialize the TM metrics - instantiateStatusMetrics(statusGroup); + MetricGroup statusGroup = createAndInitializeStatusMetricGroup(taskManagerMetricGroup); if (systemResourceProbeInterval.isPresent()) { instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get()); } return Tuple2.of(taskManagerMetricGroup, statusGroup); } + private static MetricGroup createAndInitializeStatusMetricGroup(AbstractMetricGroup parentMetricGroup) { + MetricGroup statusGroup = parentMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); + + // Initialize the TM metrics Review comment: this comment isn't accurate anymore; let's just remove it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#issuecomment-536005650 ## CI report: * 757ae54b3f31a82334fae36d6e714c6aa9ac6581 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129477385) * fa00ce849c0c110b76207c99ffed9e29b26d597e : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/130013179) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol merged pull request #9668: [hotfix][docs] Fix the wrong characters in the document(building.md, building.zh.md)
zentol merged pull request #9668: [hotfix][docs] Fix the wrong characters in the document(building.md,building.zh.md) URL: https://github.com/apache/flink/pull/9668 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-13516. Resolution: Fixed master: d35f6990eb402a2137826b62bca11437728da9c1 > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zentol merged pull request #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11
zentol merged pull request #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11 URL: https://github.com/apache/flink/pull/9622 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on issue #9504: [FLINK-13799][web]: fix error message web submit is disables in the c…
zentol commented on issue #9504: [FLINK-13799][web]: fix error message web submit is disables in the c… URL: https://github.com/apache/flink/pull/9504#issuecomment-537446272 @vthinkxie FLINK-13817 exposes whether submissions are enabled in a more reliable way. The configuration is is unreliable since it only lists explicitly configured keys; if the key is not explicitly configured you'd have to make assumptions on the default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11
flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11 URL: https://github.com/apache/flink/pull/9833#issuecomment-537431999 ## CI report: * 5077c53b7d163f49e5d4a8414c6c0598d4fdb060 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130007248) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese
flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese URL: https://github.com/apache/flink/pull/9815#issuecomment-536343016 ## CI report: * 615acdb2511760c55f8831934f710678a8962acc : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129610971) * 38894aa30a82d5763ad8137e176ac7d78ee13178 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129652562) * 4b35c7d8ce5d86f030037b924a943f857d7f8a01 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129684998) * 9099b3a2a220c0d289899f05cea6714fc281d800 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129802867) * 90d7fce7f9b27fe52e84c65545c701609b9a3ea9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129858575) * 7d25238fda6dbf1029d3d315d3b1077227ca05ce : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/130010236) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
flinkbot edited a comment on issue #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#issuecomment-536005650 ## CI report: * 757ae54b3f31a82334fae36d6e714c6aa9ac6581 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129477385) * fa00ce849c0c110b76207c99ffed9e29b26d597e : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11
flinkbot edited a comment on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11 URL: https://github.com/apache/flink/pull/9833#issuecomment-537431999 ## CI report: * 5077c53b7d163f49e5d4a8414c6c0598d4fdb060 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/130007248) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese
flinkbot edited a comment on issue #9815: [FLINK-14117][docs-zh] Translate changes on index page to Chinese URL: https://github.com/apache/flink/pull/9815#issuecomment-536343016 ## CI report: * 615acdb2511760c55f8831934f710678a8962acc : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129610971) * 38894aa30a82d5763ad8137e176ac7d78ee13178 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129652562) * 4b35c7d8ce5d86f030037b924a943f857d7f8a01 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129684998) * 9099b3a2a220c0d289899f05cea6714fc281d800 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129802867) * 90d7fce7f9b27fe52e84c65545c701609b9a3ea9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129858575) * 7d25238fda6dbf1029d3d315d3b1077227ca05ce : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#discussion_r330483271 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -102,11 +108,12 @@ public void onExecutionStateChange(ExecutionVertexID executionVertexId, Executio final Set verticesToSchedule = schedulingTopology.getVertexOrThrow(executionVertexId) .getProducedResultPartitions() .stream() + .filter(partition -> partition.getPartitionType() == ResultPartitionType.BLOCKING) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9779: [FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso
flinkbot edited a comment on issue #9779: [FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvation check in StreamTwoInputProcesso URL: https://github.com/apache/flink/pull/9779#issuecomment-535521325 ## CI report: * 42064500d00956c47a8b37b678c4c9b43170ac4a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129282821) * 10728ce9a96eb13e6b2586eedadfe53161ff85b2 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/13514) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#discussion_r330480845 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -78,19 +81,22 @@ public void startScheduling() { deploymentOptions.put(schedulingVertex.getId(), option); } - allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology()); + allocateSlotsAndDeployExecutionVertices( + getSchedulingExecutionVertices(getAllVerticesFromTopology()), + IS_IN_CREATED_STATE); } @Override - public void restartTasks(Set verticesToRestart) { + public void restartTasks(final Set verticesToRestart) { + final Set verticesToSchedule = getSchedulingExecutionVertices(verticesToRestart); + // increase counter of the dataset first - verticesToRestart + verticesToSchedule .stream() - .map(schedulingTopology::getVertexOrThrow) .flatMap(vertex -> vertex.getProducedResultPartitions().stream()) .forEach(inputConstraintChecker::resetSchedulingResultPartition); - allocateSlotsAndDeployExecutionVertexIds(verticesToRestart); + allocateSlotsAndDeployExecutionVertices(verticesToSchedule, IS_IN_TERMINAL_STATE); Review comment: This should not happen because the `restartTasks` is only invoked when the `releaseFuture` of all the`verticesToRestart` are completed(valid states include `FINISHED`/`CANCELED`/`FAILED`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks
zhuzhurk commented on a change in pull request #9791: [FLINK-14248][runtime] Let LazyFromSourcesSchedulingStrategy restart terminated tasks URL: https://github.com/apache/flink/pull/9791#discussion_r330480920 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -102,11 +108,12 @@ public void onExecutionStateChange(ExecutionVertexID executionVertexId, Executio final Set verticesToSchedule = schedulingTopology.getVertexOrThrow(executionVertexId) .getProducedResultPartitions() .stream() + .filter(partition -> partition.getPartitionType() == ResultPartitionType.BLOCKING) Review comment: You are right! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11
flinkbot commented on issue #9833: [FLINK-14276][quickstarts] Scala quickstart compiles on JDK 11 URL: https://github.com/apache/flink/pull/9833#issuecomment-537431999 ## CI report: * 5077c53b7d163f49e5d4a8414c6c0598d4fdb060 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14295) Nightly flink-runtime failed with java 11
[ https://issues.apache.org/jira/browse/FLINK-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16942676#comment-16942676 ] Chesnay Schepler commented on FLINK-14295: -- Let's split this into 2 tickets since there are 2 test failures. From a quick glance I doubt this is exclusive to Java 11. > Nightly flink-runtime failed with java 11 > - > > Key: FLINK-14295 > URL: https://issues.apache.org/jira/browse/FLINK-14295 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Yu Li >Priority: Critical > Labels: test-stability > > The core-jdk11 part of nightly test failed with below error: > {noformat} > 22:09:38.176 [ERROR] Failures: > 22:09:38.180 [ERROR] > TaskExecutorSubmissionTest.testRequestStackTraceSample:637 > expected:<[updateTaskExecutionState]> but > was:<[lambda$updateTaskExecutionState$0]> > 22:09:38.185 [ERROR] Errors: > 22:09:38.185 [ERROR] > RecordWriterTest.testClearBuffersAfterInterruptDuringBlockingBufferRequest:165 > » NoSuchElement > 22:09:38.185 [INFO] > 22:09:38.186 [ERROR] Tests run: 3936, Failures: 1, Errors: 1, Skipped: 40 > {noformat} > Link of the build: https://api.travis-ci.org/v3/job/591086968/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics
tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#discussion_r330469316 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.util.ConfigurationException; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Utility class for TaskExecutor memory configurations. + */ +public class TaskExecutorResourceUtils { + + private TaskExecutorResourceUtils() {} + + // + // Memory Configuration Calculations + // + + public static TaskExecutorResourceSpec resourceSpecFromConfig(final Configuration config) throws ConfigurationException { + if (isTaskHeapMemorySizeExplicitlyConfigured(config) && isManagedMemorySizeExplicitlyConfigured(config)) { + // both task heap memory and managed memory is configured, use these to derive total flink memory + return deriveResourceSpecWithExplicitTaskAndManagedMemory(config); + } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) { + // total flink memory is configured, but not task heap and managed memory, derive from total flink memory + return deriveResourceSpecWithTotalFlinkMemory(config); + } else if (isTotalProcessMemorySizeExplicitlyConfigured(config)) { + return deriveResourceSpecWithTotalProcessMemory(config); + } else { + throw new ConfigurationException("Either Task Heap Memory size and Managed Memory size, or Total Flink" + + " Memory size, or Total Process Memory size need to be configured explicitly."); + } + } + + private static TaskExecutorResourceSpec deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) { + // derive total flink internal memory sizes from explicitly configure task heap memory size and managed memory size + + final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config); + final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config); + final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config); + + final MemorySize managedMemorySize = getManagedMemorySize(config); + final Tuple2 managedMemorySizeTuple2 = deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, managedMemorySize); + final MemorySize onHeapManagedMemorySize = managedMemorySizeTuple2.f0; + final MemorySize offHeapManagedMemorySize = managedMemorySizeTuple2.f1; + + final MemorySize shuffleMemorySize = deriveShuffleMemoryWithInverseFraction(config, + frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize)); + + // derive total flink external memory sizes from derived total flink memory size + + final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize + .add(taskHeapMemorySize) + .add(taskOffHeapMemorySize) + .add(shuffleMemorySize) + .add(managedMemorySize); + + final Tuple2 totalFlinkExternalMemorySizeTuple2 = deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize); + + return new TaskExecutorResourceSpec( +