[GitHub] flink issue #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6391 @twalthr I've addressed most of your comments. Thanks a lot for the detailed review. However, the Elasticsearch 6.x IT cases now appear to be failing because Elasticsearch no longer supports embedded nodes for Java unit tests. I'll have to look into how that can be fixed, and will ping you once it is addressed. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205037835 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties --- @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger --- End diff -- Removed. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205037795 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. + */ +public class ElasticsearchSinkExample { --- End diff -- Actually, we decided to move examples out of the test code. Removing this. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205031956 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; + +import java.io.File; +import java.util.Collections; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. 
+ */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("http.enabled", false) + .put("path.home", tmpDataFolder.getParent()) + .put("path.data", tmpDataFolder.getAbsolutePath()) + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) --- End diff -- You're right, removed. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205029480 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz; run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz; run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz; +run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz; --- End diff -- Added a loop to wait until the Elasticsearch node is really running. ---
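The wait loop mentioned in the comment above lives in the shell test script and is not shown in this thread. As a rough illustration of the same idea only, here is a minimal Java sketch that polls a readiness probe a bounded number of times. The class name, method name, and parameters are hypothetical, and the probe is left abstract rather than tied to any Elasticsearch API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: poll a readiness probe until it succeeds or attempts run out. */
final class WaitUntilReadySketch {

    /**
     * @param probe       returns true once the service is considered up (assumed to be cheap to call)
     * @param maxAttempts upper bound on the number of probe calls
     * @param sleepMillis pause between two failed attempts
     * @return true if the probe succeeded within maxAttempts, false otherwise
     */
    static boolean waitUntilReady(BooleanSupplier probe, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (probe.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Bounding the number of attempts keeps a test from hanging forever if the node never comes up.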
[GitHub] flink pull request #6413: [FLINK-8993] [tests] Let general purpose DataStrea...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6413 [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction ## What is the purpose of the change The general purpose DataStream job previously only used the `KryoSerializer` via a custom state serializer. This PR allows the job to also use the `KryoSerializer` via Flink's type extraction. ## Brief change log - Adapt the state builders to be able to supply a state class, instead of a state type serializer. - Let `DataStreamAllroundTestJob` specify state serializers via state classes instead of a direct custom serializer. ## Verifying this change This is an extension to existing end-to-end tests (`test-streaming-savepoint.sh`). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8993 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6413.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6413 commit 428d5427227343479b6d63daf7fced8f1bf9a69c Author: Tzu-Li (Gordon) Tai Date: 2018-07-25T06:58:46Z [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204999405 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); --- End diff -- having a local variable here seems a bit redundant, since we always adjust it right afterwards. ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204996330 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); --- End diff -- just a minor nit pick here: `adjustmentEndTimeNanos` would be better named as `adjustedEndTimeNanos` ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204998677 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); + long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes); + processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } } catch (Throwable t) { fetcherRef.stopWithError(t); } } + /** +* Adjusts loop timing to match target frequency if specified. 
+* @param processingStartTimeNanos The start time of the run loop "work" +* @param processingEndTimeNanos The end time of the run loop "work" +* @return The System.nanoTime() after the sleep (if any) +* @throws InterruptedException +*/ + protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) + throws InterruptedException { + long endTimeNanos = processingEndTimeNanos; + if (fetchIntervalMillis != 0) { + long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; + long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + endTimeNanos = System.nanoTime(); + } + } + return endTimeNanos; + } + + /** +* Calculates how many records to read each time through the loop based on a target throughput +* and the measured frequenecy of the loop. +* @param runLoopTimeNanos The total time of one pass through the loop +* @param numRecords The number of records of the last read operation +* @param recordBatchSizeBytes The total batch size of the last read operation +*/ + protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) { --- End diff -- If this method has no side effects on the fields of the `ShardConsumer`, it might be better to make it static and pass in the current `maxNumberOfRecordsPerFetch` as an argument. This makes it clearer that it is only a utility method for calculating the number of records to read. ---
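To make the suggestion concrete, here is a minimal sketch of a side-effect-free version that takes the current value as an argument. The method body is not shown in the quoted diff, so the calculation below is an assumption, not the PR's implementation: it assumes a target of 2 MB per second per shard and an upper bound of 10,000 records per fetch, and the class and constant names are hypothetical. The `sleepTimeMillis` helper mirrors the arithmetic visible in `adjustRunLoopFrequency`.

```java
/** Hypothetical, side-effect-free helpers for the adaptive read loop. */
final class AdaptiveReadSketch {

    // Assumed per-shard read throughput target: 2 MB per second.
    static final long TARGET_BYTES_PER_SECOND = 2L * 1024L * 1024L;

    // Assumed upper bound on the number of records requested per fetch.
    static final int MAX_RECORDS_PER_FETCH = 10_000;

    /** Returns the number of records to request next, or the current value if nothing was measured. */
    static int adaptRecordsToRead(
            long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes, int currentMaxRecordsPerFetch) {
        if (numRecords <= 0 || runLoopTimeNanos <= 0 || recordBatchSizeBytes <= 0) {
            return currentMaxRecordsPerFetch;
        }
        long averageRecordSizeBytes = Math.max(1L, recordBatchSizeBytes / numRecords);
        // How often the loop runs per second, and how many bytes each pass may read to hit the target.
        double loopFrequencyHz = 1_000_000_000.0 / runLoopTimeNanos;
        double bytesPerRead = TARGET_BYTES_PER_SECOND / loopFrequencyHz;
        long records = (long) (bytesPerRead / averageRecordSizeBytes);
        // Clamp to the range [1, MAX_RECORDS_PER_FETCH].
        return (int) Math.max(1L, Math.min(records, MAX_RECORDS_PER_FETCH));
    }

    /** Sleep needed to stretch one loop pass to the configured interval; 0 disables the adjustment. */
    static long sleepTimeMillis(long fetchIntervalMillis, long processingTimeNanos) {
        if (fetchIntervalMillis == 0) {
            return 0L;
        }
        return Math.max(0L, fetchIntervalMillis - processingTimeNanos / 1_000_000L);
    }
}
```

Because neither method touches instance state, both can be unit tested with plain numbers, which is the main benefit of the static form.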
[GitHub] flink issue #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfigSnapsh...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6392 LGTM, +1 ---
[GitHub] flink pull request #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfi...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6392#discussion_r204684670 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java --- @@ -46,8 +46,6 @@ public CompositeTypeSerializerConfigSnapshot() {} public CompositeTypeSerializerConfigSnapshot(TypeSerializer... nestedSerializers) { - Preconditions.checkNotNull(nestedSerializers); --- End diff -- Not sure about this. It should be perfectly fine that we have a null check here. ---
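As a small illustration of why a null check on a varargs constructor is usually harmless, the sketch below uses a hypothetical stand-in class and `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`. It only demonstrates Java's varargs behavior; it says nothing about where the NPE in the linked issue actually originated.

```java
import java.util.Objects;

/** Hypothetical stand-in for a config snapshot that holds nested serializers. */
final class VarargsNullCheckSketch {

    private final Object[] nested;

    /** Mirrors the empty constructor in the quoted diff; it never reaches the null check. */
    VarargsNullCheckSketch() {
        this.nested = new Object[0];
    }

    /** Varargs constructor: only an explicitly passed null array trips the check. */
    VarargsNullCheckSketch(Object... nested) {
        this.nested = Objects.requireNonNull(nested, "nested serializers must not be null");
    }

    int size() {
        return nested.length;
    }
}
```

Calling the constructor with one or more arguments always produces a non-null array, so the check only rejects callers that pass a null array on purpose.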
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6391 [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x PR ## What is the purpose of the change This PR adds the Elasticsearch 6.x PR, as well as an end-to-end test for it. ## Brief change log - Cherry-picked @cjolif 's Elasticsearch 5.3+ compatibility and 6.x implementation changes - Add end to end test for ES 6.x ## Verifying this change Run the new end-to-end test for ES 6.x. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9885 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6391.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6391 ---
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r204342877 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java --- @@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except for (KeyRangeStates keyRange : snapshotKeyRanges.get()) { keyRanges.add(keyRange); } + + // let event time start from the max of all event time progress across subtasks in the last execution + for (Long lastEventTime : lastEventTimes.get()) { + monotonousEventTime = Math.max(monotonousEventTime, lastEventTime); --- End diff -- Actually, watermarks are not the direct concern here. This change just ensures that all subtasks start from an event time that is guaranteed not to "jump back" in time. Watermark extraction is not done within the source. ---
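The restore logic in the quoted diff boils down to taking a maximum over the event times recorded by all subtasks in the previous execution. A minimal standalone sketch, assuming each restored subtask can iterate over every recorded value (as the loop over `lastEventTimes.get()` suggests); the class and method names are hypothetical:

```java
/** Hypothetical helper that picks a restart event time that cannot move backwards. */
final class EventTimeRestoreSketch {

    /**
     * @param initialEventTime event time used when no state was restored
     * @param lastEventTimes   event times reached by the subtasks of the previous execution
     * @return the largest of all given values
     */
    static long restoreStartEventTime(long initialEventTime, Iterable<Long> lastEventTimes) {
        long monotonousEventTime = initialEventTime;
        for (Long lastEventTime : lastEventTimes) {
            monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
        }
        return monotonousEventTime;
    }
}
```

Starting every subtask from the global maximum means no subtask can emit an event time earlier than one already emitted before the restore, regardless of how state is redistributed.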
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif no problem, thanks for the notice. I'll try to incorporate the changes you mentioned above into the previous work you've already done. Thanks a lot! ---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif Ideally we would have the ES 6.x connector merged by the beginning of next week. Let me know if this is possible for you. I'll proceed to merge this PR first. ---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif do you think you would be able to quickly open a PR for the REST 6.x connector that includes the new changes you mentioned, based on this one? ---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 I took another look at the PR, and also talked with @tillrohrmann about merging this for 1.6. I think this LGTM, and with these changes we will at least have an ES 5.x connector that is 5.3+ compatible. Merging .. After merging this, I'll also try cherry-picking your 6.x REST-based ES connector on top. If that works well, will also merge that. ---
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6351 [FLINK-9862] [test] Extend general puropose DataStream test to have a tumbling window ## What is the purpose of the change This allows our end-to-end tests to have coverage for snapshotting / restoring timers, when configured to use different state backends. ## Brief change log - Add a tumbling window to the `DataStreamAllAroundTestProgram` - Change default "max out of orderness" setting of the source generator to 0 ## Verifying this change This is a change that affects existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9862 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6351.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6351 commit a3bc102303481fa784b9c94d7838b1c2b5f65123 Author: Tzu-Li (Gordon) Tai Date: 2018-07-17T10:00:40Z [FLINK-9862] [test] Extend general puropose DataStream test to have a tumbling window This allows the end-to-end tests to have coverage for testing checkpointing timers. ---
[GitHub] flink pull request #6273: [FLINK-9377] [core] Implement restore serializer f...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6273 [FLINK-9377] [core] Implement restore serializer factory method for simple composite serializers ## What is the purpose of the change This PR is built on top of #6235. It is a WIP PR. This PR implements the restore serializer factory method for all simple composite serializers (i.e., Flink serializers with nested serializers). More complex serializers such as the Scala serializers, POJO serializers, KryoSerializer, AvroSerializer, etc. will come as a follow-up PR. ## Brief change log - Introduce the `CompositeTypeSerializer` base class, which wraps the configuration snapshotting logic and compatibility checks. - Let all simple composite type serializers extend the `CompositeTypeSerializer`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-9377-composite Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6273.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6273 commit 5fc4a36a144c3f8f22be7e21a4e542d3042d10b1 Author: Tzu-Li (Gordon) Tai Date: 2018-06-13T11:43:53Z [FLINK-9377] [core] (part 1) Extend TypeSerializerConfigSnapshot as a factory for restoring serializers This commit is the first step towards removing serializers from checkpointed state meta info and making Flink checkpoints Java serialization free. Instead of writing serializers in checkpoints, and trying to read that to obtain a restore serializer at restore time, we aim to only write the config snapshot as the single source of truth and use it as a factory to create a restore serializer. This commit adds the method and signatures to the TypeSerializerConfigSnapshot interface. Use of the method, as well as properly implementing the method for all serializers, will be implemented in follow-up commits. commit 661eb6d34da450ed096a77f166a4cc62ce3efdba Author: Tzu-Li (Gordon) Tai Date: 2018-06-14T09:52:06Z [FLINK-9377] [core] (part 2) Remove fallback deserializer option from CompatibilityResult Now that the config snapshot is used as a factory for the restore serializer, it should be guaranteed that a restore serializer is always available. This removes the need for the user to provide a "fallback" convert serializer in the case where a migration is required. commit c91d045c5eb6e355981e4edaa6d1a0d48e5d4a5e Author: Tzu-Li (Gordon) Tai Date: 2018-06-14T14:41:45Z [FLINK-9377] [core] (part 3) Deprecate TypeSerializerSerializationUtil This commit deprecates all utility methods and classes related to serializing serializers. 
All methods that will still be in use, i.e. writing config snapshots, are now moved to a separate new TypeSerializerConfigSnapshotSerializationUtil class. commit e09f91469fb6c86f5d2f05b78a9db3d9af8cce87 Author: Tzu-Li (Gordon) Tai Date: 2018-06-18T14:24:08Z [FLINK-9377] [core] (part 4) Introduce BackwardsCompatibleConfigSnapshot The BackwardsCompatibleConfigSnapshot is a wrapper, dummy config snapshot which wraps an actual config snapshot, as well as a pre-existing serializer instance. In previous versions, since the config snapshot wasn't a serializer factory but simply a container for serializer parameters, previous serializers didn't necessarily have config snapshots that are capable of correctly creating a correct corresponding restore serializer. In this case, since previous serializers still have serializers written in the checkpoint, the backwards compatible solution would be to wrap the written serializer and the config snapshot within the BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the serializer, the wrapped serializer instance is returned instead of actually calling the restore
[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6206 Thanks for looking into this @zsolt-donca. The changes look good to me, but as @yanghua mentioned, a test covering the previous bug would be nice here and would let the reviewer see directly that this is a required fix. ---
[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6221#discussion_r199420811 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java --- @@ -45,29 +45,63 @@ /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */ BASIC, + /** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/ + ASSUME_ROLE, + /** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/ AUTO, } /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */ public static final String AWS_REGION = "aws.region"; + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */ + public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider"; + /** The AWS access key ID to use when setting credentials provider type to BASIC. */ - public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid"; + public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER); /** The AWS secret key to use when setting credentials provider type to BASIC. */ - public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey"; - - /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */ - public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider"; + public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER); /** Optional configuration for profile path if credential provider type is set to be PROFILE. 
*/ - public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path"; + public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER); /** Optional configuration for profile name if credential provider type is set to be PROFILE. */ - public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name"; + public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER); /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */ public static final String AWS_ENDPOINT = "aws.endpoint"; + public static String accessKeyId(String prefix) { + return prefix + ".basic.accesskeyid"; + } + + public static String secretKey(String prefix) { + return prefix + ".basic.secretkey"; + } + + public static String profilePath(String prefix) { + return prefix + ".profile.path"; + } + + public static String profileName(String prefix) { + return prefix + ".profile.name"; + } + + public static String roleArn(String prefix) { --- End diff -- Is there a reason to change the way key constants are defined in this class? If the previous pattern were followed, users could simply use `AWSConfigConstants.AWS_ROLE_ARN` to set a value for the role ARN, and likewise for the other new configurations. ---
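For illustration, the constant-based pattern the comment refers to could look like the sketch below. The class name is hypothetical, and the `.role.arn` suffix is an assumption, since the body of `roleArn(prefix)` is cut off in the quoted diff. The ARN value is a made-up placeholder.

```java
import java.util.Properties;

/** Hypothetical constants class following the plain-constant pattern from the review comment. */
final class AwsConfigKeysSketch {

    /** Key taken from the quoted diff. */
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    /** Assumed key string; the real suffix is not visible in the quoted diff. */
    public static final String AWS_ROLE_ARN = AWS_CREDENTIALS_PROVIDER + ".role.arn";

    /** Shows how a user would set the role ARN through the constant. */
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
        props.setProperty(AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/example-role");
        return props;
    }
}
```

With a plain constant, users reference one well-known name instead of having to know which prefix to pass to a helper method.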
[GitHub] flink issue #6209: [FLINK-9655][tests] Add missing parallelism parameters
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6209 Thanks @zentol for fixing this, merging .. ---
[GitHub] flink pull request #6235: [FLINK-9377] [core] Remove serializers from checkp...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6235

[FLINK-9377] [core] Remove serializers from checkpointed state meta infos

## What is the purpose of the change

This PR is the first step towards a smoother state evolution experience. It removes the behavior of writing serializers in checkpointed state meta infos (using Java serialization) and relying on them to be deserializable at restore time. Instead, the configuration snapshots of serializers now double as a factory for creating the restore serializer, which makes them the single source of truth about the previous serializer of state.

With this change:
- Checkpoints / savepoints move towards being Java serialization-free.
- The availability of the restore serializer is basically determined at compile time.
- Potentially resolves caveats with macro-generated Scala serializers, which typically have anonymous classnames that are easily susceptible to changes, blocking successful savepoint restores due to how Java serialization works.
- In conclusion: the written configuration snapshot is now the single point of entry for obtaining a serializer for previous state. The user is only required to guarantee that the configuration snapshot's classname remains constant for the restore to proceed.

This PR is only a WIP which only includes extending the `TypeSerializerConfigSnapshot` interface to include a `restoreSerializer` method, as well as the interplay of the methods in the state backends after removing serializers from checkpointed state meta infos.

This PR does **NOT** include:
- Proper implementation of the new `restoreSerializer` method on all serializers.
- Tests for snapshotting, restoring, and migrating serializers and their interplay in the state backends.

Because of this, it is expected that existing tests will fail. Follow-up PRs will be opened for the above mentioned missing parts.

## Brief change log

- 5fc4a36 Add a `restoreSerializer` method to the `TypeSerializerConfigSnapshot` interface

  The method still has a dummy base implementation, because this PR doesn't yet properly implement the method for all serializers. Once that is accomplished, the base implementation should be removed.
- 661eb6d Remove the "fallback" serializer option from `CompatibilityResult`

  That option was available in the past to give users a safety path for state conversion, in case their previous serializer could not be deserialized for any reason related to Java serialization. Since we now use the config snapshot as the restore serializer factory, it is guaranteed that the restore serializer is always available in case conversion is required, which voids the need for the "fallback" serializer option.
- c91d045 Deprecates any utility methods that still have the behaviour of writing serializers in checkpoints
- e09f914 Introduces the `BackwardsCompatibleConfigSnapshot` class

  The `BackwardsCompatibleConfigSnapshot` is a dummy wrapper config snapshot which wraps an actual config snapshot, as well as a pre-existing serializer instance. In previous versions, since the config snapshot wasn't a serializer factory but simply a container for serializer parameters, previous serializers didn't necessarily have config snapshots that are capable of correctly creating the corresponding restore serializer. In this case, since previous serializers still have serializers written in the checkpoint, the backwards compatible solution would be to wrap the written serializer and the config snapshot within the `BackwardsCompatibleConfigSnapshot` dummy. When attempting to restore the serializer, the wrapped serializer instance is returned instead of actually calling the `restoreSerializer` method of the wrapped config snapshot.
- da84665 the actual removal of serializers from checkpointed state meta info

## Verifying this change

This PR is a WIP preview, and tests are expected to fail due to reasons mentioned in the description.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
- The serializers: (**yes** / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce
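To make the "config snapshot doubles as a restore serializer factory" idea from the description more concrete, here is a minimal sketch. It is not the Flink API: `SketchSerializer`, `SketchConfigSnapshot`, `CharsetStringSerializer` and `CharsetStringSnapshot` are hypothetical, simplified stand-ins, and only the `restoreSerializer` method name is taken from the PR description.

```java
import java.nio.charset.Charset;

// Hypothetical, simplified stand-in for Flink's TypeSerializer.
interface SketchSerializer<T> {
    byte[] serialize(T value);

    T deserialize(byte[] bytes);

    SketchConfigSnapshot<T> snapshotConfiguration();
}

// Hypothetical stand-in for TypeSerializerConfigSnapshot with the new method.
interface SketchConfigSnapshot<T> {
    // The snapshot acts as a factory for the serializer that wrote the state.
    SketchSerializer<T> restoreSerializer();
}

final class CharsetStringSerializer implements SketchSerializer<String> {
    private final String charsetName;

    CharsetStringSerializer(String charsetName) {
        this.charsetName = charsetName;
    }

    @Override
    public byte[] serialize(String value) {
        return value.getBytes(Charset.forName(charsetName));
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, Charset.forName(charsetName));
    }

    @Override
    public SketchConfigSnapshot<String> snapshotConfiguration() {
        // Only plain parameters go into the snapshot, never the serializer object itself.
        return new CharsetStringSnapshot(charsetName);
    }
}

final class CharsetStringSnapshot implements SketchConfigSnapshot<String> {
    private final String charsetName;

    CharsetStringSnapshot(String charsetName) {
        this.charsetName = charsetName;
    }

    @Override
    public SketchSerializer<String> restoreSerializer() {
        // Rebuild an equivalent serializer from the recorded parameters.
        return new CharsetStringSerializer(charsetName);
    }
}
```

In this sketch the restore path never needs a Java-serialized serializer instance: as long as the snapshot class can be loaded, a serializer capable of reading the old bytes can be recreated from it.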
[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6177 @tweise I've manually merged this, but forgot to add the "This closes #" message. Could you close this PR manually? Thanks! ---
[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6021 @fmthoma I've merged this manually. Thanks for the contribution. Could you close this PR? ---
[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6021 Thanks @fmthoma, will proceed to merge this .. ---
[GitHub] flink issue #6197: [FLINK-9638][E2E Tests] Add helper script to run single t...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6197 Tried this locally, and it works. +1, will merge this .. ---
[GitHub] flink issue #6172: [FLINK-9594][E2E tests] Improve docs for new test runner
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6172 Thanks @florianschmidt1994, merging this .. ---
[GitHub] flink issue #6193: [hotfix] [docs] Fix typo in index.md
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6193 LGTM, thanks for catching this @MichealShin, will merge this. ---
[GitHub] flink issue #6182: [FLINK-8795] Fixed local scala shell for Flip6
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6182 Merging this .. ---
[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6177 @tweise ok, I don't really have a strong opinion on not including the other changes. Changes LGTM then, will merge this. ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197071117 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- Would like to request one more slight change here: Let this method return a boolean that indicates whether or not flushing occurred. The caller of this method can then use the flag to decide whether or not `checkAndPropagateAsyncError` is required. ---
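A rough sketch of what the requested change could look like, assuming the method simply reports whether it had to wait at least once. `BackpressureSketch`, the `IntSupplier` standing in for `producer.getOutstandingRecordsCount()`, the `beforeSend` caller, and the use of `Thread.sleep` instead of the PR's latch are all assumptions made to keep the example self-contained.

```java
import java.util.function.IntSupplier;

// Hypothetical, stripped-down stand-in for the relevant part of FlinkKinesisProducer.
final class BackpressureSketch {
    private final IntSupplier outstandingRecords; // stand-in for producer.getOutstandingRecordsCount()
    private final int queueLimit;
    private final long waitMillis;

    BackpressureSketch(IntSupplier outstandingRecords, int queueLimit, long waitMillis) {
        this.outstandingRecords = outstandingRecords;
        this.queueLimit = queueLimit;
        this.waitMillis = waitMillis;
    }

    /** Blocks while the queue is at or above the limit; returns true if any waiting occurred. */
    boolean enforceQueueLimit() {
        boolean waited = false;
        while (outstandingRecords.getAsInt() >= queueLimit) {
            waited = true;
            try {
                Thread.sleep(waitMillis); // the PR waits on a latch here instead
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return waited;
    }

    /** Caller side: the second error check only runs when waiting actually happened. */
    void beforeSend(Runnable checkAndPropagateAsyncError) {
        checkAndPropagateAsyncError.run();
        if (enforceQueueLimit()) {
            checkAndPropagateAsyncError.run();
        }
    }
}
```

The point of the flag is that records can only have completed (and possibly failed) in the meantime if the caller was blocked, so the extra error check can be skipped on the fast path.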
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197067733 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { + int attempt = 0; + while (producer.getOutstandingRecordsCount() >= queueLimit) { + backpressureCycles.inc(); + if (attempt >= 10) { + LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt); + } + attempt++; + try { + backpressureLatch.await(100); --- End diff -- We might want to make the wait time configurable? (as a separate PR) My reasoning is that it directly affects how long until the "flush taking unusually long" message starts popping up. ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197065346 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -55,6 +58,13 @@ @PublicEvolving public class FlinkKinesisProducer extends RichSinkFunction implements CheckpointedFunction { + public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer"; + + public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles"; + + public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount"; + + --- End diff -- nit: unnecessary line ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197067136 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { + int attempt = 0; + while (producer.getOutstandingRecordsCount() >= queueLimit) { + backpressureCycles.inc(); + if (attempt >= 10) { + LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt); + } + attempt++; + try { + backpressureLatch.await(100); --- End diff -- I like this implementation a lot better now. ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197065961 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -144,6 +163,17 @@ public void setFailOnError(boolean failOnError) { this.failOnError = failOnError; } + /** +* The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory +* problems under high loads, a limit can be employed above which the internal queue +* will be flushed, thereby applying backpressure. +* +* @param queueLimit The maximum length of the internal queue before backpressuring +*/ + public void setQueueLimit(int queueLimit) { + this.queueLimit = queueLimit; --- End diff -- Will need argument checks on the given `queueLimit`. ---
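For illustration, the argument check could be as simple as rejecting non-positive limits. This is a sketch with plain Java so that it stays self-contained; in Flink code one would more likely use `org.apache.flink.util.Preconditions.checkArgument`. The class name `QueueLimitConfig`, the getter, the default value and the exact error message are assumptions.

```java
// Hypothetical holder for the setting; stands in for the field on FlinkKinesisProducer.
final class QueueLimitConfig {
    // Assumed default: effectively "no limit".
    private int queueLimit = Integer.MAX_VALUE;

    void setQueueLimit(int queueLimit) {
        // A limit of zero or less would block every record forever, so reject it early.
        if (queueLimit <= 0) {
            throw new IllegalArgumentException(
                    "queueLimit must be a positive number, but was " + queueLimit);
        }
        this.queueLimit = queueLimit;
    }

    int getQueueLimit() {
        return queueLimit;
    }
}
```

Validating in the setter means a misconfiguration fails when the job is assembled rather than when the sink first hits the limit at runtime.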
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197070282 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java --- @@ -267,6 +268,79 @@ public void go() throws Exception { testHarness.close(); } + /** +* Test ensuring that the producer blocks if the queue limit is exceeded, +* until the queue length drops below the limit; +* we set a timeout because the test will not finish if the logic is broken. +*/ + @Test(timeout = 1) + public void testBackpressure() throws Throwable { + final DummyFlinkKinesisProducer producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + producer.setQueueLimit(1); + + OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + UserRecordResult result = mock(UserRecordResult.class); + when(result.isSuccessful()).thenReturn(true); + + CheckedThread msg1 = new CheckedThread() { + @Override + public void go() throws Exception { + testHarness.processElement(new StreamRecord<>("msg-1")); + } + }; + msg1.start(); + msg1.trySync(100); + assertFalse("Flush triggered before reaching queue limit", msg1.isAlive()); --- End diff -- I wonder if this would introduce flakiness in the test. @fmthoma could you elaborate a bit here? ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197068370 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +public class TimeoutLatch { --- End diff -- This needs to be annotated as `@Internal` ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197069244 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +public class TimeoutLatch { + + private final Object lock = new Object(); + private volatile boolean waiting; + + public void await(long timeout) throws InterruptedException { + synchronized (lock) { + waiting = true; + lock.wait(timeout); + } + } + + public void trigger() { + if (waiting) { + synchronized (lock) { + waiting = false; --- End diff -- I agree with @fmthoma here. ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197064931 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- The moving average calculation that you described could maybe just be an implementation of the limit supplier function. ---
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197063764 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- A user-provided queue limit supplier function sounds like a good idea. As you mentioned, this can come as a follow-up PR. ---
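As a sketch of how a queue limit supplier could host a moving average: the details of the calculation discussed in the PR are not part of this thread, so everything below is an assumption. `MovingAverageQueueLimit` is a hypothetical class that derives the limit from an exponential moving average of observed throughput multiplied by a target buffering time; the only idea taken from the discussion is that the limit comes from a supplier function (here `java.util.function.IntSupplier`) instead of a fixed number.

```java
import java.util.function.IntSupplier;

// Hypothetical adaptive limit: buffer roughly `bufferSeconds` worth of the
// recently observed throughput, but never less than `minLimit` records.
final class MovingAverageQueueLimit implements IntSupplier {
    private final double alpha;         // smoothing factor in (0, 1]
    private final double bufferSeconds; // how many seconds of traffic to allow in the queue
    private final int minLimit;

    private double avgRecordsPerSecond;
    private boolean initialized;

    MovingAverageQueueLimit(double alpha, double bufferSeconds, int minLimit) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1], but was " + alpha);
        }
        this.alpha = alpha;
        this.bufferSeconds = bufferSeconds;
        this.minLimit = minLimit;
    }

    /** Feeds one throughput observation into the exponential moving average. */
    synchronized void observe(double recordsPerSecond) {
        if (initialized) {
            avgRecordsPerSecond = alpha * recordsPerSecond + (1.0 - alpha) * avgRecordsPerSecond;
        } else {
            avgRecordsPerSecond = recordsPerSecond;
            initialized = true;
        }
    }

    /** Current limit, as it would be read by the backpressure loop. */
    @Override
    public synchronized int getAsInt() {
        long scaled = Math.round(avgRecordsPerSecond * bufferSeconds);
        return (int) Math.max(minLimit, Math.min(Integer.MAX_VALUE, scaled));
    }
}
```

A fixed limit then becomes the trivial supplier `() -> 1000`, so a follow-up PR could switch the producer to a supplier without changing behaviour for users who configured a constant.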
[GitHub] flink issue #6182: [FLINK-8795] Fixed local scala shell for Flip6
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6182 Thanks for the fix @dawidwys! The changes LGTM on my side, +1. ---
[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6172#discussion_r195992732 --- Diff: flink-end-to-end-tests/README.md --- @@ -33,6 +33,53 @@ $ FLINK_DIR= flink-end-to-end-tests/test-scripts/test_batch_wordcount ## Writing Tests -Have a look at test_batch_wordcount.sh for a very basic test and -test_streaming_kafka010.sh for a more involved example. Whenever possible, try -to put new functionality in common.sh so that it can be reused by other tests. +### Examples +Have a look at `test_batch_wordcount.sh` for a very basic test and +`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try +to put new functionality in `common.sh` so that it can be reused by other tests. + +### Adding a test case +In order to add a new test case you need a new line to either `test-scripts/run-nightly-tests.sh` and / or `test-scripts/run-pre-commit-tests.sh` + +Adding a new test case generally follows the following pattern + +```sh +run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh arg1 arg2" +``` + +_Note: If you want to parameterize your tests please do so by adding multiple test cases with parameters as arguments to the nightly / pre-commit test suites. 
This allows the test runner to do a cleanup in between each individual test and also to fail those tests individually._ + +### Passing your test +A test is considered to have passed if it: +- has exit code 0 +- there are no non-empty .out files (nothing was written to stdout / stderr by your flink program) +- there are no exceptions in the log files +- there are no errors in the log files + +### Failing your test +A test is considered to have failed if it: +- exited with non-zero exit code +- has non-empty *.out files (something was written to stdout / stderr by your flink program) +- there are errors in the log files +- there are exceptions in the log files + +_There is a whitelist for exceptions and errors that do not lead to failure, they can be found in the `check_logs_for_errors` and `check_logs_for_exceptions` in `test-scripts/common.sh`_ + +Please note that a previously supported pattern where you could assign a value the global variable `PASS` to have your tests fail **is not supported anymore** --- End diff -- Missing period at the end. ---
[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6172#discussion_r195992679 --- Diff: flink-end-to-end-tests/README.md --- @@ -33,6 +33,53 @@ $ FLINK_DIR= flink-end-to-end-tests/test-scripts/test_batch_wordcount ## Writing Tests -Have a look at test_batch_wordcount.sh for a very basic test and -test_streaming_kafka010.sh for a more involved example. Whenever possible, try -to put new functionality in common.sh so that it can be reused by other tests. +### Examples +Have a look at `test_batch_wordcount.sh` for a very basic test and +`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try +to put new functionality in `common.sh` so that it can be reused by other tests. + +### Adding a test case +In order to add a new test case you need a new line to either `test-scripts/run-nightly-tests.sh` and / or `test-scripts/run-pre-commit-tests.sh` + +Adding a new test case generally follows the following pattern + +```sh +run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh arg1 arg2" +``` + +_Note: If you want to parameterize your tests please do so by adding multiple test cases with parameters as arguments to the nightly / pre-commit test suites. 
This allows the test runner to do a cleanup in between each individual test and also to fail those tests individually._ + +### Passing your test +A test is considered to have passed if it: +- has exit code 0 +- there are no non-empty .out files (nothing was written to stdout / stderr by your flink program) +- there are no exceptions in the log files +- there are no errors in the log files + +### Failing your test +A test is considered to have failed if it: +- exited with non-zero exit code +- has non-empty *.out files (something was written to stdout / stderr by your flink program) +- there are errors in the log files +- there are exceptions in the log files + +_There is a whitelist for exceptions and errors that do not lead to failure, they can be found in the `check_logs_for_errors` and `check_logs_for_exceptions` in `test-scripts/common.sh`_ --- End diff -- `... not lead to failure, they can be found ...` --> `.. not lead to failure, "which" can be found` ---
[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6172#discussion_r195992920 --- Diff: flink-end-to-end-tests/README.md --- @@ -33,6 +33,53 @@ $ FLINK_DIR= flink-end-to-end-tests/test-scripts/test_batch_wordcount ## Writing Tests -Have a look at test_batch_wordcount.sh for a very basic test and -test_streaming_kafka010.sh for a more involved example. Whenever possible, try -to put new functionality in common.sh so that it can be reused by other tests. +### Examples +Have a look at `test_batch_wordcount.sh` for a very basic test and +`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try +to put new functionality in `common.sh` so that it can be reused by other tests. + +### Adding a test case +In order to add a new test case you need a new line to either `test-scripts/run-nightly-tests.sh` and / or `test-scripts/run-pre-commit-tests.sh` + +Adding a new test case generally follows the following pattern + +```sh +run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh arg1 arg2" +``` + +_Note: If you want to parameterize your tests please do so by adding multiple test cases with parameters as arguments to the nightly / pre-commit test suites. 
This allows the test runner to do a cleanup in between each individual test and also to fail those tests individually._ + +### Passing your test +A test is considered to have passed if it: +- has exit code 0 +- there are no non-empty .out files (nothing was written to stdout / stderr by your flink program) +- there are no exceptions in the log files +- there are no errors in the log files + +### Failing your test +A test is considered to have failed if it: +- exited with non-zero exit code +- has non-empty *.out files (something was written to stdout / stderr by your flink program) +- there are errors in the log files +- there are exceptions in the log files + +_There is a whitelist for exceptions and errors that do not lead to failure, they can be found in the `check_logs_for_errors` and `check_logs_for_exceptions` in `test-scripts/common.sh`_ + +Please note that a previously supported pattern where you could assign a value the global variable `PASS` to have your tests fail **is not supported anymore** + +### Cleanup +The test runner performs a cleanup after each test case, which includes: +- Stopping the cluster +- Killing all task and job managers +- Reverting config to default (if changed before) +- Cleaning up log and temp directories + +If your test case should do some *additional* cleanup, a common pattern is to trap a `test_cleanup` function to `EXIT` in your test case like so: --- End diff -- I would explicitly state that this typically consists of shutting down external systems used by the e2e test, e.g. Kafka / Elasticsearch. ---
[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6172#discussion_r195991753 --- Diff: flink-end-to-end-tests/README.md --- @@ -33,6 +33,53 @@ $ FLINK_DIR= flink-end-to-end-tests/test-scripts/test_batch_wordcount ## Writing Tests -Have a look at test_batch_wordcount.sh for a very basic test and -test_streaming_kafka010.sh for a more involved example. Whenever possible, try -to put new functionality in common.sh so that it can be reused by other tests. +### Examples +Have a look at `test_batch_wordcount.sh` for a very basic test and +`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try +to put new functionality in `common.sh` so that it can be reused by other tests. + +### Adding a test case +In order to add a new test case you need a new line to either `test-scripts/run-nightly-tests.sh` and / or `test-scripts/run-pre-commit-tests.sh` + +Adding a new test case generally follows the following pattern + +```sh +run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh arg1 arg2" +``` + +_Note: If you want to parameterize your tests please do so by adding multiple test cases with parameters as arguments to the nightly / pre-commit test suites. This allows the test runner to do a cleanup in between each individual test and also to fail those tests individually._ + +### Passing your test +A test is considered to have passed if it: +- has exit code 0 +- there are no non-empty .out files (nothing was written to stdout / stderr by your flink program) --- End diff -- Capital `f` for `Flink` ---
[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6177 I'm not sure that we want to backport the shard-mapping hash function. IMO, that is a new feature and shouldn't go into a bugfix release. What do you think @tweise? ---
[GitHub] flink issue #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass" messag...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6053 ``` == Running 'Streaming Python Wordcount end-to-end test' == Flink dist directory: /home/travis/build/apache/flink/build-target TEST_DATA_DIR: /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-10412258255 Starting cluster. Starting standalonesession daemon on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. Starting taskexecutor daemon on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Dispatcher REST endpoint is up. Starting execution of program Program execution finished Job with JobID 06184a085272dd12b3573b1bcb96badc has finished. Job Runtime: 6103 ms pass StreamingPythonWordCount Stopping taskexecutor daemon (pid: 31303) on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. Stopping standalonesession daemon (pid: 30988) on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. [PASS] 'Streaming Python Wordcount end-to-end test' passed after 0 minutes and 24 seconds! Test exited with exit code 0. == Running 'Wordcount end-to-end test' == Flink dist directory: /home/travis/build/apache/flink/build-target TEST_DATA_DIR: /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-36174383269 Starting cluster. Starting standalonesession daemon on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. Starting taskexecutor daemon on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... 
Waiting for dispatcher REST endpoint to come up... Waiting for dispatcher REST endpoint to come up... Dispatcher REST endpoint is up. Starting execution of program Program execution finished Job with JobID 30256ad7ff23ea8543ddca76bacaaee5 has finished. Job Runtime: 1352 ms pass WordCount Stopping taskexecutor daemon (pid: 835) on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. Stopping standalonesession daemon (pid: 517) on host travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8. [PASS] 'Wordcount end-to-end test' passed after 0 minutes and 11 seconds! Test exited with exit code 0. ``` The Travis log excerpt looks good. The follow-up commits look good. +1, LGTM on my side. ---
[GitHub] flink pull request #6151: [FLINK-9569] [avro] Fix confusing construction of ...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6151

[FLINK-9569] [avro] Fix confusing construction of GenericRecord AvroSerializers

## What is the purpose of the change

The `AvroSerializer` previously had an `AvroSerializer(Class type, Schema schema)` public constructor when used for generic records. This is a bit confusing, because when using the `AvroSerializer` for generic records, the type to be serialized should always be a `GenericData.Record` type.

This PR fixes this by letting the `AvroSerializer` have similar construction patterns to the `AvroDeserializationSchema`, where we have static factory methods for generic and non-generic Avro types.

## Brief change log

- Remove all public constructors from the `AvroSerializer`
- Add `TypeSerializer AvroSerializer.forGeneric(Schema)` method to create an `AvroSerializer` for `GenericRecord`s
- Add `TypeSerializer AvroSerializer.forNonGeneric(Class type)` method to create an `AvroSerializer` for specific or POJO types.
- A previously deprecated constructor in the `AvroSerializer` is completely removed, since it should be assumed that the `AvroSerializer` is an internal API and would not require API deprecation.

## Verifying this change

This is just a code refactor. Required test coverage should not have been affected.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (**yes** / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9569

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6151.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6151

commit 53da570a962226ff81172a209dc1362a0637e110 Author: Tzu-Li (Gordon) Tai Date: 2018-06-11T16:04:51Z wip

commit 1abbcd13da9f2ac69afc318e121c04d59ee903ec Author: Tzu-Li (Gordon) Tai Date: 2018-06-12T08:43:19Z [FLINK-9569] [avro] Fix confusing construction for GenericRecord AvroSerializers ---
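The construction pattern described in the change log can be sketched as follows. This is not the actual `AvroSerializer`: `AvroLikeSerializer` and `GenericRecordSketch` are hypothetical stand-ins so that the example compiles without an Avro dependency, the schema is represented as a plain string, and the rejection of the generic record type in `forNonGeneric` is an assumption. Only the factory method names `forGeneric` and `forNonGeneric` come from the PR description.

```java
import java.util.Objects;

// Stand-in for Avro's generic record type, so the sketch needs no Avro dependency.
final class GenericRecordSketch {
}

final class AvroLikeSerializer<T> {
    private final Class<T> type;
    private final String schemaJson; // only set for generic records

    // No public constructors: callers must go through the static factories below.
    private AvroLikeSerializer(Class<T> type, String schemaJson) {
        this.type = type;
        this.schemaJson = schemaJson;
    }

    /** Generic records: the caller supplies only the schema, the type is fixed. */
    static AvroLikeSerializer<GenericRecordSketch> forGeneric(String schemaJson) {
        Objects.requireNonNull(schemaJson, "schemaJson");
        return new AvroLikeSerializer<>(GenericRecordSketch.class, schemaJson);
    }

    /** Specific or POJO types: the caller supplies only the class. */
    static <T> AvroLikeSerializer<T> forNonGeneric(Class<T> type) {
        Objects.requireNonNull(type, "type");
        if (GenericRecordSketch.class.isAssignableFrom(type)) {
            throw new IllegalArgumentException("Use forGeneric(...) for generic records.");
        }
        return new AvroLikeSerializer<>(type, null);
    }

    Class<T> getType() {
        return type;
    }

    boolean isGeneric() {
        return schemaJson != null;
    }
}
```

With the constructor private, the confusing combination of an arbitrary class plus a schema can no longer be expressed at the call site.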
[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5845 Sorry for the delay in my reply here. I'll take a look at this over the next few days this week. If there is going to be a new PR, please also let me know. Thanks! ---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif do you think that, with a clean cut using a REST implementation, we would no longer need separate modules for ES 6.x, 7.x, 8.x and so on? I.e., it would only be a matter of the user recompiling that REST-based implementation with a different ES client version. If not, then I would still prefer that we continue with the approach this PR is proposing, since we need this fix in anyway to get Elasticsearch 5.3+ working. ---
[GitHub] flink issue #6038: [FLINK-9394] [e2e] Test rescaling when resuming from exte...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6038 @StefanRRichter thanks for the review, will merge this. ---
[GitHub] flink issue #6072: [hotfix][doc] Clarify kinesis docs and fix debugging clas...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6072 Will only merge the Kinesis doc fix, as the classloading docs have meanwhile already received a substantial fix in master. ---
[GitHub] flink pull request #6074: [FLINK-9429] [test] Fix does not properly terminat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6074#discussion_r190786548

--- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---
@@ -61,8 +61,9 @@ function verify_result {
 rm $TEST_DATA_DIR/output
 fi

-while : ; do
- curl 'localhost:9200/index/_search?q=*=21' > $TEST_DATA_DIR/output
+# make sure can terminate properly with control-C.
+while [ $? -ne 130 ]; do
--- End diff --

I would prefer that we just wrap the Elasticsearch query in a separate function, which improves readability. Something along the lines of:

```
function fetch_elasticsearch {
    curl 'localhost:9200/index3/_count?q=*' > $TEST_DATA_DIR/output
    echo $(grep '\"count\"' $TEST_DATA_DIR/output | awk '{print $3}' | sed 's/\(.*\),/\1 /')
}

function verify_result {
    local numRecords=$1

    if [ -f "$TEST_DATA_DIR/output" ]; then
        rm $TEST_DATA_DIR/output
    fi

    while (( $(fetch_elasticsearch) < $numRecords )) ; do
        echo "Waiting for Elasticsearch records ..."
        sleep 1
    done
}
```

---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 One more thing to clarify: When planning to switch to REST, are we speaking of an implementation that works directly against Elasticsearch's REST API? Or are we thinking of using Elasticsearch's [RestHighLevelClient](https://snapshots.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-high-level-client/7.0.0-alpha1-SNAPSHOT/org/elasticsearch/client/RestHighLevelClient.html)? I would assume the latter, but IMO we would not be able to avoid yet again having a common base module across future versions (e.g. across ES 6.x, 7.x, and so on), even if we make a clean cut now. So, I have the feeling that the main problem here isn't that we are sharing code between versions, but the fact that our base shared code isn't future-proof enough for potential 3rd party API breaks. That's the main reason why I'm proposing not to expose Elasticsearch classes anymore through base class APIs in the shared module. ---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6043 The main reason why the discussion leaned towards the change currently proposed by this PR was that only Elasticsearch 5.6+ supports REST. Working only towards a clean-cut module that uses REST would mean that we still wouldn't be able to support Elasticsearch 5.2+ up to Elasticsearch 5.5. ---
[GitHub] flink issue #6072: [hotfix][doc] Clarify kinesis docs and fix debugging clas...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6072 +1, thanks for the fixes @pnowojski. Will merge this .. ---
[GitHub] flink issue #6061: [FLINK-9425] Make release scripts compliant with ASF rele...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6061 +1, thanks for the compliance updates. LGTM. ---
[GitHub] flink issue #6058: [FLINK-9415] Remove reference to StreamingMultipleProgram...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6058 +1, LGTM Merging this .. ---
[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6040 Using `CheckedThread` is preferable, as it simplifies some of the test code. But yes, the utility was introduced at a later point in time in Flink, so some parts of the test code might still be using `Thread`s and `AtomicReference`s. ---
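To illustrate why a checked-thread utility tends to simplify test code compared to a plain `Thread` plus `AtomicReference`, here is a minimal sketch. Note that `SketchCheckedThread` and `CheckedThreadComparison` are hypothetical stand-ins written only with the JDK; they are not Flink's actual `CheckedThread` and may differ from it in detail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical stand-in for a "checked thread" test utility.
// It is NOT Flink's CheckedThread; it only illustrates the idea.
abstract class SketchCheckedThread extends Thread {
    private volatile Throwable error;

    /** Test logic; anything thrown here is captured instead of being lost. */
    abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t;
        }
    }

    /** Waits for the thread to finish and rethrows whatever go() threw. */
    void sync() throws Exception {
        join();
        if (error != null) {
            throw new Exception("Thread failed", error);
        }
    }
}

class CheckedThreadComparison {
    /** Manual pattern: plain Thread plus an AtomicReference holding the error. */
    static Throwable runManually(Runnable body) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread t = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable e) {
                error.set(e);
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return error.get();
    }

    /** Same check with the stand-in: the failure surfaces from sync(). */
    static boolean failureSurfacesOnSync() {
        SketchCheckedThread t = new SketchCheckedThread() {
            @Override
            void go() {
                throw new IllegalStateException("boom");
            }
        };
        t.start();
        try {
            t.sync();
            return false;
        } catch (Exception e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }
}
```

With the manual pattern, each test has to remember to inspect the reference after joining; with the checked variant, a forgotten check is less likely because `sync()` both joins and rethrows.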
[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6021 @fmthoma yes, that would be great. ---
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r190153528 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -80,6 +83,9 @@ /** The queue of unassigned partitions that we need to assign to the Kafka consumer. */ private final ClosableBlockingQueue<KafkaTopicPartitionState> unassignedPartitionsQueue; + /** The list of partitions to be removed from kafka consumer. */ + private final Set partitionsToBeRemoved; --- End diff -- Would it actually make more sense that we have a queue for this? Like how we are handling unassigned new partitions via the `unassignedPartitionsQueue`. The fact that this is a set means that we will need to eventually remove entries from it anyways. ---
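A minimal sketch of the queue-based handover suggested above, assuming partitions are identified by plain strings. `PartitionRemovalSketch` and its method names are hypothetical, and a JDK `LinkedBlockingQueue` stands in for Flink's `ClosableBlockingQueue`, which is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: partitions to be removed are handed over via a queue,
// mirroring how newly discovered partitions are handed over.
class PartitionRemovalSketch {
    private final BlockingQueue<String> partitionsToBeRemovedQueue = new LinkedBlockingQueue<>();
    private final Set<String> assigned = new HashSet<>();

    PartitionRemovalSketch(List<String> initiallyAssigned) {
        assigned.addAll(initiallyAssigned);
    }

    /** Called by the discovery side when a partition is no longer available. */
    void requestRemoval(String partition) {
        partitionsToBeRemovedQueue.add(partition);
    }

    /**
     * Called by the consumer loop. Draining both hands the entries over and
     * empties the queue, so no separate bookkeeping step is needed.
     */
    Set<String> applyPendingRemovals() {
        List<String> batch = new ArrayList<>();
        partitionsToBeRemovedQueue.drainTo(batch);
        assigned.removeAll(batch);
        return new HashSet<>(assigned);
    }
}
```

The design point is that a drained queue never holds stale entries, whereas a shared set needs an explicit removal step once the consumer has acted on it.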
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r190150419 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase( Pattern topicPattern, KeyedDeserializationSchema deserializer, long discoveryIntervalMillis, - boolean useMetrics) { + boolean useMetrics, + boolean checkUnavailablePartitions) { --- End diff -- Why do we want this to be configurable? Is there any case that we would prefer to leave them untouched? ---
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r190152570 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -374,8 +385,8 @@ void setOffsetsToCommit( * This method is exposed for testing purposes. */ @VisibleForTesting - void reassignPartitions(List<KafkaTopicPartitionState> newPartitions) throws Exception { - if (newPartitions.size() == 0) { + void reassignPartitions(List<KafkaTopicPartitionState> newPartitions, Set partitionsToBeRemoved) throws Exception { --- End diff -- I have the feeling that this method is way too complex now, to a point that it might make more sense to break this up into 2 different methods - `addPartitionsToAssignment` and `removePartitionsFromAssignment`. ---
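The proposed split could look roughly like the sketch below. Only the two method names come from the comment above; the `AssignmentSketch` class, the string-based partition type, and the signatures are assumptions for illustration and do not reflect the actual `KafkaConsumerThread` code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch: each method has one responsibility and returns the
// new assignment instead of mutating shared state.
final class AssignmentSketch {
    private AssignmentSketch() {}

    /** Returns the current assignment extended by any partitions not yet present. */
    static List<String> addPartitionsToAssignment(List<String> current, Collection<String> newPartitions) {
        List<String> result = new ArrayList<>(current);
        for (String partition : newPartitions) {
            if (!result.contains(partition)) {
                result.add(partition);
            }
        }
        return result;
    }

    /** Returns the current assignment without the given partitions. */
    static List<String> removePartitionsFromAssignment(List<String> current, Collection<String> toRemove) {
        List<String> result = new ArrayList<>(current);
        result.removeAll(toRemove);
        return result;
    }
}
```

Keeping the two operations separate also lets each be tested on its own, which is harder when one method handles both directions.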
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r190146869 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,196 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\"" +JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\"" +JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :" + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function extract_valid_pact_from_job_info_return() { +PACT_MATCH=0 +if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]]; +then +PACT_MATCH=$PACT_MATCH +else +PACT_MATCH=-1 +fi +if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]]; +then +PACT_MATCH=$PACT_MATCH +else +PACT_MATCH=-1 +fi +echo ${PACT_MATCH} +} + +function extract_valid_job_list_by_type_from_job_list_return() { +JOB_LIST_MATCH=0 +JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3" +if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]]; +then +JOB_LIST_MATCH=$JOB_LIST_MATCH +else +JOB_LIST_MATCH=-1 +fi +echo ${JOB_LIST_MATCH} +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all --- End diff -- I don't think we need to explicitly shutdown the cluster and TMs here; that is already part of the `cleanup` call ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r190145819 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- Well, we could get a completely normal exit code from the `run` execution while the `-p` option is completely ignored, e.g. if we change the CLI to simply not recognize the option. This is an extreme case, though. ---
[GitHub] flink pull request #6023: [FLINK-9383][runtime] Test directories in Distribu...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6023#discussion_r190144815 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -93,6 +93,14 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi +if [ $EXIT_CODE == 0 ]; then +printf "\n==\n" +printf "Running Distributed cache end-to-end test\n" +printf "==\n" + $END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh --- End diff -- Should update this to use the new `run_test` utility we have. ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r190140597 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -26,20 +29,60 @@ import com.amazonaws.ClientConfigurationFactory; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Shard; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Assert; import org.junit.Test; import org.powermock.reflect.Whitebox; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; /** * Test for methods in the {@link KinesisProxy} class. */ public class KinesisProxyTest { + private static final String NEXT_TOKEN = "NextToken"; + private static final String fakeStreamName = "fake-stream"; + private Set shardIdSet; + private List shards; --- End diff -- Should we move these to be scoped only to the `testGetShardList ` test method? ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r190140724 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -26,20 +29,60 @@ import com.amazonaws.ClientConfigurationFactory; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Shard; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Assert; import org.junit.Test; import org.powermock.reflect.Whitebox; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; /** * Test for methods in the {@link KinesisProxy} class. 
*/ public class KinesisProxyTest { + private static final String NEXT_TOKEN = "NextToken"; + private static final String fakeStreamName = "fake-stream"; + private Set shardIdSet; + private List shards; + + protected static HashMap<String, String> + createInitialSubscribedStreamsToLastDiscoveredShardsState(List streams) { + HashMap<String, String> initial = new HashMap<>(); + for (String stream : streams) { + initial.put(stream, null); + } + return initial; + } + + private static ListShardsRequestMatcher initialListShardsRequestMatcher() { + return new ListShardsRequestMatcher(null, null); + } + + private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) { --- End diff -- nit: IMO, it would help with readability if we move these private utility methods after the main test ones. ---
[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6021 @fmthoma I think this might benefit from actual documentation, not only Javadocs. ---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190127059 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java --- @@ -21,18 +21,56 @@ import org.apache.flink.annotation.PublicEvolving; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; /** - * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare + * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare * them for sending to an Elasticsearch cluster. */ @PublicEvolving -public interface RequestIndexer { +public abstract class RequestIndexer { --- End diff -- I think we can leave `RequestIndexer` as an interface, and make `add(ActionRequest...)` a [default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html). This would lessen the friction of this breaking change. ---
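A minimal sketch of the default-method suggestion above. The `*Stub` classes are hypothetical placeholders for the Elasticsearch request types, and `RequestIndexerSketch` and `RecordingIndexer` are illustrative names only; `UpdateRequest` is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical placeholders for the Elasticsearch request classes.
class ActionRequestStub {}
class IndexRequestStub extends ActionRequestStub {}
class DeleteRequestStub extends ActionRequestStub {}

// The type stays an interface; the legacy method becomes a default method
// that dispatches to the typed overloads.
interface RequestIndexerSketch {
    void add(IndexRequestStub... indexRequests);

    void add(DeleteRequestStub... deleteRequests);

    /** @deprecated use the typed overloads instead. */
    @Deprecated
    default void add(ActionRequestStub... actionRequests) {
        for (ActionRequestStub request : actionRequests) {
            if (request instanceof IndexRequestStub) {
                add((IndexRequestStub) request);
            } else if (request instanceof DeleteRequestStub) {
                add((DeleteRequestStub) request);
            } else {
                throw new IllegalArgumentException("Only index and delete requests are supported in this sketch");
            }
        }
    }
}

// Example implementation that only records which overload was called.
class RecordingIndexer implements RequestIndexerSketch {
    final List<String> log = new ArrayList<>();

    @Override
    public void add(IndexRequestStub... indexRequests) {
        for (int i = 0; i < indexRequests.length; i++) {
            log.add("index");
        }
    }

    @Override
    public void add(DeleteRequestStub... deleteRequests) {
        for (int i = 0; i < deleteRequests.length; i++) {
            log.add("delete");
        }
    }
}
```

Because the type remains an interface, existing code that implements it with `implements` would not have to switch to `extends`, which is presumably the friction the comment refers to.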
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190127406 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java --- @@ -21,18 +21,56 @@ import org.apache.flink.annotation.PublicEvolving; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; /** - * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare + * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare * them for sending to an Elasticsearch cluster. */ @PublicEvolving -public interface RequestIndexer { +public abstract class RequestIndexer { /** * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch. * * @param actionRequests The multiple {@link ActionRequest} to add. +* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up} */ - void add(ActionRequest... actionRequests); + @Deprecated + public void add(ActionRequest... actionRequests) { + for (ActionRequest actionRequest : actionRequests) { + if (actionRequest instanceof IndexRequest) { + add((IndexRequest) actionRequest); + } else if (actionRequest instanceof DeleteRequest) { + add((DeleteRequest) actionRequest); + } else if (actionRequest instanceof UpdateRequest) { + add((UpdateRequest) actionRequest); + } else { + throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests"); + } + } + } + + /** +* Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch. +* +* @param deleteRequests The multiple {@link DeleteRequest} to add. +*/ + public abstract void add(DeleteRequest... 
deleteRequests); --- End diff -- What would be your feeling on not exposing `DeleteRequest`, `IndexRequest`, `UpdateRequest` directly through user API? We could maintain our own way to specify requests, and only create the actual ES request instances internally. It would be more maintenance work for us, but might be safer from a future-proof API perspective. ---
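One way to picture the idea of not exposing Elasticsearch request classes is sketched below. Everything here is hypothetical: `IndexActionSketch`, `RequestBridgeSketch`, and `DescribingBridge` are illustrative names, and the bridge produces a `String` in place of a real client request so that the example does not depend on any Elasticsearch version.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical connector-owned description of an index request.
// User code would only ever see this type, never an Elasticsearch class.
final class IndexActionSketch {
    private final String index;
    private final String id;
    private final Map<String, Object> source;

    IndexActionSketch(String index, String id, Map<String, Object> source) {
        this.index = index;
        this.id = id;
        this.source = Collections.unmodifiableMap(new LinkedHashMap<>(source));
    }

    String index() { return index; }

    String id() { return id; }

    Map<String, Object> source() { return source; }
}

// Hypothetical per-version bridge: each version-specific module would map
// the connector-owned type to that version's client request type R.
interface RequestBridgeSketch<R> {
    R toClientRequest(IndexActionSketch action);
}

// Stand-in bridge that renders a description instead of a real request.
final class DescribingBridge implements RequestBridgeSketch<String> {
    @Override
    public String toClientRequest(IndexActionSketch action) {
        return "index " + action.index() + "/" + action.id() + " " + action.source();
    }
}
```

The trade-off mentioned in the comment shows up directly: the shared module owns a small request model, and each version module carries the translation code.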
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190126862 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java --- @@ -45,12 +48,34 @@ } @Override - public void add(ActionRequest... actionRequests) { - for (ActionRequest actionRequest : actionRequests) { + public void add(DeleteRequest... deleteRequests) { + for (DeleteRequest deleteRequest : deleteRequests) { if (flushOnCheckpoint) { numPendingRequestsRef.getAndIncrement(); } - this.bulkProcessor.add(actionRequest); + this.bulkProcessor.add(deleteRequest); + } + } + + @Override + public void add(IndexRequest... indexRequests) { + for (IndexRequest indexRequest : indexRequests) { + System.out.println("ir: " + indexRequest); --- End diff -- Leftover print. ---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190126707 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java --- @@ -21,18 +21,56 @@ import org.apache.flink.annotation.PublicEvolving; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; /** - * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare + * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare * them for sending to an Elasticsearch cluster. */ @PublicEvolving -public interface RequestIndexer { +public abstract class RequestIndexer { /** * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch. * * @param actionRequests The multiple {@link ActionRequest} to add. +* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up} --- End diff -- typo at the end. ---
[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6053#discussion_r190126277 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -30,168 +30,50 @@ if [ -z "$FLINK_DIR" ] ; then exit 1 fi -source "$(dirname "$0")"/test-scripts/common.sh +source "$(dirname "$0")"/test-scripts/test-runner-common.sh FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized echo "flink-end-to-end-test directory: $END_TO_END_DIR" echo "Flink distribution directory: $FLINK_DIR" -EXIT_CODE=0 - # Template for adding a test: -# if [ $EXIT_CODE == 0 ]; then -#run_test "" "$END_TO_END_DIR/test-scripts/" -#EXIT_CODE=$? -# fi - - -if [ $EXIT_CODE == 0 ]; then - run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true" - EXIT_CODE=$? 
-fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false" - EXIT_CODE=$? 
-fi - -if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true" - EXIT_CODE=$? -fi - -if [ $EXIT_CODE == 0 ]; then
[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6053#discussion_r190126071 --- Diff: flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh --- @@ -53,8 +53,6 @@ function cleanup_after_test { # kill ${watchdog_pid} 2> /dev/null wait ${watchdog_pid} 2> /dev/null -# -cleanup --- End diff -- The `test_local_recovery_and_scheduling` test currently bundles several executions of the test (e.g. with different state backend configurations) in a single run of the test script. That's why it required this cleanup within the test itself. How would the change of this PR affect this? In general, should we also restructure e2e tests so that each execution configuration variant should be executed with the `test-runner-cleanup#run_test` method (instead of cleaning up itself in-between executions)? AFAIK, only the `test_local_recovery_and_scheduling` does this at the moment. ---
[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6053#discussion_r190124962 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -31,56 +31,23 @@ if [ -z "$FLINK_DIR" ] ; then fi source "$(dirname "$0")"/test-scripts/common.sh --- End diff -- I think this can now be removed? ---
[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6053#discussion_r190125404 --- Diff: flink-end-to-end-tests/test-scripts/test-runner-common.sh --- @@ -0,0 +1,76 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(pwd)"/test-scripts/common.sh + +### +# Prints the given description, runs the given test and prints how long the execution took. +# Arguments: +# $1: description of the test +# $2: command to execute +### +function run_test { +description="$1" +command="$2" + +printf "\n==\n" +printf "Running '${description}'\n" +printf "==\n" +start_timer +${command} +exit_code="$?" +time_elapsed=$(end_timer) + +check_logs_for_errors +check_logs_for_exceptions +check_logs_for_non_empty_out_files --- End diff -- Maybe we should move all these methods: ``` start_timer end_timer check_logs_for_errors check_logs_for_exceptions check_logs_for_non_empty_out_files ``` to `test-runner-common.sh` since that's the only place they are used anyways ---
[GitHub] flink pull request #6058: [FLINK-9415] Remove reference to StreamingMultiple...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6058#discussion_r190123863 --- Diff: docs/dev/stream/testing.md --- @@ -181,7 +181,7 @@ public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase { {% highlight scala %} -class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase { +class ExampleIntegrationTest extends ScalaStreamingMultipleProgramsTestBase { --- End diff -- I see that in the code we still have only 1 usage of `ScalaStreamingMultipleProgramsTestBase`, but I don't see why it can't be replaced with `AbstractTestBase` also (therefore removing `ScalaStreamingMultipleProgramsTestBase` and not referencing it anymore in docs). @zentol could you comment here? ---
[GitHub] flink pull request #6058: [FLINK-9415] Remove reference to StreamingMultiple...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6058#discussion_r190123818 --- Diff: docs/dev/stream/testing.md --- @@ -181,7 +181,7 @@ public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase { {% highlight scala %} -class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase { +class ExampleIntegrationTest extends ScalaStreamingMultipleProgramsTestBase { --- End diff -- I see that in the code we still have only 1 usage of `ScalaStreamingMultipleProgramsTestBase`, but I don't see why it can't be replaced with `AbstractTestBase` also (therefore removing `ScalaStreamingMultipleProgramsTestBase` and not referencing it anymore in docs). @zentol could you comment here? ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190120134 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // - create the test fetcher - + + @SuppressWarnings("unchecked") + SourceContext sourceContext = PowerMockito.mock(SourceContext.class); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + + final TestFetcher fetcher = new TestFetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10); + + // - run the fetcher - + + final AtomicReference error = new AtomicReference<>(); + int fetchTasks = 5; + final CountDownLatch latch = new CountDownLatch(fetchTasks); + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1); + + service.submit(new Thread("fetcher runner") { + @Override + public void run() { + try { + latch.await(); + fetcher.runFetchLoop(); --- End diff -- So, IMO, the test should look something like this:
```
final OneShotLatch fetchLoopWaitLatch = new OneShotLatch();
final OneShotLatch stateIterationBlockLatch = new OneShotLatch();

final TestFetcher fetcher = new TestFetcher<>(
    sourceContext,
    partitionsWithInitialOffsets,
    null, /* periodic assigner */
    null, /* punctuated assigner */
    new TestProcessingTimeService(),
    10,
    fetchLoopWaitLatch,
    stateIterationBlockLatch);

// - run the fetcher -

final CheckedThread checkedThread = new CheckedThread() {
    @Override
    public void go() throws Exception {
        fetcher.runFetchLoop();
    }
};
checkedThread.start();

// wait until state iteration begins before adding discovered partitions
fetchLoopWaitLatch.await();
fetcher.addDiscoveredPartitions(Collections.singletonList(testPartition));

stateIterationBlockLatch.trigger();
checkedThread.sync();
```
---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190111529 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -416,9 +520,16 @@ protected TestFetcher( false); } + /** +* Emulation of partition's iteration which is required for +* {@link AbstractFetcherTest#testConcurrentPartitionsDiscoveryAndLoopFetching}. +* @throws Exception +*/ @Override public void runFetchLoop() throws Exception { - throw new UnsupportedOperationException(); + for (KafkaTopicPartitionState ignored: subscribedPartitionStates()) { + Thread.sleep(10L); --- End diff -- This would only let the test fail "occasionally", right? I would like this to be changed, so that we always have the test failing without the copy on write fix. We could do this by having a dummy source context that blocks on record emit. ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190110928 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // - create the test fetcher - + + @SuppressWarnings("unchecked") + SourceContext sourceContext = PowerMockito.mock(SourceContext.class); --- End diff -- It is unnecessary to use a power mock here. A dummy implementation of a `SourceContext` will be better. ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190112933 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -507,7 +507,11 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) { SerializedValue<AssignerWithPunctuatedWatermarks> watermarksPunctuated, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { - List<KafkaTopicPartitionState> partitionStates = new LinkedList<>(); + /** +* CopyOnWrite as adding discovered partitions could happen in parallel +* with different threads iterating by {@link AbstractFetcher#subscribedPartitionStates} results +*/ --- End diff -- I think we usually don't have Javadoc blocks within methods. A regular comment with `//` would do. ---
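For readers who want to see why the list type matters here, the following is a minimal single-threaded sketch using only JDK collections. It is not the Flink code: the class and method names are hypothetical, and the "discovery" is simulated by appending to the list from inside the loop, which triggers the same fail-fast check that a concurrent append from another thread would.

```java
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical illustration class; not part of Flink.
final class CopyOnWriteIterationSketch {

    /**
     * Iterates over the given list and appends one element while the
     * iteration is still in progress. Returns true if the iteration
     * failed with a ConcurrentModificationException.
     */
    static boolean failsWhenModifiedDuringIteration(List<String> partitions) {
        try {
            boolean added = false;
            for (String ignored : partitions) {
                if (!added) {
                    // Stands in for addDiscoveredPartitions() running mid-iteration.
                    partitions.add("discovered-partition");
                    added = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

Passing a `LinkedList` should make the method return true, while a `CopyOnWriteArrayList` should return false, because its iterator walks a snapshot of the array taken when the iteration started.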
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190111239 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // - create the test fetcher - + + @SuppressWarnings("unchecked") + SourceContext sourceContext = PowerMockito.mock(SourceContext.class); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + + final TestFetcher fetcher = new TestFetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10); + + // - run the fetcher - + + final AtomicReference error = new AtomicReference<>(); + int fetchTasks = 5; + final CountDownLatch latch = new CountDownLatch(fetchTasks); + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1); + + service.submit(new Thread("fetcher runner") { + @Override + public void run() { + try { + latch.await(); + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }); + + for (int i = 0; i < fetchTasks; i++) { + service.submit(new Thread("add partitions " + i) { + @Override + public void run() { + try { + List newPartitions = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + newPartitions.add(testPartition); + } + 
fetcher.addDiscoveredPartitions(newPartitions); + latch.countDown(); + for (int i = 0; i < 100; i++) { + fetcher.addDiscoveredPartitions(newPartitions); + Thread.sleep(1L); + } + } catch (Throwable t) { + error.set(t); + } + } + }); + } + + service.awaitTermination(1L, TimeUnit.SECONDS); + + // - trigger the offset commit - --- End diff -- We should be able to ignore offset commit triggering in this test ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190120209 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // - create the test fetcher - + + @SuppressWarnings("unchecked") + SourceContext sourceContext = PowerMockito.mock(SourceContext.class); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + + final TestFetcher fetcher = new TestFetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10); + + // - run the fetcher - + + final AtomicReference error = new AtomicReference<>(); + int fetchTasks = 5; + final CountDownLatch latch = new CountDownLatch(fetchTasks); + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1); + + service.submit(new Thread("fetcher runner") { + @Override + public void run() { + try { + latch.await(); + fetcher.runFetchLoop(); --- End diff -- The final `checkedThread.sync()` would always fail with the `ConcurrentModificationException` if the test is designed like this. ---
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190111078 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // - create the test fetcher - + + @SuppressWarnings("unchecked") + SourceContext sourceContext = PowerMockito.mock(SourceContext.class); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + + final TestFetcher fetcher = new TestFetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10); + + // - run the fetcher - + + final AtomicReference error = new AtomicReference<>(); --- End diff -- Flink provides a `CheckedThread` utility so you don't have to do this thread error referencing. ---
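As a rough idea of what such a utility saves, here is a minimal stand-in written against the plain JDK. It is not Flink's `CheckedThread` implementation; only the `go()` / `sync()` method names are taken from the usage quoted in this thread, and everything else (class name, field, rethrow logic) is an assumption made for illustration.

```java
// Hypothetical stand-in for a "checked thread" helper; not Flink's class.
abstract class CheckedThreadSketch extends Thread {

    // Holds whatever go() threw, if anything.
    private volatile Throwable error;

    /** The work to run on the thread; may throw. */
    public abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t;
        }
    }

    /** Waits for the thread to finish and rethrows anything go() threw. */
    public final void sync() throws Exception {
        join();
        Throwable t = error;
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
    }
}
```

With a helper of this shape the test body only needs `start()` and `sync()`; there is no shared `AtomicReference` to declare and check by hand.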
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190114844 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // - create the test fetcher - + + @SuppressWarnings("unchecked") + SourceContext sourceContext = PowerMockito.mock(SourceContext.class); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + + final TestFetcher fetcher = new TestFetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10); + + // - run the fetcher - + + final AtomicReference error = new AtomicReference<>(); + int fetchTasks = 5; + final CountDownLatch latch = new CountDownLatch(fetchTasks); + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1); + + service.submit(new Thread("fetcher runner") { + @Override + public void run() { + try { + latch.await(); + fetcher.runFetchLoop(); --- End diff -- The sequence here seems a bit odd to me. I think we should be testing this as follows: 1. Run the fetch loop, and let it be blocked on record emitting (which then should let it be blocked mid-iteration) 2. Add a discovered partition; this should not throw an exception. ---
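The two steps above can be sketched with plain JDK primitives. This is only an illustration under stated assumptions: `CountDownLatch` stands in for Flink's `OneShotLatch`, a bare list of strings stands in for the fetcher's partition states, and the class and method names are hypothetical. The list passed in must be non-empty, otherwise the loop body never runs and the caller would wait forever.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of Flink.
final class BlockedIterationSketch {

    /**
     * Step 1: start a loop over the list and block it mid-iteration.
     * Step 2: add an element from the calling thread, then release the loop.
     * Returns whatever the loop thread threw, or null if it finished cleanly.
     */
    static Throwable runScenario(List<String> partitionStates) throws InterruptedException {
        CountDownLatch iterationStarted = new CountDownLatch(1);
        CountDownLatch mayContinue = new CountDownLatch(1);
        AtomicReference<Throwable> error = new AtomicReference<>();

        Thread fetchLoop = new Thread(() -> {
            try {
                for (String ignored : partitionStates) {
                    iterationStarted.countDown(); // signal: we are inside the iteration
                    mayContinue.await();          // block here, as a blocking record emit would
                }
            } catch (Throwable t) {
                error.set(t);
            }
        }, "fetch-loop");
        fetchLoop.start();

        iterationStarted.await();
        partitionStates.add("discovered-partition"); // must not break the blocked iteration
        mayContinue.countDown();
        fetchLoop.join();
        return error.get();
    }
}
```

Because the latches order the append strictly between two iterator calls, a `LinkedList` should fail with a `ConcurrentModificationException` on every run rather than occasionally, and a `CopyOnWriteArrayList` should finish without error.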
[GitHub] flink issue #6045: [FLINK-9402] [kinesis] Kinesis consumer configuration req...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6045 Thanks @tweise. Have only one comment, otherwise this looks good to merge. ---
[GitHub] flink pull request #6045: [FLINK-9402] [kinesis] Kinesis consumer configurat...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6045#discussion_r189815405 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java --- @@ -244,7 +244,11 @@ public static void validateAwsConfiguration(Properties config) { } if (!config.containsKey(AWSConfigConstants.AWS_REGION)) { - throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); + if (!config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT)) { + // per validation in AwsClientBuilder + throw new IllegalArgumentException(String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.", + AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION)); + } } else { --- End diff -- Do we also need to check that `AWS_REGION` and `AWS_ENDPOINT` are not both set? (The AwsClientBuilder says that ONLY ONE of these two may be set.) ---
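If the answer to that question turns out to be yes, the check could look roughly like the sketch below. It is an assumption-laden illustration, not the connector's code: the class name, the method name, and the literal key strings are hypothetical placeholders for the real constants, and whether setting both keys should be rejected at all is exactly what the comment leaves open.

```java
import java.util.Properties;

// Hypothetical illustration class; not part of the Kinesis connector.
final class AwsConfigValidationSketch {

    // Placeholder key names, assumed for this example only.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /** Rejects configs where neither key, or both keys, are present. */
    static void validateRegionOrEndpoint(Properties config) {
        boolean hasRegion = config.containsKey(AWS_REGION);
        boolean hasEndpoint = config.containsKey(AWS_ENDPOINT);
        // Equal flags mean "both missing" or "both set"; either way, not exactly one.
        if (hasRegion == hasEndpoint) {
            throw new IllegalArgumentException(String.format(
                "Exactly one of AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
                AWS_REGION, AWS_ENDPOINT));
        }
    }
}
```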
[GitHub] flink issue #5977: [FLINK-9295][kafka] Fix transactional.id collisions for F...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5977 Thanks for the update @pnowojski. Changes LGTM, +1. Merging this .. ---
[GitHub] flink issue #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end tes...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5761 +1, LGTM, merging ... ---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5849 @walterddr yes, you can do that. In the description, just leave some notice that the PR is based on another, and which of the commits are relevant. ---
[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5958 @FredTing we had some offline discussion on how to proceed with this. @aljoscha, @twalthr, or @StephanEwen can probably comment more here if I missed anything. The conflict that Stephan mentioned between a "common deserialization schema" interface and surfacing connector-specific information is rooted in the fact that both concerns (deserialization and providing connector-specific record meta information) are currently coupled in a single interface. Take for example the Kafka connector's `KeyedDeserializationSchema`: there we try to deserialize the Kafka bytes, as well as provide information such as topic / partition / timestamp etc. to allow the user to enrich their records for downstream business logic. The first part (deserialization of bytes) should be something common for all connector sources, while the second part is Kafka-specific. Therefore, we should perhaps break this up into two separate interfaces, as follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```
The second interface is something that each connector should have independently, and does not handle deserialization of the record bytes. The name, of course, is still open to discussion. What do you think? ---
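To make the proposed split a little more concrete, here is a self-contained sketch of how the two interfaces might compose. The interface names follow the comment above; the fields of `ConsumerRecordMetaInfo`, the `SplitSchemaSketch` class, and its `process` helper are assumptions invented for this example, since the proposal does not spell them out.

```java
// Common to all sources: bytes in, record out.
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// Assumed shape of the Kafka-specific meta information (fields are hypothetical).
final class ConsumerRecordMetaInfo {
    final String topic;
    final int partition;
    final long timestamp;

    ConsumerRecordMetaInfo(String topic, int partition, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }
}

// Kafka-specific: enriches an already deserialized record, never touches raw bytes.
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

// Hypothetical glue showing the order in which a consumer could apply the two.
final class SplitSchemaSketch {
    static <T> T process(
            byte[] bytes,
            ConsumerRecordMetaInfo metaInfo,
            DeserializationSchema<T> schema,
            ConsumerRecordMetaInfoProvider<T> provider) {
        T record = schema.deserialize(bytes);    // connector-agnostic step
        return provider.enrich(record, metaInfo); // connector-specific step
    }
}
```

The point of the shape is that a `DeserializationSchema` written for one source could be reused with any other, while the enrichment step stays local to the connector that knows what a topic or partition is.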
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5849 Hi @walterddr, what is the status of this PR? Would be nice if we can move forward with this PR (and also the CLI e2e test PR that you also opened). ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185140 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- Since this is a detached execution, we probably want to wait until this job completes before continuing? ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185253 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list" --- End diff -- Should we verify the output of `list`? ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185689 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list -s" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list -r" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test operation on running streaming jobs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +RETURN=`$FLINK_DIR/bin/flink run -d \ +
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185232 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" --- End diff -- Should we verify the output of `info`? ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185047 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- There probably should be some verification that the job actually runs with DOP=4 ---