[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5823 Thanks @zhangminglei. This looks good to merge, will proceed to merge this. ---
[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5374 @cjolif since we have now reached a conclusion on where the Elasticsearch connector should be improved in the future, could you maybe close this PR? I assume a new PR will be opened that subsumes this one. ---
[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6040 @snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a bit flaky, so you can safely ignore that for now. I'll take another look at your changes soon. Thanks! ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r189177308 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void checkQueueLimit() { + while (producer.getOutstandingRecordsCount() >= queueLimit) { + producer.flush(); --- End diff -- When we add a record to the producer queue via `producer.addUserRecord(...)`, we get a callback. We can use that callback to notify the blocking operation in `checkQueueLimit`. ---
[GitHub] flink issue #5941: [FLINK-8971] [e2e] Include broadcast / union state in gen...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5941 I think it is safe to merge this change. Will merge this now. ---
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5955 @kl0u yes, let's do that as a separate commit then. +1, this looks good to me. One final comment for the merge: When merging to `master`, we should have test savepoints for both `1.5` (taken in the `release-1.5` branch) and `1.6` (taken in the current `master` branch). ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r189163394 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception { throw new RuntimeException("Kinesis producer has been closed"); } + checkAndPropagateAsyncError(); + checkQueueLimit(); checkAndPropagateAsyncError(); --- End diff -- This second check is to check any async errors that occurred during the queue flush, correct? If so, we should probably move this second invocation into `checkQueueLimit` to make this more implicit. ---
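A minimal sketch of the suggestion above, assuming the queue check itself re-checks for asynchronous errors after each drain step, so the caller no longer needs a second explicit call. The class `ErrorCheckSketch`, its in-memory queue, and the `send` method are hypothetical stand-ins and not the actual `FlinkKinesisProducer` code; in the real sink the error would be recorded by a callback thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the sink; it only models the ordering of the checks. */
class ErrorCheckSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final int queueLimit;
    // In the real sink this would be set from an asynchronous callback.
    private Throwable asyncError;

    ErrorCheckSketch(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    void invoke(String record) {
        checkAndPropagateAsyncError();
        // The queue check also rethrows errors recorded while draining,
        // so no second explicit error check is needed here.
        checkQueueLimit();
        pending.add(record);
    }

    private void checkQueueLimit() {
        while (pending.size() >= queueLimit) {
            send(pending.poll());
            checkAndPropagateAsyncError();
        }
    }

    /** Simulated send: records starting with "bad" fail and record an error. */
    private void send(String record) {
        if (record.startsWith("bad")) {
            asyncError = new IllegalStateException("failed to send " + record);
        }
    }

    private void checkAndPropagateAsyncError() {
        if (asyncError != null) {
            Throwable cause = asyncError;
            asyncError = null;
            throw new RuntimeException("An error occurred in the producer", cause);
        }
    }
}
```

With a limit of 1, a failing record queued by one `invoke` call surfaces as an exception on the next `invoke` call, from inside the queue check.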
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r189164871 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void checkQueueLimit() { + while (producer.getOutstandingRecordsCount() >= queueLimit) { + producer.flush(); --- End diff -- Do we have to do a flush here? Shouldn't the KPL child process process the user records in the background without an explicit flush call? If so, perhaps a more graceful solution here is to wait on a local object, and notify it to wake up in the asynchronous producer write callbacks. After being notified, we check the `getOutstandingRecordsCount` against the `queueLimit`, and either wait more or escape the loop. What do you think? ---
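The wait/notify idea in the comment above could look roughly like the following. This is only a sketch under stated assumptions: `QueueLimitSketch`, `addRecord`, and `onRecordCompleted` are hypothetical names, and an `AtomicInteger` stands in for the producer's outstanding record count. The timed wait is an added safeguard against a missed notification, not something the comment asks for.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the producer; it tracks only the outstanding record count. */
class QueueLimitSketch {
    private final Object queueLock = new Object();
    private final AtomicInteger outstanding = new AtomicInteger();
    private final int queueLimit;

    QueueLimitSketch(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    /** Called by the single writer thread; blocks while the queue is at or over the limit. */
    void addRecord() {
        synchronized (queueLock) {
            while (outstanding.get() >= queueLimit) {
                try {
                    // Timed wait, so a missed notification cannot block forever.
                    queueLock.wait(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for queue space", e);
                }
            }
        }
        outstanding.incrementAndGet();
    }

    /** Stands in for the asynchronous write callback of the producer. */
    void onRecordCompleted() {
        outstanding.decrementAndGet();
        synchronized (queueLock) {
            queueLock.notifyAll();
        }
    }

    int outstandingCount() {
        return outstanding.get();
    }
}
```

The loop re-checks the count after every wake-up, which matches the "either wait more or escape the loop" behaviour described in the comment and also tolerates spurious wake-ups.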
[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6040 Thanks for the PR @snuyanzin! I had some comments, please let me know what you think. Also, some general contribution tips: 1. I would suggest the title of the PR to be something along the lines of "[FLINK-9349] [kafka] Fix ConcurrentModificationException when add discovered partitions". That directly makes it clear what exactly is being fixed. 2. The message of the first commit of the PR should also be appropriately set to be similar to the title (most of the time if it is a 1-commit PR, the title of the PR and the commit message can be identical). ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r189031751 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static 
org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Flink9349Test}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaConsumerThread.class) +public class Flink9349Test { + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { --- End diff -- Likewise, Kafka 08 / 09 / 010 / 011 should all have this test coverage. ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r189029077 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) { SerializedValue<AssignerWithPunctuatedWatermarks> watermarksPunctuated, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { - List<KafkaTopicPartitionState> partitionStates = new LinkedList<>(); + List<KafkaTopicPartitionState> partitionStates = new CopyOnWriteArrayList<>(); --- End diff -- Would be nice to have a comment on why we need to use a `CopyOnWriteArrayList` ---
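As background for such a comment, the sketch below shows the behavioural difference between the two list types when an element is added while an iteration is in progress. It is a single-threaded reproduction chosen for determinism; the assumption (based on the PR title and the test name in this thread) is that the actual issue involves one thread adding discovered partitions while another iterates. `PartitionListSketch` and the string partition names are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class PartitionListSketch {

    /**
     * Adds one element while iterating over the list.
     * Returns true if the iteration failed with a ConcurrentModificationException.
     */
    static boolean failsOnAddDuringIteration(List<String> partitions) {
        partitions.add("partition-0");
        partitions.add("partition-1");
        try {
            boolean added = false;
            for (String ignored : partitions) {
                if (!added) {
                    // Simulates a newly discovered partition arriving mid-iteration.
                    partitions.add("partition-2");
                    added = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("LinkedList fails: "
                + failsOnAddDuringIteration(new LinkedList<>()));
        System.out.println("CopyOnWriteArrayList fails: "
                + failsOnAddDuringIteration(new CopyOnWriteArrayList<>()));
    }
}
```

`LinkedList` iterators are fail-fast, whereas a `CopyOnWriteArrayList` iterator works on a snapshot of the array taken when the iterator was created, so it never sees the added element and never throws. The trade-off is that every write copies the backing array, which is usually acceptable when writes are rare compared with iterations.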
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r189031075 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static 
org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Flink9349Test}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaConsumerThread.class) +public class Flink9349Test { + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { --- End diff -- I think we should have a similar test, but move it to `Kafka09FetcherTest`. ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r189031464 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static 
org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Flink9349Test}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaConsumerThread.class) +public class Flink9349Test { + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // - the mock consumer with blocking poll calls + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) {
[GitHub] flink pull request #6038: [FLINK-9394] [e2e] Test rescaling when resuming fr...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6038 [FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints ## What is the purpose of the change This PR further extends the `test_resume_externalized_checkpoints.sh` e2e test to cover rescaling cases. ## Brief change log - Allow specifying old / new parallelism and state backend configuration for the test script - Modify nightly test script ## Verifying this change This is a simple extension to existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9394 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6038.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6038 commit eb738232d864f2811987927ee1ea6352a3a041f7 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-17T16:33:12Z [FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints ---
[GitHub] flink issue #6038: [FLINK-9394] [e2e] Test rescaling when resuming from exte...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6038 r: @StefanRRichter ---
[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188548653 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -33,8 +33,8 @@ else NUM_SLOTS=$NEW_DOP fi -STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file} -STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true} +STATE_BACKEND_TYPE=${3:-file} +STATE_BACKEND_FILE_ASYNC=${4:-true} --- End diff -- We should also update the usage message at the beginning of this file to reflect these extra parameters. ---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511081 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + ca
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188512949 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + ca
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511258 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + ca
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188512862 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + ca
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188512804 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + ca
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511280 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + ca
[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188507930 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -43,121 +43,151 @@ EXIT_CODE=0 # printf "\n==\n" # printf "Running my fancy nightly end-to-end test\n" # printf "==\n" +# start_timer # $END_TO_END_DIR/test-scripts/test_something_very_fancy.sh # EXIT_CODE=$? +# end_timer # fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running HA end-to-end test\n" printf "==\n" +start_timer $END_TO_END_DIR/test-scripts/test_ha.sh EXIT_CODE=$? +end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? + end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? + end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 EXIT_CODE=$? + end_timer fi --- End diff -- Perhaps we can refactor this whole block into a common base script for the nightly / pre-commit hook scripts. The end_timer / start_timer functions should also be located there. ---
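A minimal sketch of what such a shared base script might contain, assuming bash. Only the names `start_timer`, `end_timer`, and `EXIT_CODE` come from the diff; the function bodies and the `run_test` wrapper are hypothetical and not taken from the PR.

```shell
#!/usr/bin/env bash

EXIT_CODE=0

# Record the start time. SECONDS is a bash builtin that counts
# seconds since the shell was started.
start_timer() {
    TIMER_START=$SECONDS
}

# Print the time elapsed since the last start_timer call.
end_timer() {
    local elapsed=$(( SECONDS - TIMER_START ))
    printf "Test took %d minutes and %d seconds\n" $(( elapsed / 60 )) $(( elapsed % 60 ))
}

# Hypothetical wrapper: prints the banner, times the command, and records
# its exit code. Later tests are skipped once one test has failed.
run_test() {
    local description="$1"
    shift
    if [ "$EXIT_CODE" -eq 0 ]; then
        printf "\n==\n"
        printf "Running %s\n" "$description"
        printf "==\n"
        start_timer
        "$@" || EXIT_CODE=$?
        end_timer
    fi
}
```

With a wrapper like this, each block in the nightly script could shrink to a single call, for example `run_test "HA end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh"`. Settings such as `STATE_BACKEND_TYPE=file` could be passed by prefixing the command with `env`.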
[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6008 I see these in the Travis logs: ``` flink-end-to-end-tests/run-pre-commit-tests.sh: line 94: start_timer: command not found ... flink-end-to-end-tests/run-pre-commit-tests.sh: line 97: end_timer: command not found ``` I think you'll have to include `common.sh` in the nightly / pre-commit hook scripts. ---
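One way to make this failure mode obvious is to source the shared script near the top of each runner and fail fast if the helpers are still undefined. This is a sketch only: `require_functions` is a hypothetical helper, and the `common.sh` location in the commented-out line is an assumption, based on the test scripts sourcing it from their own directory.

```shell
#!/usr/bin/env bash

# In the runner script, the shared helpers would be sourced first, e.g.:
#   source "${END_TO_END_DIR}/test-scripts/common.sh"   # path is an assumption

# Hypothetical guard: returns non-zero if any of the named functions
# is not defined in the current shell.
require_functions() {
    local fn missing=0
    for fn in "$@"; do
        if ! declare -F "$fn" > /dev/null; then
            echo "Missing function: ${fn} (was common.sh sourced?)" >&2
            missing=1
        fi
    done
    return "$missing"
}
```

A runner could then call `require_functions start_timer end_timer || exit 1` right after the `source` line, which would turn the scattered "command not found" messages into a single clear error.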
[GitHub] flink issue #6018: [FLINK-9372] Typo on Elasticsearch website link (elastic....
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6018 Thanks for catching this @medcv! Apparently elastic.io directs to a completely different website 😅. Merging this .. ---
[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5977#discussion_r188205000 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -71,6 +71,17 @@ @PublicEvolving MetricGroup getMetricGroup(); + /** +* Returned value is guaranteed to be unique between operators within the same job and to be +* stable and the same across job submissions. +* +* This operation is currently only supported in Streaming (DataStream) contexts. +* +* @return String representation of the operator's unique id. +*/ + @PublicEvolving + String getOperatorUniqueID(); --- End diff -- I'm slightly leaning towards Stephan's suggestion, which I also agree is the better solution for this case. It might be ok to have this as a "hidden" API for now anyways, since 1) it is marked `@PublicEvolving`, and 2) the API was added in quite a short timeframe. If we want this fix in 1.5, I wouldn't suggest "fully" exposing it. ---
[GitHub] flink issue #6002: [FLINK-9350] Parameter baseInterval has wrong check messa...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6002 LGTM, merging .. ---
[GitHub] flink issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore probl...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5984 Hi @Myasuka, it seems like your local branch is not properly rebased on the latest master. Could you try rebasing again and reopening the PR? ---
[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6008 +1 LGTM ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188195971 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. 
+ +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '// {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +\ +org.apache.flink\ +flink-connector-elasticsearch2_${scala.binary.version}\ +${flink.version}\ +' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. 
" +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." +fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_P
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188195534 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. 
+ +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '// {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +\ +org.apache.flink\ +flink-connector-elasticsearch2_${scala.binary.version}\ +${flink.version}\ +' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 --- End diff -- also set `PASS=""` if you want to fail the e2e test ---
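A sketch of that convention, under the assumption that `PASS` is a flag inspected by a final check in the shared test utilities: non-empty means success, empty means failure. `fail_test` and `check_all_pass` below are illustrative stand-ins, not the actual helpers in `common.sh`.

```shell
#!/usr/bin/env bash

# Assumed convention: a non-empty PASS means every check so far succeeded.
PASS=1

# Hypothetical helper: report a failure and clear the PASS flag.
fail_test() {
    echo "Failure: $1"
    PASS=""
}

# Illustrative final check, as a cleanup hook might perform it.
check_all_pass() {
    if [[ -z "$PASS" ]]; then
        echo "One or more tests FAILED."
        return 1
    fi
    echo "All tests PASS"
}
```

In the quickstarts script, the failure branch could then clear the flag (directly with `PASS=""`, or through a helper like `fail_test`) before calling `exit 1`.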
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188195999 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. 
+ +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '// {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +\ +org.apache.flink\ +flink-connector-elasticsearch2_${scala.binary.version}\ +${flink.version}\ +' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. 
" +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." +fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_P
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188196219 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala --- @@ -34,7 +34,6 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistryImpl --- End diff -- Is this change unrelated to this PR? If so, I would prefer it as a separate hotfix. ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188196115 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. 
+ +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '// {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +\ +org.apache.flink\ +flink-connector-elasticsearch2_${scala.binary.version}\ +${flink.version}\ +' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. 
" +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." +fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job --- End diff -- Before running the Flink job, we should verify that the Elasticsearch node really is running. ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188194545 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ --- End diff -- This should be `1.6-SNAPSHOT` now? ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188195349 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ --- End diff -- The `$CURRENT_DIR` can be anywhere, so this path is basically invalid. In the e2e test scripts we should be referencing paths to built jar / files relative to `$TEST_INFRA_DIR`. ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188194072 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR --- End diff -- I would suggest generating the mvn project under `$TEST_DATA_DIR`, so that it is properly cleaned up after the test completes. ---
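For illustration, the two path suggestions above (generate the project under `$TEST_DATA_DIR`, and reference source files relative to `$TEST_INFRA_DIR`) could look roughly like the sketch below. `copy_test_job_into_project` is a hypothetical helper name, and the `$TEST_INFRA_DIR/../../flink-end-to-end-tests/...` layout is an assumption borrowed from how other e2e scripts locate their jars; it is not taken from this PR.

```shell
# Sketch only. Assumes common.sh has been sourced and defines TEST_INFRA_DIR
# (where the test scripts live) and TEST_DATA_DIR (scratch directory that is
# removed when the test completes).

# Hypothetical helper: copy a test job source file into a generated quickstart project.
function copy_test_job_into_project {
  local job_source=$1
  local project_dir=$2
  local target_dir="$project_dir/src/main/java/org/apache/flink/quickstart"

  mkdir -p "$target_dir"
  cp "$job_source" "$target_dir/"
}

# Intended use (commented out, since it needs Maven and a Flink checkout):
#   mkdir -p "$TEST_DATA_DIR" && cd "$TEST_DATA_DIR"
#   mvn archetype:generate ... -DartifactId=flink-java-project -DinteractiveMode=false
#   copy_test_job_into_project \
#     "$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java" \
#     "$TEST_DATA_DIR/flink-java-project"
```

With this layout nothing is written next to the test scripts, so no extra cleanup step should be needed for the generated project.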
[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5823 @medcv That sounds like something we should do as part of the release process (if we are going to do it), orthogonal to this PR here. ---
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187848661 --- Diff: flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh --- @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for ElasticsearchSink1.x. 
+ +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" +echo "Downing Elasticsearch from $ELASTICSEARCH_URL" +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz + +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1 + +# start elasticsearch cluster +$ELASTICSEARCH_DIR/bin/elasticsearch -daemon + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +TEST_PROGRAM_JAR=$CURRENT_DIR/../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR \ + --index index \ + --type type + +touch $TEST_DATA_DIR/output + +curl 'localhost:9200/index/_search?q=*=21' > $TEST_DATA_DIR/output + +if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then +echo "Elasticsearch end to end test pass." +fi --- End diff -- We should also have the `else` case, where `PASS` is set to `""`, so that the test properly fails. ---
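A minimal sketch of the missing `else` branch, assuming `PASS` is the pass/fail flag that `common.sh` inspects at the end of the run (the other e2e scripts set it to `""` on failure). `verify_hit_count` is a hypothetical name used only for this example.

```shell
# Sketch: fail the test explicitly when the expected hit count is not found.
# PASS is assumed to be owned by common.sh; verify_hit_count is hypothetical.
function verify_hit_count {
  local output_file=$1
  local expected_total=$2

  # Look for the literal text: "total" : <expected_total>
  if grep -q "\"total\" : ${expected_total}" "$output_file"; then
    echo "Elasticsearch end to end test pass."
  else
    echo "Elasticsearch end to end test failed: expected ${expected_total} hits."
    PASS=""
  fi
}

# Intended use in the script:
#   verify_hit_count "$TEST_DATA_DIR/output" 21
```

Without the `else` branch, a run that indexes nothing prints no message and still counts as a pass.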
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187847421 --- Diff: flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/elasticsearch1/test/Elasticsearch1SinkExample.java --- @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.elasticsearch.examples; +package org.apache.flink.elasticsearch1.test; --- End diff -- I think the package should be `org.apache.flink.streaming.tests`. This would then be consistent with our other end-to-end test jobs. ---
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187848008 --- Diff: flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh --- @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for ElasticsearchSink1.x. + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" +echo "Downing Elasticsearch from $ELASTICSEARCH_URL" --- End diff -- `Downing` --> `Downloading` ---
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187847498 --- Diff: flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/elasticsearch1/test/Elasticsearch1SinkExample.java --- @@ -37,20 +38,33 @@ import java.util.Map; /** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the cluster name in the config map. + * End to end test for Elasticsearch1Sink. + * + * This example shows how to use the Elasticsearch1 Sink from an user endpoint. Before running it you + * must ensure that you have a cluster named "elasticsearch" running or change the cluster name in the config map. --- End diff -- This Javadoc doesn't make sense when considering it as a job simply used in end-to-end tests. ---
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187848355 --- Diff: flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh --- @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for ElasticsearchSink1.x. + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" +echo "Downing Elasticsearch from $ELASTICSEARCH_URL" +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz + +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1 + +# start elasticsearch cluster +$ELASTICSEARCH_DIR/bin/elasticsearch -daemon --- End diff -- There is no verification that the Elasticsearch node is really up and ready before continuing. This could lead to brittle tests. ---
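One way to add that verification is a small polling loop before the job is submitted. The sketch below is an assumption about how it could be done, not code from the PR: `wait_until_ready` is a hypothetical helper, and the probe against `localhost:9200` assumes the node listens on the default HTTP port that the script already queries later.

```shell
# Sketch: retry a probe command until it succeeds or the attempts run out.
# wait_until_ready is a hypothetical helper, not part of common.sh.
function wait_until_ready {
  local max_attempts=$1
  shift   # the remaining arguments form the probe command
  local attempt

  for (( attempt = 1; attempt <= max_attempts; attempt++ )); do
    if "$@" > /dev/null 2>&1; then
      return 0
    fi
    sleep 1
  done
  return 1
}

# Intended use, right after starting Elasticsearch:
#   if ! wait_until_ready 30 curl --silent --fail "localhost:9200"; then
#     echo "Elasticsearch node did not come up in time."
#     PASS=""
#     exit 1
#   fi
```

Bounding the number of attempts keeps a node that never starts from hanging the whole test run.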
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187847923 --- Diff: flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh --- @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for ElasticsearchSink1.x. --- End diff -- Can we have a single script for all Elasticsearch versions? It doesn't seem right that we have 3 scripts with a lot of duplicate bash code. ---
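As a rough sketch of how a single script could be parameterized, the version-specific parts can be pushed into one lookup function. `elasticsearch_download_url` is a hypothetical name; the two URLs are the ones quoted in the scripts under review, and the 5.x entry is left out because its URL does not appear in this thread.

```shell
# Sketch: map an Elasticsearch major version to its download URL so that one
# script can serve all versions. Hypothetical helper, not part of common.sh.
function elasticsearch_download_url {
  local major_version=$1

  case "$major_version" in
    1) echo "https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" ;;
    2) echo "https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" ;;
    *) echo "Unsupported Elasticsearch version: $major_version" >&2
       return 1 ;;
  esac
}

# Intended use, with the version passed as the first script argument:
#   ELASTICSEARCH_URL=$(elasticsearch_download_url "$1") || exit 1
```

The download, extract, start, and verify steps would then be shared, with only the lookup and the test jar path depending on the version.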
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187847586 --- Diff: flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml --- @@ -0,0 +1,140 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + .. + + + 4.0.0 + + flink-elasticsearch2-test_${scala.binary.version} + flink-elasticsearch2-test + jar + + + + 2.3.5 --- End diff -- I don't think we need to make this configurable in the tests. ---
[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5761#discussion_r187847815 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -53,6 +53,30 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi +if [ $EXIT_CODE == 0 ]; then +printf "\n==\n" +printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n" +printf "==\n" +$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 +EXIT_CODE=$? +fi --- End diff -- The changes in this file seem to be unintended and should be removed. ---
[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r187845478 --- Diff: flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java --- @@ -0,0 +1,480 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * Automatic end-to-end test for local recovery (including sticky allocation). 
+ * + * List of possible input parameters for this job: + * + * checkpointDir: the checkpoint directory, required. + * parallelism: the parallelism of the job, default 1. + * maxParallelism: the maximum parallelism of the job, default 1. + * checkpointInterval: the checkpointing interval in milliseconds, default 1000. + * restartDelay: the delay of the fixed delay restart strategy, default 0. + * externalizedCheckpoints: flag to activate externalized checkpoints, default false. + * stateBackend: choice for state backend between file and rocks, default file. + * killJvmOnFail: flag that determines whether or not an artificial failure induced by the test kills the JVM or not. + * asyncCheckpoints: flag for async checkpoints with file state backend, default true. + * incrementalCheckpoints: flag for incremental checkpoint with rocks state backend, default false. + * delay: sleep delay to throttle down the production of the source, default 0. + * maxAttempts: the maximum number of run attempts, before the job finishes with success, default 3. + * valueSize: size of the artificial value for each key in bytes, default 10. + * + */ +public class StickyAllocationAndLocalRecoveryTestJob { + + private static final Logger LOG = LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class); + + public static void main(String[] args) throws Exception { + + final ParameterTool pt = ParameterTool.fromArgs(args); +
[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r187843886 --- Diff: flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh --- @@ -0,0 +1,115 @@ +#!/usr/bin/env bash + + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +# This function checks the logs for entries that indicate problems with local recovery +function check_logs { + local parallelism=$1 + local attempts=$2 + (( expected_count=parallelism * (attempts + 1) )) + + # Search for the log message that indicates restore problem from existing local state for the keyed backend. + local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ') + + # Search for attempts to recover locally. + local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ') + + if [ ${failed_local_recovery} -ne 0 ] + then +PASS="" +echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)." 
+ fi + + if [ ${attempt_local_recovery} -eq 0 ] + then +PASS="" +echo "FAILURE: Found no attempt for local recovery. Configuration problem?" + fi +} + +# This function does a cleanup after the test. The configuration is restored, the watchdog is terminated and temporary +# files and folders are deleted. +function cleanup_after_test { + # Reset the configurations + sed -i -e 's/state.backend.local-recovery: .*//' "$FLINK_DIR/conf/flink-conf.yaml" + sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties" + # + kill ${watchdog_pid} 2> /dev/null + wait ${watchdog_pid} 2> /dev/null + # + cleanup +} + +# Calls the cleanup step for this tests and exits with an error. +function cleanup_after_test_and_exit_fail { + cleanup_after_test + exit 1 +} + +## This function executes one run for a certain configuration +function run_local_recovery_test { + local parallelism=$1 + local max_attempts=$2 + local backend=$3 + local incremental=$4 + local kill_jvm=$5 + + echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}." + TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar + + # Enable debug logging + sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties" + + # Enable local recovery + sed -i -e 's/state.backend.local-recovery: .*//' "$FLINK_DIR/conf/flink-conf.yaml" + echo "state.backend.local-recovery: ENABLE_FILE_BASED" >> "$FLINK_DIR/conf/flink-conf.yaml" --- End diff -- FYI: we now have a `change_conf` method in `common.sh` to modify Flink configuration in e2e tests ---
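The signature of `change_conf` is not shown in this thread, so the sketch below is a stand-in rather than that function. It only illustrates the "drop the old entry, then append the new one" pattern that the diff performs with a `sed` call followed by an `echo`. `set_conf_value` is a hypothetical name.

```shell
# Sketch: replace (or add) a "key: value" entry in a Flink-style YAML config file.
# Hypothetical stand-in; this is NOT the change_conf function from common.sh.
function set_conf_value {
  local conf_file=$1
  local key=$2
  local value=$3
  local tmp_file
  tmp_file=$(mktemp)

  # Keep every line that does not start with "<key>:" (plain string match, no regex).
  awk -v prefix="${key}:" 'index($0, prefix) != 1' "$conf_file" > "$tmp_file"
  echo "${key}: ${value}" >> "$tmp_file"

  # Write back in place so the original file keeps its permissions.
  cat "$tmp_file" > "$conf_file"
  rm -f "$tmp_file"
}

# Intended use:
#   set_conf_value "$FLINK_DIR/conf/flink-conf.yaml" state.backend.local-recovery ENABLE_FILE_BASED
```

Keeping this logic in one helper means the cleanup step can call the same code path to restore the previous value.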
[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r187842776 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java --- @@ -140,7 +140,8 @@ public static final ConfigOption SLOT_IDLE_TIMEOUT = key("slot.idle.timeout") - .defaultValue(10L * 1000L) + // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery + .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue()) --- End diff -- Reminder: if this results in a different default value, I would suggest making a note of the change in the "Release Notes" field of the JIRA ticket. ---
[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r187844104 --- Diff: flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh --- @@ -0,0 +1,115 @@ +#!/usr/bin/env bash + + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +# This function checks the logs for entries that indicate problems with local recovery +function check_logs { + local parallelism=$1 + local attempts=$2 + (( expected_count=parallelism * (attempts + 1) )) + + # Search for the log message that indicates restore problem from existing local state for the keyed backend. + local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ') + + # Search for attempts to recover locally. + local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ') + + if [ ${failed_local_recovery} -ne 0 ] + then +PASS="" +echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)." 
+ fi + + if [ ${attempt_local_recovery} -eq 0 ] + then +PASS="" +echo "FAILURE: Found no attempt for local recovery. Configuration problem?" + fi +} + +# This function does a cleanup after the test. The configuration is restored, the watchdog is terminated and temporary +# files and folders are deleted. +function cleanup_after_test { + # Reset the configurations + sed -i -e 's/state.backend.local-recovery: .*//' "$FLINK_DIR/conf/flink-conf.yaml" + sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties" + # + kill ${watchdog_pid} 2> /dev/null + wait ${watchdog_pid} 2> /dev/null + # + cleanup +} + +# Calls the cleanup step for this tests and exits with an error. +function cleanup_after_test_and_exit_fail { + cleanup_after_test + exit 1 +} + +## This function executes one run for a certain configuration +function run_local_recovery_test { + local parallelism=$1 + local max_attempts=$2 + local backend=$3 + local incremental=$4 + local kill_jvm=$5 + + echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}." + TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar + + # Enable debug logging + sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties" + + # Enable local recovery + sed -i -e 's/state.backend.local-recovery: .*//' "$FLINK_DIR/conf/flink-conf.yaml" + echo "state.backend.local-recovery: ENABLE_FILE_BASED" >> "$FLINK_DIR/conf/flink-conf.yaml" + + rm $FLINK_DIR/log/* 2> /dev/null + + start_cluster + + tm_watchdog ${parallelism} & + watchdog_pid=$! + + echo "Started TM watchdog with PID ${watchdog_pid}." 
+ + $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \ + -p ${parallelism} $TEST_PROGRAM_JAR \ + -D state.backend.local-recovery=ENABLE_FILE_BASED \ + --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \ + --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \ + --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBacke
[GitHub] flink issue #5969: [FLINK-9074] [e2e] Add e2e for resuming from externalized...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5969 Thanks a lot for the review @fhueske. Will merge this! ---
[GitHub] flink pull request #6004: [FLINK-8977] [e2e] End-to-end test for manual job ...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6004 [FLINK-8977] [e2e] End-to-end test for manual job resume after terminal failure ## What is the purpose of the change This PR is based on new e2e features introduced by #5941, #5990, and #5969. Only the last two commits are relevant to FLINK-8977. This PR adds e2e test coverage for the case that after a terminal failure caused by the user job code, manually resuming from a retained checkpoint works correctly. This is achieved by extending the `test_resume_externalized_checkpoints.sh` test script to accept a `SIMULATE_FAILURE` flag. ## Brief change log - 9360ea9 Extend the general purpose DataStream job to allow configuring restart strategies. - b5d713c Extend `test_resume_externalized_checkpoints.sh` to allow simulating the job failure + manual resume case. ## Verifying this change Verifiable by running locally the following e2e test script: `SIMULATE_FAILURE=true flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8977 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6004.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6004 commit 8db7f894b67b00f94148e0314a1c10d76266a350 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-04-30T10:04:43Z [hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges commit c8e14673e58aed0f9625e38875ec85a776282ad4 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-04-30T10:05:46Z [FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose DataStream job commit 78354b295832fa2ec5d829ec4ac21150ecac1231 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-08T03:44:13Z PR review - refactor source run function commit f346fd0958e7c3361886680912630fe22761a63d Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-08T04:39:40Z PR review - simplify broadcast / union state verification commit b01cfda7d77723e8ded2ce99ee12f17352a3ca1f Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-11T03:51:12Z [FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream job commit 0931f6ed48523ca46e2c99adc24950777d843ac8 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-11T07:09:00Z [FLINK-9320] [e2e] Update test_ha e2e to use general purpose DataStream job commit a819e56e0998e09bb6461b6c76be0807d83a1ef5 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-09T03:40:24Z [FLINK-9074] [e2e] Allow configuring externalized checkpoints for the general purpose DataStream job commit 3d0c83a991ab78d03c3cc1c9ff2abb61e0329d9d Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-09T04:17:25Z [FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints commit 
9360ea9ad0db858e7fdeecb54b1918e6b84cae1d Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-14T03:56:07Z [FLINK-8977] [e2e] Allow configuring restart strategy for general purpose DataStream job commit b5d713cf19290be437286e152d921b23ff532c7d Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-14T03:56:41Z [FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job failures ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788278 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. 
+ while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString( { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); +
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788338 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { private List getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List shardsOfStream = new ArrayList<>(); - DescribeStreamResult describeStreamResult; + // List Shards returns just the first 1000 shard entries. In order to read the entire stream, + // we need to use the returned nextToken to get additional shards. + ListShardsResult listShardsResult; + String startShardToken = null; do { - describeStreamResult = describeStream(streamName, lastSeenShardId); - - List shards = describeStreamResult.getStreamDescription().getShards(); + listShardsResult = listShards(streamName, lastSeenShardId, startShardToken); --- End diff -- If we catch the `ExpiredNextTokenException` within `listShards(...)`, then `null` will be returned as the result, correct? If so, the `shardsOfStream` list built up so far will be returned immediately, regardless of whether or not there are more shards following. Although expired tokens might not be too common here, I wonder if we can handle this case more gracefully (e.g., re-fetching a token to make sure that there really are no more shards). ---
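One way the "re-fetch a token" idea from the comment above could look is sketched below. This is only an illustration under stated assumptions, not the code of the PR: `ShardLister`, `ShardPage`, `ExpiredTokenException` and `FakeLister` are hypothetical stand-ins for the AWS SDK types, and the fake lister expires the first token it hands out so that the recovery path is exercised. On an expired token, the loop drops the token and restarts the listing from the last shard it has already collected instead of returning a partial list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the SDK's expired-token exception. */
class ExpiredTokenException extends RuntimeException {}

/** Hypothetical page of results: shard IDs plus a token for the next page (null if none). */
class ShardPage {
    final List<String> shardIds;
    final String nextToken;

    ShardPage(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

/** Hypothetical client: uses nextToken if present, otherwise starts after the given shard ID. */
interface ShardLister {
    ShardPage list(String startAfterShardId, String nextToken);
}

class ShardPagination {
    /** Collects all shards, restarting from the last collected shard if a token expires. */
    static List<String> listAllShards(ShardLister lister, String lastSeenShardId) {
        List<String> shards = new ArrayList<>();
        String startAfter = lastSeenShardId;
        String token = null;
        while (true) {
            ShardPage page;
            try {
                page = lister.list(token == null ? startAfter : null, token);
            } catch (ExpiredTokenException e) {
                if (token == null) {
                    throw e; // no token was sent, so this is not recoverable here
                }
                token = null; // re-fetch: start a fresh listing after the last known shard
                continue;
            }
            shards.addAll(page.shardIds);
            if (!page.shardIds.isEmpty()) {
                startAfter = page.shardIds.get(page.shardIds.size() - 1);
            }
            token = page.nextToken;
            if (token == null) {
                return shards; // a page without a token is the last page
            }
        }
    }
}

/** Fake lister for the demo: pages over "shard-0".."shard-N" and expires its first token once. */
class FakeLister implements ShardLister {
    private final List<String> all = new ArrayList<>();
    private final int pageSize;
    private boolean expireNextToken = true;

    FakeLister(int shardCount, int pageSize) {
        for (int i = 0; i < shardCount; i++) {
            all.add("shard-" + i);
        }
        this.pageSize = pageSize;
    }

    @Override
    public ShardPage list(String startAfterShardId, String nextToken) {
        int from;
        if (nextToken != null) {
            if (expireNextToken) {
                expireNextToken = false;
                throw new ExpiredTokenException();
            }
            from = Integer.parseInt(nextToken);
        } else {
            from = startAfterShardId == null ? 0 : all.indexOf(startAfterShardId) + 1;
        }
        int to = Math.min(from + pageSize, all.size());
        String token = to < all.size() ? String.valueOf(to) : null;
        return new ShardPage(new ArrayList<>(all.subList(from, to)), token);
    }
}
```

Calling `ShardPagination.listAllShards(new FakeLister(5, 2), null)` walks through one expired token and still returns all five shard IDs in order. A real implementation would likely also bound the number of restarts.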
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788213 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- Changing the name of this variable, strictly speaking, breaks backwards compatibility, as users might be using it. ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788363 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() { } @Test - public void testCustomConfigurationOverride() { - Properties configProps = new Properties(); - configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - KinesisProxy proxy = new KinesisProxy(configProps) { - @Override - protected AmazonKinesis createKinesisClient(Properties configProps) { - ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig(); - clientConfig.setSocketTimeout(1); - return AWSUtil.createKinesisClient(configProps, clientConfig); + public void testGetShardList() throws Exception { + List shardIds = + Arrays.asList( + "shardId-", + "shardId-0001", + "shardId-0002", + "shardId-0003"); + shardIdSet = new HashSet<>(shardIds); + shards = + shardIds + .stream() + .map(shardId -> new Shard().withShardId(shardId)) + .collect(Collectors.toList()); + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey"); + kinesisConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey"); + KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + AmazonKinesis mockClient = mock(AmazonKinesis.class); + Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient); + + ListShardsResult responseWithMoreData = + new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); + ListShardsResult responseFinal = + new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); + doReturn(responseWithMoreData) + 
.when(mockClient) + .listShards(argThat(initialListShardsRequestMatcher())); + doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN))); + HashMap<String, String> streamHashMap = + createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName)); + GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap); + + Assert.assertEquals(shardListResult.hasRetrievedShards(), true); + + Set expectedStreams = new HashSet<>(); + expectedStreams.add(fakeStreamName); + Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams); + List actualShardList = + shardListResult.getRetrievedShardListOfStream(fakeStreamName); + List expectedStreamShard = new ArrayList<>(); + System.out.println(actualShardList.toString()); + assertThat(actualShardList, hasSize(4)); + for (int i = 0; i < 4; i++) { + StreamShardHandle shardHandle = + new StreamShardHandle( + fakeStreamName, + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))); + expectedStreamShard.add(shardHandle); + } + + Assert.assertThat( + actualShardList, + containsInAnyOrder( + expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()]))); + } + + private static class ListShardsRequestMatcher
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788223 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- The same goes for all the other key renames in this class. I would suggest deprecating the existing ones if we want to change the names internally. ---
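The deprecation suggested above could be sketched roughly as follows. This is a minimal illustration, not the change made in the PR: the old key string is taken from the diff, while the new key name `flink.list.shards.backoff.base`, the `BackoffConfig` class and the `backoffBaseMillis` helper are assumptions made up for this example. The point is that the old key stays readable, so existing user configurations keep working while new ones can use the new name.

```java
import java.util.Properties;

class BackoffConfig {
    /** Existing key (from the diff above), kept so that current user configs still work. */
    @Deprecated
    static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /** Hypothetical replacement key; the name is an assumption for illustration only. */
    static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

    /** Reads the new key first, then falls back to the deprecated key, then to the default. */
    static long backoffBaseMillis(Properties config, long defaultMillis) {
        String value = config.getProperty(LIST_SHARDS_BACKOFF_BASE);
        if (value == null) {
            value = config.getProperty(STREAM_DESCRIBE_BACKOFF_BASE);
        }
        return value == null ? defaultMillis : Long.parseLong(value);
    }
}
```

With this lookup order, a configuration that sets only the deprecated key behaves as before, and a configuration that sets both keys uses the new one.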
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788284 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. 
+ while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString( { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); +
[GitHub] flink issue #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE if top...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5929 @alexpf thanks for the updates. Please bear with me just a little, I'll try to get back to reviewing this PR ASAP after I finish with some ongoing tasks. ---
[GitHub] flink pull request #5990: [FLINK-9322][FLINK-9320] [e2e] Improvements to e2e...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5990 [FLINK-9322][FLINK-9320] [e2e] Improvements to e2e standalone chaos monkey test ## What is the purpose of the change This PR is based on #5941. Only the last 2 commits are relevant. This PR improves our standalone e2e chaos monkey test by: - Using the general purpose DataStream job, instead of the state machine example, to have a wider coverage of commonly used DataStream program building blocks. - Letting the running job simulate failures by throwing exceptions, which makes the chaos monkey test more intensive. ## Brief change log - b01cfda Allows the general purpose job to configure whether or not to simulate failures. This resolves FLINK-9322. - 4009406 Uses the general purpose job in `test_ha.sh`. This change additionally lets the e2e test cover failures caused by the user application, and not just TM / JM shutdowns. It also changes the parameterization of the test script to be consistent with our other e2e test scripts. ## Verifying this change This is purely a change to improve current e2e tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink chaos-monkey-e2e Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5990.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5990 commit 8db7f894b67b00f94148e0314a1c10d76266a350 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-04-30T10:04:43Z [hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges commit c8e14673e58aed0f9625e38875ec85a776282ad4 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-04-30T10:05:46Z [FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose DataStream job commit 78354b295832fa2ec5d829ec4ac21150ecac1231 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-08T03:44:13Z PR review - refactor source run function commit f346fd0958e7c3361886680912630fe22761a63d Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-08T04:39:40Z PR review - simplify broadcast / union state verification commit b01cfda7d77723e8ded2ce99ee12f17352a3ca1f Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-11T03:51:12Z [FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream job commit 4009406d4729486d57cc4a71bcb72d269583a762 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-11T07:09:00Z [FLINK-9320] [e2e] Update test_ha e2e to use general purpose DataStream job ---
[GitHub] flink pull request #5969: [FLINK-9074] [e2e] Add e2e for resuming from exter...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5969 [FLINK-9074] [e2e] Add e2e for resuming from externalized checkpoints ## What is the purpose of the change This PR adds an end-to-end test for resuming from externalized, retained checkpoints. The test does the following: - Runs the general purpose DataStream job, with externalized checkpoints enabled - Waits until the job has at least 1 completed checkpoint, AND has processed at least 200 records - Cancels the job - Makes sure that there is exactly 1 externalized checkpoint available - Restores from that checkpoint and waits for another 200 records to be processed, to verify that exactly-once isn't violated ## Brief change log - Allow general purpose job to be configured for externalized checkpoints - Add new e2e test script `test_resume_externalized_checkpoint.sh` ## Verifying this change This PR adds a new test, `test_resume_externalized_checkpoint.sh`. ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9074 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5969.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5969 commit bb8dce7622d83d4c2214013de24193ffa47a1e75 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-09T03:40:24Z [FLINK-9074] [e2e] Allow configuring externalized checkpoints for the general purpose DataStream job commit 91e7f911739a094d58051a7b620a4564fd7f6067 Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-05-09T04:17:25Z [FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints ---
[GitHub] flink issue #5941: [FLINK-8971] [e2e] Include broadcast / union state in gen...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5941 @StefanRRichter the PR is ready for another review, thanks! ---
[GitHub] flink issue #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e tests for d...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5926 Thanks for the review @StefanRRichter! Will address your comment and merge this. ---
[GitHub] flink pull request #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e test...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5926#discussion_r186608526 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -58,25 +58,97 @@ fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n" + printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (scale up) end-to-end test\n" + printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (scale down) end-to-end test\n" + printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, sync, scale up) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, async, scale down) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + EXIT_CODE=$? 
+fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, sync, scale down) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (rocks, non-incremental, no parallelism change) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (rocks, incremental, no parallelism change) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 --- End diff -- This makes sense, will address this. ---
[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5958 Thanks for the update @FredTing. I'll try to take another look at the PR within the next few days. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186606104 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . +*/ + @Deprecated + T deserialize(byte[] message) throws IOException; + /** * Deserializes the byte message. * -* @param message The message, as a byte array. +* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}. * * @return The deserialized message as an object (null if the message cannot be deserialized). */ - T deserialize(byte[] message) throws IOException; + default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException { --- End diff -- Makes sense. Alright, let's leave this as is then. ---
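The pattern in the diff above (deprecate the old method, add the new one as a `default` method) can be illustrated with a small standalone sketch. The names `RecordWithMeta`, `Schema` and `StringSchema` are hypothetical stand-ins, not Flink classes, and the delegation in the default method is an assumption about how such a default would typically be written, since the body is cut off in the diff. The benefit of the pattern is that existing implementations, which only override the `byte[]` variant, keep compiling and working.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical record wrapper: the raw message plus some metadata such as a timestamp. */
class RecordWithMeta {
    final byte[] message;
    final long timestamp;

    RecordWithMeta(byte[] message, long timestamp) {
        this.message = message;
        this.timestamp = timestamp;
    }
}

/** Hypothetical schema interface mirroring the shape of the change in the diff. */
interface Schema<T> {
    /** Old entry point, kept so that existing implementations still compile. */
    @Deprecated
    T deserialize(byte[] message) throws IOException;

    /** New entry point; by default it ignores the metadata and delegates to the old method. */
    default T deserialize(RecordWithMeta record) throws IOException {
        return deserialize(record.message);
    }
}

/** An "old" implementation that only knows about the byte[] variant. */
class StringSchema implements Schema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

Here `StringSchema` was written against the old method only, yet a caller can already pass it a `RecordWithMeta`; implementations that need the metadata can override the default method later.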
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186341972 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` + as an argument. + + +{% highlight java %} +DataStream output = colorPartitionedStream + .connect(ruleBroadcastStream) + .process( + +
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186341885 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` + as an argument. + + +{% highlight java %} +DataStream output = colorPartitionedStream + .connect(ruleBroadcastStream) + .process( + +
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186341059 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` --- End diff -- The BroadcastStream actually isn't formatted as a code font after rendering. ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186340944 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); --- End diff -- After rendering, this seems a bit off. Maybe we don't need it on a separate new line? ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186342816 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` + as an argument. + + +{% highlight java %} +DataStream output = colorPartitionedStream + .connect(ruleBroadcastStream) + .process( + +
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186340110 --- Diff: docs/dev/stream/state/index.md --- @@ -49,6 +49,7 @@ Where to go next? - * [Working with State](state.html): Shows how to use state in a Flink application and explains the different kinds of state. +* [The Broadcast State Pattern](broadcast_state.html): Explains how to connect a broadcast with a non-broadcast stream and use state to exchange information between them. --- End diff -- The word "stream" is missing here: "how to connect a broadcast *stream* with a ...". ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186339587 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -25,27 +25,25 @@ under the License. * ToC {:toc} -[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel -tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is -used to initialize the restored parallel tasks. +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. -A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use-cases +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all -elements coming from another stream. Having the above type of use-cases in mind, broadcast state differs from the rest +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest of operator states in that: 1. it has a map format, - 2. it is only available to streams whose elements are *broadcasted*, - 3. the only operation available to a stream with broadcast state is to be *connected* to another keyed or non-keyed stream, - 4. such a broadcast stream can have *multiple broadcast states* with different names. + 2. 
it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and --- End diff -- This looks good now (from what I read of it :) )! ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186342376 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` + as an argument. + + +{% highlight java %} +DataStream output = colorPartitionedStream + .connect(ruleBroadcastStream) + .process( + +
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r186341293 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. 
+ +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` --- End diff -- What happens if the user calls connect on the wrong (broadcasted) stream? Is there an exception? If so it might make sense to clarify that here. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186338002 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- Unnecessary space before period at the end. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337736 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. +*/ + enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME --- End diff -- I wonder if `CREATE_TIME` should be renamed to `EVENT_TIME`, to be more consistent with Flink's terminology. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337890 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. +*/ + enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME --- End diff -- `NO_TIMESTAMP_TYPE` --> maybe `NO_TIMESTAMP` will do, since from the enum name we already know it is a type. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186338049 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- For the deprecation, I would recommend explaining why the new deserialize method is preferable. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337834 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. +*/ + enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME + } + + /** +* @return the key as a byte array (null if no key has been set). 
+*/ + byte[] getKey(); + + /** +* @return The message, as a byte array (null if the message was empty or deleted). +*/ + byte[] getMessage(); + + /** +* @return The topic the message has originated from (for example the Kafka topic). +*/ + String getTopic(); + + /** +* @return The partition the message has originated from (for example the Kafka partition). +*/ + int getPartition(); + + /** +* @return the offset of the message in the original source (for example the Kafka offset). +*/ + long getOffset(); + + /** +* @return the timestamp of the consumer record --- End diff -- The Javadoc should document the "dummy" timestamp value that is returned when the timestamp type is `NO_TIMESTAMP`. ---
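As a rough illustration of the kind of Javadoc I have in mind, here is a minimal sketch. It assumes the interface also exposes a `getTimestampType()` accessor and that the enum constant is renamed to `NO_TIMESTAMP`; the interface name `TimestampedRecordSketch` is made up for this example and is not part of the PR.

```java
// Hypothetical, trimmed-down stand-in for ConsumerRecordMetaInfo.
// Only the two timestamp-related accessors are shown.
interface TimestampedRecordSketch {

    // Assumes the NO_TIMESTAMP_TYPE -> NO_TIMESTAMP rename suggested in this review.
    enum TimestampType {
        NO_TIMESTAMP, CREATE_TIME, INGEST_TIME
    }

    /**
     * @return the type of the timestamp carried by this record.
     */
    TimestampType getTimestampType();

    /**
     * @return the timestamp of the consumer record, or {@code Long.MIN_VALUE} if
     *         {@link #getTimestampType()} is {@link TimestampType#NO_TIMESTAMP}
     *         (for example with Kafka clients older than 0.10).
     */
    long getTimestamp();
}
```

With the sentinel spelled out in the `@return` tag, callers know they have to check the timestamp type (or compare against `Long.MIN_VALUE`) before using the value.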
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186338721 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . +*/ + @Deprecated + T deserialize(byte[] message) throws IOException; + /** * Deserializes the byte message. * -* @param message The message, as a byte array. +* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}. * * @return The deserialized message as an object (null if the message cannot be deserialized). */ - T deserialize(byte[] message) throws IOException; + default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException { --- End diff -- I'm actually not sure that we should continue using this class, for the following reason: the class is placed under a non-ideal package, `o.a.f.api.common.serialization`, whereas it should be placed under some `o.a.f.connectors.kafka` package. It is currently in this package because `DeserializationSchema` was initially intended to be shared by all connectors. However, over time it has become clear that each connector benefits from its own version of a schema class. So, it might actually make sense to deprecate the whole `DeserializationSchema` class now, and have a new class (maybe called `KafkaDeserializationSchema` / `KafkaSerializationSchema`) under a correct Kafka package. What do you think? ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337301 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java --- @@ -78,6 +79,69 @@ public Kafka010Fetcher( useMetrics); } + private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo { + private static final long serialVersionUID = 2651665280744549935L; + + private final ConsumerRecord<byte[], byte[]> consumerRecord; + + public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) { + this.consumerRecord = consumerRecord; + } + + @Override + public byte[] getKey() { + return consumerRecord.key(); + } + + @Override + public byte[] getMessage() { + return consumerRecord.value(); + } + + @Override + public String getTopic() { + return consumerRecord.topic(); + } + + @Override + public int getPartition() { + return consumerRecord.partition(); + } + + @Override + public long getOffset() { + return consumerRecord.offset(); + } + + @Override + public long getTimestamp() { + return Long.MIN_VALUE; --- End diff -- Doesn't Kafka 0.10 support record timestamps? ---
[GitHub] flink issue #5006: [hotfix][docs][QS] MInor cleanup of QS documentation
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5006 +1, LGTM. Will address Greg's last comment and merge this .. ---
[GitHub] flink issue #5956: [hotfix][javadocs] Correct the javadoc of StandaloneCheck...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5956 LGTM, thanks @klion26. Merging this .. ---
[GitHub] flink issue #5890: [FLINK-8999] [e2e] Ensure the job has an operator with op...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5890 Hi @zhangminglei, I think the current general purpose DataStream job already subsumes this PR; for example, the sequence source generator in the job already uses operator state. Moreover, `test_resume_savepoint.sh` also tests the behaviour of that state via a rescale operation. What do you think? If you agree, maybe we can close this PR. ---
[GitHub] flink issue #5918: [hotfix][ListCheckpointed]modify parallelism in the annot...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5918 Thanks @maqingxiang! This LGTM, will merge this, thanks. ---
[GitHub] flink issue #5952: [FLINK-9287][kafka] Properly clean up resources in non EX...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5952 As discussed offline, both approaches I mentioned wouldn't seem to work. I'll merge as is, let's keep an eye on it to see if the test is flaky. ---
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 Thanks for the review Stefan! Will merge this now .. ---
[GitHub] flink issue #5952: [FLINK-9287][kafka] Properly clean up resources in non EX...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5952 Would filtering the thread count by thread name be helpful here? Another possible approach to test this: maybe we can simply verify that the current transaction producer is closed after the cleanup? This would require making the `getCurrentTransaction()` method visible for tests, though. We'll also need to have an `isClosed()` method on the `FlinkKafkaProducer`. ---
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 @StefanRRichter I have updated the PR. Also had to do a rebase due to conflicts. Regarding the thoughts you brought up: - Bubble up `UnloadableTypeSerializerException` approach: I introduced the exception, but only use it minimally. I think overall it is definitely an improvement, since we don't have to carry the dummy flag all the way down to the low-level serializer serialization proxy. However, I don't think there really is a need to handle it at any level higher than the `TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience` method, since the original intent of that method was to always be fault tolerant when reading a bunch of serializers alongside other things. More details on that in my above comments. - Whether or not we really need to hand down the flag to `KeyedBackendSerializationProxy`: My gut feeling is that, even if in the future we bubble up the `UnloadableTypeSerializerException` to higher level components, the serialization proxy is still where we need to decide whether or not we handle it with a dummy serializer. The reason is that the serialization proxy handles deserialization of _all_ keyed state meta infos (and therefore their serializers); simply bubbling the exception further up without checking does not make sense. ---
[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185994375 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience( for (int i = 0; i < numSerializersAndConfigSnapshots; i++) { bufferWithPos.setPosition(offsets[i * 2]); - serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader); + serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true); --- End diff -- This method is so complex because it handles indexing of the offsets of the serializers and serializer config snapshots within the byte stream. It does so to be able to read all serializers and their config snapshots fault tolerantly, and to not leave the stream corrupt when some exception occurs. I'm not sure we can break this method up - doing so would just move a lot of duplicate code to the callers (since earlier versions already read / write the offset index, we would still need to maintain backwards compatibility even if we removed it). ---
[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185995278 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience( for (int i = 0; i < numSerializersAndConfigSnapshots; i++) { bufferWithPos.setPosition(offsets[i * 2]); - serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader); + serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true); --- End diff -- One other thing to keep in mind: the original intent of the `readSerializersAndConfigSnapshotsWithResilience` method, and the reason we added the complex indexing of offsets, is so that we can _always_ be fault tolerant when trying to read a bunch of serializers. So, essentially, there is no need to push the exception out further - the result of `readSerializersAndConfigSnapshotsWithResilience` should always be that there is some serializer, even if it is a dummy (hence the naming). ---
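A minimal sketch of the offset-indexing idea described in the two comments above. It is not the actual `TypeSerializerSerializationUtil` code: the class `ResilientReadSketch`, its byte layout, the `DUMMY` placeholder and the `load` helper are all hypothetical, and plain strings stand in for serializers. The point is only that, because every entry's start offset is recorded up front, a failure while reading one entry cannot corrupt the reading of the next one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; strings stand in for serializers. */
final class ResilientReadSketch {
    static final String DUMMY = "<unloadable>";

    /** Assumed layout: entry count, one start offset per entry, then the entries. */
    static byte[] write(List<String> entries) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        DataOutputStream bodyOut = new DataOutputStream(body);
        int[] offsets = new int[entries.size()];
        for (int i = 0; i < entries.size(); i++) {
            offsets[i] = bodyOut.size(); // bytes written so far = start of entry i
            bodyOut.writeUTF(entries.get(i));
        }
        bodyOut.flush();

        ByteArrayOutputStream all = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(all);
        out.writeInt(entries.size());
        for (int offset : offsets) {
            out.writeInt(offset);
        }
        out.write(body.toByteArray());
        out.flush();
        return all.toByteArray();
    }

    /** Always returns one result per entry; unreadable entries become DUMMY. */
    static List<String> read(byte[] bytes) throws IOException {
        DataInputStream header = new DataInputStream(new ByteArrayInputStream(bytes));
        int count = header.readInt();
        int[] offsets = new int[count];
        for (int i = 0; i < count; i++) {
            offsets[i] = header.readInt();
        }
        int bodyStart = 4 + 4 * count; // size of the header in bytes

        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Seek to the recorded offset, regardless of how the previous read ended.
            int start = bodyStart + offsets[i];
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes, start, bytes.length - start));
            try {
                result.add(load(in));
            } catch (IOException | RuntimeException e) {
                result.add(DUMMY);
            }
        }
        return result;
    }

    /** Hypothetical loader: pretends that names starting with "missing" cannot be loaded. */
    static String load(DataInputStream in) throws IOException {
        String name = in.readUTF();
        if (name.startsWith("missing")) {
            throw new IllegalStateException("cannot load " + name);
        }
        return name;
    }
}
```

In this sketch the caller never sees the failure, which mirrors the "always some serializer, even if it is a dummy" contract described above.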
[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185994454 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience( for (int i = 0; i < numSerializersAndConfigSnapshots; i++) { bufferWithPos.setPosition(offsets[i * 2]); - serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader); + serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true); --- End diff -- Since we are already thinking about not writing serializers anymore in savepoints for 1.6, I'm leaning towards not touching this method now. ---
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 I think even with the `UnloadableTypeSerializerException` exception bubbling approach, we actually still need a flag in the serialization proxy to decide how to handle the exception. The serialization proxy handles deserialization of all meta data of all registered key states, so that would be the highest level where we need to decide whether or not to use the dummy serializer. If we want to hand out this control to an even higher level (i.e. the backend), we would then need to break up the deserialization logic from the serialization proxy, which IMO isn't appropriate. ---
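A rough sketch of the argument in the comment above, under heavy assumptions: `ProxyFlagSketch`, its nested `UnloadableSerializerException`, the `load` helper and the `serializerPresenceRequired` parameter are hypothetical stand-ins, not Flink classes. It only illustrates why the component that reads _all_ state meta infos is the natural place to decide between failing and substituting a dummy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a serialization proxy; strings stand in for serializers. */
final class ProxyFlagSketch {
    static final String DUMMY = "<dummy serializer>";

    /** Hypothetical counterpart of an "unloadable serializer" exception. */
    static final class UnloadableSerializerException extends Exception {
        UnloadableSerializerException(String name) {
            super("Cannot load serializer " + name);
        }
    }

    /** Hypothetical loader: names starting with "missing" cannot be loaded. */
    static String load(String name) throws UnloadableSerializerException {
        if (name.startsWith("missing")) {
            throw new UnloadableSerializerException(name);
        }
        return name;
    }

    /**
     * Reads the serializers of all registered states. The flag decides, per entry,
     * whether a load failure is fatal or is replaced by a dummy.
     */
    static List<String> readAll(List<String> names, boolean serializerPresenceRequired)
            throws UnloadableSerializerException {
        List<String> serializers = new ArrayList<>();
        for (String name : names) {
            try {
                serializers.add(load(name));
            } catch (UnloadableSerializerException e) {
                if (serializerPresenceRequired) {
                    throw e; // caller asked for strict behaviour
                }
                serializers.add(DUMMY); // keep going with a placeholder
            }
        }
        return serializers;
    }
}
```

If the exception were simply propagated out of `readAll`, the remaining entries would never be read, which is the concern raised in the comment.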
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 @StefanRRichter yes, now that you mention it, the `isSerializerPresenceRequiredFlag` does seem a bit awkward in the serialization proxy. Essentially, all it does is serve as a switch to decide whether or not to fail - something that could be done by the caller. I'll quickly try your suggested approach and see how that turns out. ---
[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5929#discussion_r185759080 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java --- @@ -74,7 +74,12 @@ protected void initializeConnections() { try { for (String topic : topics) { - for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) { + List topicPartitions = kafkaConsumer.partitionsFor(topic); + if (topicPartitions == null) { + throw new IllegalStateException("The topic " + topic + " does not exist"); --- End diff -- I think the `RuntimeException` in `AbstractPartitionDiscoverer#discoverPartitions` may also need to be revisited. As far as I understand it, we should only fail the job if the first discovery (which seeds the initial partitions that the connector consumes) finds no partitions at all. Otherwise, it should be ok if, while the job runs, the partition discoverer fails to fetch partition meta info for some discovery attempt. ---
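A minimal sketch of the policy suggested in the comment above, assuming a hypothetical `DiscoveryPolicySketch` class with a `Supplier` standing in for the metadata fetch; it is not the `AbstractPartitionDiscoverer` implementation. Only the very first (seeding) discovery is allowed to fail the job; later attempts that fail or come back empty simply yield no new partitions.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical illustration of "fail only on the initial discovery". */
final class DiscoveryPolicySketch {
    private boolean initialDiscoveryDone = false;

    /**
     * @param fetch stand-in for the metadata request; may throw, or return null / empty
     * @return the discovered partitions, or an empty list for a tolerated failure
     */
    List<String> discover(Supplier<List<String>> fetch) {
        List<String> partitions;
        try {
            partitions = fetch.get();
        } catch (RuntimeException e) {
            if (!initialDiscoveryDone) {
                throw e; // nothing to consume from yet, so this is fatal
            }
            return Collections.emptyList(); // transient failure while the job runs
        }
        if (partitions == null || partitions.isEmpty()) {
            if (!initialDiscoveryDone) {
                throw new IllegalStateException(
                        "Unable to retrieve any partitions in the initial discovery");
            }
            return Collections.emptyList();
        }
        initialDiscoveryDone = true;
        return partitions;
    }
}
```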
[GitHub] flink issue #5939: [FLINK-8500] [Kafka Connector] Get the timestamp of the K...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5939 @StephanEwen we have some follow-up discussion on how to approach this on the JIRA ticket. Do you want to take a look at that? https://issues.apache.org/jira/browse/FLINK-8500 The general idea is that we should perhaps come up with a more generic schema interface that allows providing all information that Kafka gives us and provide them to the user. ---
[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5929#discussion_r185732772 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java --- @@ -74,7 +74,12 @@ protected void initializeConnections() { try { for (String topic : topics) { - for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) { + List topicPartitions = kafkaConsumer.partitionsFor(topic); + if (topicPartitions == null) { + throw new IllegalStateException("The topic " + topic + " does not exist"); --- End diff -- I fear that this might be too aggressive. IMO, it is fine if the user has, say, 3 topics and one of them doesn't exist. What we should handle is the case where there are no partitions at all across all provided topics. Perhaps for a missing topic, we only write a log message that the topic has no partitions? ---
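A minimal sketch of the alternative proposed in the comment above, under stated assumptions: `TolerantTopicLookup` is a hypothetical class, the `partitionsFor` function stands in for `kafkaConsumer.partitionsFor(topic)` (which, per the diff, may return null for a missing topic), and partitions are represented as plain strings. A missing topic is only logged; the lookup fails only when no topic yields any partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

/** Hypothetical stand-in for the per-topic lookup loop in the partition discoverer. */
final class TolerantTopicLookup {
    private static final Logger LOG = Logger.getLogger(TolerantTopicLookup.class.getName());

    /**
     * @param topics        topics the user subscribed to
     * @param partitionsFor stand-in for the consumer's metadata lookup; may return null
     * @return "topic-partition" ids for every partition that was found
     */
    static List<String> discover(
            List<String> topics, Function<String, List<Integer>> partitionsFor) {
        List<String> found = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null || partitions.isEmpty()) {
                // A single missing topic is only logged, not treated as fatal.
                LOG.warning("No partitions found for topic " + topic + "; skipping it.");
                continue;
            }
            for (int partition : partitions) {
                found.add(topic + "-" + partition);
            }
        }
        if (found.isEmpty()) {
            // Only the "nothing at all" case fails.
            throw new IllegalStateException(
                    "None of the topics " + topics + " has any partitions");
        }
        return found;
    }
}
```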
[GitHub] flink pull request #5945: [FLINK-9169] [runtime] Allow KeyedBackendSerializa...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/5945 ---
[GitHub] flink issue #5945: [FLINK-9169] [runtime] Allow KeyedBackendSerializationPro...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5945 I've opened a new PR #5950 which has a cleaner approach to this. That new PR subsumes this one. ---