[flink] 01/02: [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f393fb70b27cd2a64cb2c010d7d2e902f75c519f
Author: Stefan Richter
AuthorDate: Thu Nov 8 15:08:35 2018 +0100

    [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis

    This closes #7066.
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index 895e4a7..b4646fc 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -25,7 +25,7 @@ TEST=flink-heavy-deployment-stress-test
 TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
 
-set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap
+set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
@@ -35,12 +35,12 @@ set_conf "taskmanager.memory.segment-size" "8kb"
 
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
 
 start_cluster # this also starts 1TM
-start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots
+start_taskmanagers 9 # 1TM + 9TM = 10TM a 10 slots = 100 slots
 
-# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries.
+# This call will result in a deployment with state meta data of 100 x 100 x 50 union states x each 100 entries.
 # We can scale up the numbers to make the test even heavier.
 $FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
---environment.max_parallelism 1024 --environment.parallelism 200 \
+--environment.max_parallelism 1024 --environment.parallelism 100 \
 --environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \
 --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
---heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75
+--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 100
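The sizing arithmetic in the comments above can be sanity-checked with a quick sketch. This is an illustration of the numbers only, not part of the test script; all values are taken from the new configuration in the diff:

```java
public class HeavyDeploymentSizing {
    public static void main(String[] args) {
        int taskManagers = 1 + 9;      // start_cluster starts 1 TM, start_taskmanagers adds 9
        int slotsPerTm = 10;           // taskmanager.numberOfTaskSlots
        int heapMbPerTm = 512;         // taskmanager.heap.mb

        int totalSlots = taskManagers * slotsPerTm;    // 100 slots, matching parallelism 100
        int totalHeapMb = taskManagers * heapMbPerTm;  // 5120 Mb, i.e. the "5Gb total heap"

        // 100 producers x 100 consumers x 50 union list states x 100 partitions each
        long stateMetaEntries = 100L * 100L * 50L * 100L;

        System.out.println(totalSlots);        // 100
        System.out.println(totalHeapMb);       // 5120
        System.out.println(stateMetaEntries);  // 50000000
    }
}
```

The halved parallelism quarters the union-state metadata (100 x 100 instead of 200 x 200), which is what makes the test fit Travis.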
[flink] branch master updated (17a6e7a -> 6115b05)
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

 from 17a6e7a  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
  new f393fb7  [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis
  new 6115b05  [FLINK-10826] [e2e] Increase network timeout

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)
[flink] 02/02: [FLINK-10826] [e2e] Increase network timeout
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6115b05c87e387c301b21758a6d29e300d138a84
Author: Timo Walther
AuthorDate: Mon Nov 12 10:28:51 2018 +0100

    [FLINK-10826] [e2e] Increase network timeout
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index b4646fc..8613470 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -30,6 +30,7 @@ set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
 set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.network.request-backoff.max" "6"
 set_conf "taskmanager.memory.segment-size" "8kb"
 
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
[flink] branch master updated: [FLINK-10626] [docs] [table] Add documentation for temporal table joins
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 17a6e7a  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
17a6e7a is described below

commit 17a6e7a2be756883f9109ba04febe1f6743944a3
Author: Timo Walther
AuthorDate: Thu Nov 8 17:32:35 2018 +0100

    [FLINK-10626] [docs] [table] Add documentation for temporal table joins

    This closes #7065.
---
 docs/dev/table/streaming/joins.md | 33 +++--
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 24c33b9..f293406 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -136,14 +136,14 @@ SELECT
   o.amount * r.rate AS amount
 FROM
   Orders AS o,
-  LATERAL TABLE (Rates(o.proctime)) AS r
+  LATERAL TABLE (Rates(o.rowtime)) AS r
 WHERE r.currency = o.currency
 {% endhighlight %}
 
-Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record.
+Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
 
-In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example.
+In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation.
 
 In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join. This again allows Flink to limit the number of elements that must be kept in the state.
 
@@ -189,14 +189,35 @@ val result = orders
 
+**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet implemented for temporal joins.
+This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.
+
 ### Processing-time Temporal Joins
 
 With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function.
-By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table
+By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table
 and any updates in the underlying history table will also immediately overwrite the current values.
 
-Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on the previously results emitted/processed records from the probe side.
+Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state.
+New updates will have no effect on the previously emitted/processed records from the probe side.
 
-One can think about processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
+One can think about a processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous record, the old value is just simply overwritten.
 Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
+
+### Event-time Temporal Joins
+
+With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function.
+This allows for joining the two tables at a common point in time.
+
+Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records in the
[flink] 01/02: [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d9fcbaf1408b09ee1d409ec1568a5af51d726d0
Author: Stefan Richter
AuthorDate: Thu Nov 8 15:08:35 2018 +0100

    [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis

    This closes #7066.
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index 895e4a7..b4646fc 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -25,7 +25,7 @@ TEST=flink-heavy-deployment-stress-test
 TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
 
-set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap
+set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
@@ -35,12 +35,12 @@ set_conf "taskmanager.memory.segment-size" "8kb"
 
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
 
 start_cluster # this also starts 1TM
-start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots
+start_taskmanagers 9 # 1TM + 9TM = 10TM a 10 slots = 100 slots
 
-# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries.
+# This call will result in a deployment with state meta data of 100 x 100 x 50 union states x each 100 entries.
 # We can scale up the numbers to make the test even heavier.
 $FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
---environment.max_parallelism 1024 --environment.parallelism 200 \
+--environment.max_parallelism 1024 --environment.parallelism 100 \
 --environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \
 --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
---heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75
+--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 100
[flink] 02/02: [FLINK-10826] [e2e] Increase network timeout
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78ea3923782fddfd4fbb414c754d4db7701e24ca
Author: Timo Walther
AuthorDate: Mon Nov 12 10:28:51 2018 +0100

    [FLINK-10826] [e2e] Increase network timeout
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index b4646fc..8613470 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -30,6 +30,7 @@ set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
 set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.network.request-backoff.max" "6"
 set_conf "taskmanager.memory.segment-size" "8kb"
 
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
[flink] branch release-1.7 updated (096918c -> 78ea392)
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.

 from 096918c  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
  new 7d9fcba  [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis
  new 78ea392  [FLINK-10826] [e2e] Increase network timeout

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)
[flink] branch release-1.7 updated: [FLINK-10626] [docs] [table] Add documentation for temporal table joins
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 096918c  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
096918c is described below

commit 096918c6577b06bb9ac6250063c6c99a04907f77
Author: Timo Walther
AuthorDate: Thu Nov 8 17:32:35 2018 +0100

    [FLINK-10626] [docs] [table] Add documentation for temporal table joins

    This closes #7065.
---
 docs/dev/table/streaming/joins.md | 33 +++--
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 24c33b9..f293406 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -136,14 +136,14 @@ SELECT
   o.amount * r.rate AS amount
 FROM
   Orders AS o,
-  LATERAL TABLE (Rates(o.proctime)) AS r
+  LATERAL TABLE (Rates(o.rowtime)) AS r
 WHERE r.currency = o.currency
 {% endhighlight %}
 
-Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record.
+Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
 
-In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example.
+In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation.
 
 In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join. This again allows Flink to limit the number of elements that must be kept in the state.
 
@@ -189,14 +189,35 @@ val result = orders
 
+**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet implemented for temporal joins.
+This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.
+
 ### Processing-time Temporal Joins
 
 With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function.
-By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table
+By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table
 and any updates in the underlying history table will also immediately overwrite the current values.
 
-Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on the previously results emitted/processed records from the probe side.
+Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state.
+New updates will have no effect on the previously emitted/processed records from the probe side.
 
-One can think about processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
+One can think about a processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous record, the old value is just simply overwritten.
 Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
+
+### Event-time Temporal Joins
+
+With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function.
+This allows for joining the two tables at a common point in time.
+
+Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records
[flink] branch release-1.5 updated: [FLINK-10821] E2E now uses externalized checkpoint
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.5 by this push:
     new c400273  [FLINK-10821] E2E now uses externalized checkpoint
c400273 is described below

commit c400273530687f952345922045d1124bf66fd65a
Author: Igal Shilman
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

    [FLINK-10821] E2E now uses externalized checkpoint

    This commit fixes the test_resume_externalized_checkpoints.sh script,
    by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 29c3786..6e8f462 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -67,8 +67,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
     --test.semantics exactly-once \
     --environment.parallelism $dop \
     --environment.externalize_checkpoint true \
@@ -126,9 +127,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 
 wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
[flink] branch master updated: [FLINK-10821] E2E now uses externalized checkpoint
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 1ad17e0  [FLINK-10821] E2E now uses externalized checkpoint
1ad17e0 is described below

commit 1ad17e04fe11f7eb170e06d0a076b1b9e31249d7
Author: Igal Shilman
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

    [FLINK-10821] E2E now uses externalized checkpoint

    This commit fixes the test_resume_externalized_checkpoints.sh script,
    by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e1ba65d..fe23190 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -66,8 +66,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
    --test.semantics exactly-once \
    --environment.parallelism $dop \
    --environment.externalize_checkpoint true \
@@ -125,9 +126,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 
 wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
[flink] branch release-1.6 updated: [FLINK-10821] E2E now uses externalized checkpoint
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 4b0a941  [FLINK-10821] E2E now uses externalized checkpoint
4b0a941 is described below

commit 4b0a941c2aface840819c8ff974113bf2b2221d9
Author: Igal Shilman
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

    [FLINK-10821] E2E now uses externalized checkpoint

    This commit fixes the test_resume_externalized_checkpoints.sh script,
    by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e1ba65d..fe23190 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -66,8 +66,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
    --test.semantics exactly-once \
    --environment.parallelism $dop \
    --environment.externalize_checkpoint true \
@@ -125,9 +126,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 
 wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
[flink] branch release-1.7 updated: [FLINK-10821] E2E now uses externalized checkpoint
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 7dc4c70  [FLINK-10821] E2E now uses externalized checkpoint
7dc4c70 is described below

commit 7dc4c70e2f749f0b0888940cbb1c43d93747d677
Author: Igal Shilman
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

    [FLINK-10821] E2E now uses externalized checkpoint

    This commit fixes the test_resume_externalized_checkpoints.sh script,
    by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e1ba65d..fe23190 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -66,8 +66,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
    --test.semantics exactly-once \
    --environment.parallelism $dop \
    --environment.externalize_checkpoint true \
@@ -125,9 +126,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 
 wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ba80b0a7bceb6af2e36b8408cf28a72e53a610c
Author: Stefan Richter
AuthorDate: Fri Nov 9 12:56:02 2018 +0100

    [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

    Signed-off-by: Stefan Richter
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 --
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d086d..021bc24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,18 +18,6 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +38,18 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
@@ -179,11 +179,13 @@ public final class PojoSerializer extends TypeSerializer {
 			}
 		}
 
-		if (stateful) {
-			return new PojoSerializer(clazz, duplicateFieldSerializers, fields, executionConfig);
-		} else {
-			return this;
+		if (!stateful) {
+			// as a small memory optimization, we can share the same object between instances
+			duplicateFieldSerializers = fieldSerializers;
 		}
+
+		// we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+		return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
 	}
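The core of the fix above is that duplicate() must always return a new serializer instance, because the old code returned `this` for stateless serializers even though the serializer carries a mutable subclass serializer cache. A minimal sketch of that pattern follows; the class and cache here are illustrative stand-ins, not the real PojoSerializer:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of why duplicate() must not hand out a shared instance
// when the object carries a mutable cache (like PojoSerializer's
// subclassSerializerCache): each duplicate gets its own cache.
public class CachingSerializerSketch {
    private final Map<Class<?>, String> subclassCache = new HashMap<>();

    public String serializerFor(Class<?> subclass) {
        // computeIfAbsent mutates the cache; a plain HashMap is unsafe
        // if two tasks shared this instance concurrently.
        return subclassCache.computeIfAbsent(subclass, c -> "serializer:" + c.getSimpleName());
    }

    public CachingSerializerSketch duplicate() {
        // Always create a new instance so concurrent tasks never share
        // the mutable cache, even when the rest of the state is stateless.
        return new CachingSerializerSketch();
    }

    public static void main(String[] args) {
        CachingSerializerSketch original = new CachingSerializerSketch();
        CachingSerializerSketch copy = original.duplicate();
        System.out.println(original != copy); // true: no shared mutable state
    }
}
```

Immutable parts (the field serializers in the stateless case of the real fix) can still be shared between the instances as a memory optimization, which is exactly what the diff's `duplicateFieldSerializers = fieldSerializers` line does.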
[flink] branch master updated: [FLINK-10753] Improve propagation and logging of snapshot exceptions
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new dc8e27f [FLINK-10753] Improve propagation and logging of snapshot exceptions dc8e27f is described below commit dc8e27fb07ec8593037ce0205b5ecb4a5bc16a40 Author: Stefan Richter AuthorDate: Thu Nov 8 15:51:16 2018 +0100 [FLINK-10753] Improve propagation and logging of snapshot exceptions This closes #7064. --- .../runtime/checkpoint/CheckpointCoordinator.java | 9 ++--- .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++--- .../taskexecutor/rpc/RpcCheckpointResponder.java| 6 ++ .../api/operators/AbstractStreamOperator.java | 6 -- .../restore/AbstractOperatorRestoreTestBase.java| 1 + 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 57337b6..02b6fa4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1249,11 +1249,14 @@ public class CheckpointCoordinator { final long checkpointId = pendingCheckpoint.getCheckpointId(); - final String reason = (cause != null) ? 
cause.getMessage() : ""; + LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason); + if (cause != null) { + pendingCheckpoint.abortError(cause); + } else { + pendingCheckpoint.abortDeclined(); + } - pendingCheckpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); // we don't have to schedule another "dissolving" checkpoint any more because the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1b51ac4..1bc6b0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -433,25 +434,23 @@ public class PendingCheckpoint { } } + public void abortDeclined() { - try { - Exception cause = new Exception("Checkpoint was declined (tasks not ready)"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - } finally { - dispose(true); - } + abortWithCause(new Exception("Checkpoint was declined (tasks not ready)")); } /** * Aborts the pending checkpoint due to an error. * @param cause The error's exception. 
*/ - public void abortError(Throwable cause) { + public void abortError(@Nonnull Throwable cause) { + abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + } + + private void abortWithCause(@Nonnull Exception cause) { try { - Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause); - onCompletionPromise.completeExceptionally(failure); - reportFailedCheckpoint(failure); + onCompletionPromise.completeExceptionally(cause); + reportFailedCheckpoint(cause); } finally { dispose(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index aba8bda..918fa50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; +import
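The refactoring in the commit above funnels both abort paths (declined and errored) through a single helper that completes the checkpoint's completion promise exceptionally while keeping the root cause chained. A minimal sketch of that pattern follows; the class and method names mirror the diff but this is an illustrative stand-in, not the actual Flink `PendingCheckpoint`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Illustrative sketch (assumed names, not Flink's API): both abort paths call
// one abortWithCause(...) that completes the future exceptionally and always
// disposes, and the wrapping exception keeps the original cause attached.
public class AbortPatternDemo {
    private final CompletableFuture<Void> onCompletionPromise = new CompletableFuture<>();

    public void abortDeclined() {
        abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
    }

    public void abortError(Throwable cause) {
        // Uniform message, but the original exception stays chained as the cause.
        abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
    }

    private void abortWithCause(Exception cause) {
        try {
            onCompletionPromise.completeExceptionally(cause);
        } finally {
            dispose(); // cleanup must run on every abort path
        }
    }

    private void dispose() { /* release checkpoint resources in the real class */ }

    public static void main(String[] args) {
        AbortPatternDemo demo = new AbortPatternDemo();
        demo.abortError(new IllegalStateException("disk full"));
        try {
            demo.onCompletionPromise.get();
        } catch (ExecutionException | InterruptedException e) {
            // The chained cause survives for logging and propagation.
            System.out.println(e.getCause().getMessage());
            System.out.println(e.getCause().getCause().getClass().getSimpleName());
        }
    }
}
```

Because the cause is chained rather than flattened to a message string, `LOG.info(..., cause)` in the coordinator can now print the full stack trace instead of only `cause.getMessage()`.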
[flink] branch release-1.7 updated: [FLINK-10809][state] Include keyed state that is not from head operators in state assignment
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.7 by this push: new 45ad36f [FLINK-10809][state] Include keyed state that is not from head operators in state assignment 45ad36f is described below commit 45ad36fd752f53b1fa17d7226d3c93614fd24b3f Author: Stefan Richter AuthorDate: Wed Nov 7 14:05:07 2018 +0100 [FLINK-10809][state] Include keyed state that is not from head operators in state assignment This closes #7048. Signed-off-by: Stefan Richter --- .../checkpoint/StateAssignmentOperation.java | 29 +++ .../ReinterpretDataStreamAsKeyedStreamITCase.java | 99 -- 2 files changed, 102 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index b017388..02fc201 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -71,13 +71,12 @@ public class StateAssignmentOperation { this.allowNonRestoredState = allowNonRestoredState; } - public boolean assignStates() throws Exception { + public void assignStates() { Map localOperators = new HashMap<>(operatorStates); - Map localTasks = this.tasks; checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks); - for (Map.Entry task : localTasks.entrySet()) { + for (Map.Entry task : this.tasks.entrySet()) { final ExecutionJobVertex executionJobVertex = task.getValue(); // find the states of all operators belonging to this task @@ -108,7 +107,6 @@ public class StateAssignmentOperation { assignAttemptState(task.getValue(), operatorStates); } - return true; } private void assignAttemptState(ExecutionJobVertex 
executionJobVertex, List operatorStates) { @@ -254,10 +252,6 @@ public class StateAssignmentOperation { new StateObjectCollection<>(subRawKeyedState.getOrDefault(instanceID, Collections.emptyList(; } - private static boolean isHeadOperator(int opIdx, List operatorIDs) { - return opIdx == operatorIDs.size() - 1; - } - public void checkParallelismPreconditions(List operatorStates, ExecutionJobVertex executionJobVertex) { for (OperatorState operatorState : operatorStates) { checkParallelismPreconditions(operatorState, executionJobVertex); @@ -278,19 +272,16 @@ public class StateAssignmentOperation { for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) { OperatorState operatorState = oldOperatorStates.get(operatorIndex); int oldParallelism = operatorState.getParallelism(); - for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) { OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex)); - if (isHeadOperator(operatorIndex, newOperatorIDs)) { - Tuple2, List> subKeyedStates = reAssignSubKeyedStates( - operatorState, - newKeyGroupPartitions, - subTaskIndex, - newParallelism, - oldParallelism); - newManagedKeyedState.put(instanceID, subKeyedStates.f0); - newRawKeyedState.put(instanceID, subKeyedStates.f1); - } + Tuple2, List> subKeyedStates = reAssignSubKeyedStates( + operatorState, + newKeyGroupPartitions, + subTaskIndex, + newParallelism, + oldParallelism); + newManagedKeyedState.put(instanceID, subKeyedStates.f0); + newRawKeyedState.put(instanceID, subKeyedStates.f1); } } } diff --git
[flink] branch master updated: [FLINK-10809][state] Include keyed state that is not from head operators in state assignment
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new bf760f9 [FLINK-10809][state] Include keyed state that is not from head operators in state assignment bf760f9 is described below commit bf760f9312c547493620955dc57f57fd5da12596 Author: Stefan Richter AuthorDate: Wed Nov 7 14:05:07 2018 +0100 [FLINK-10809][state] Include keyed state that is not from head operators in state assignment This closes #7048. Signed-off-by: Stefan Richter --- .../checkpoint/StateAssignmentOperation.java | 29 +++ .../ReinterpretDataStreamAsKeyedStreamITCase.java | 99 -- 2 files changed, 102 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index b017388..02fc201 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -71,13 +71,12 @@ public class StateAssignmentOperation { this.allowNonRestoredState = allowNonRestoredState; } - public boolean assignStates() throws Exception { + public void assignStates() { Map localOperators = new HashMap<>(operatorStates); - Map localTasks = this.tasks; checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks); - for (Map.Entry task : localTasks.entrySet()) { + for (Map.Entry task : this.tasks.entrySet()) { final ExecutionJobVertex executionJobVertex = task.getValue(); // find the states of all operators belonging to this task @@ -108,7 +107,6 @@ public class StateAssignmentOperation { assignAttemptState(task.getValue(), operatorStates); } - return true; } private void assignAttemptState(ExecutionJobVertex 
executionJobVertex, List operatorStates) { @@ -254,10 +252,6 @@ public class StateAssignmentOperation { new StateObjectCollection<>(subRawKeyedState.getOrDefault(instanceID, Collections.emptyList(; } - private static boolean isHeadOperator(int opIdx, List operatorIDs) { - return opIdx == operatorIDs.size() - 1; - } - public void checkParallelismPreconditions(List operatorStates, ExecutionJobVertex executionJobVertex) { for (OperatorState operatorState : operatorStates) { checkParallelismPreconditions(operatorState, executionJobVertex); @@ -278,19 +272,16 @@ public class StateAssignmentOperation { for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) { OperatorState operatorState = oldOperatorStates.get(operatorIndex); int oldParallelism = operatorState.getParallelism(); - for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) { OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex)); - if (isHeadOperator(operatorIndex, newOperatorIDs)) { - Tuple2, List> subKeyedStates = reAssignSubKeyedStates( - operatorState, - newKeyGroupPartitions, - subTaskIndex, - newParallelism, - oldParallelism); - newManagedKeyedState.put(instanceID, subKeyedStates.f0); - newRawKeyedState.put(instanceID, subKeyedStates.f1); - } + Tuple2, List> subKeyedStates = reAssignSubKeyedStates( + operatorState, + newKeyGroupPartitions, + subTaskIndex, + newParallelism, + oldParallelism); + newManagedKeyedState.put(instanceID, subKeyedStates.f0); + newRawKeyedState.put(instanceID, subKeyedStates.f1); } } } diff --git
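The `reAssignSubKeyedStates` call in the diff above redistributes keyed state on rescale by key-group ranges: the space `[0, maxParallelism)` is split into one contiguous range per subtask, and after the fix every operator in the chain (not only the head) gets its range assigned. The sketch below shows the underlying range math; the formula mirrors Flink's `KeyGroupRangeAssignment`, but the class itself is illustrative, not the Flink API:

```java
// Illustrative sketch of key-group range computation on rescale. Each subtask
// owns a contiguous slice of [0, maxParallelism); on restore, a subtask fetches
// exactly the state belonging to its slice.
public class KeyGroupRanges {
    static int[] rangeFor(int maxParallelism, int parallelism, int subTaskIndex) {
        int start = (subTaskIndex * maxParallelism) / parallelism;
        int end = ((subTaskIndex + 1) * maxParallelism) / parallelism - 1;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // 128 key groups, rescaled to parallelism 4: four disjoint ranges that
        // together cover [0, 127] with no gaps and no overlap.
        for (int sub = 0; sub < 4; sub++) {
            int[] r = rangeFor(128, 4, sub);
            System.out.println("subtask " + sub + " -> [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

Before this fix, only the head operator (the last entry in the operator-ID list) received its re-assigned keyed ranges, so keyed state held by chained non-head operators was silently dropped on restore.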
[flink] branch release-1.5 updated: [FLINK-10753] Improve propagation and logging of snapshot exceptions
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.5 by this push: new 1725591 [FLINK-10753] Improve propagation and logging of snapshot exceptions 1725591 is described below commit 1725591ccf6dea55b7ab85da0753b0a635434eee Author: Stefan Richter AuthorDate: Thu Nov 8 15:51:16 2018 +0100 [FLINK-10753] Improve propagation and logging of snapshot exceptions This closes #7064. --- .../runtime/checkpoint/CheckpointCoordinator.java | 9 ++--- .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++--- .../taskexecutor/rpc/RpcCheckpointResponder.java| 6 ++ .../api/operators/AbstractStreamOperator.java | 6 -- .../restore/AbstractOperatorRestoreTestBase.java| 1 + 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 1ee2ece..51adeae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1247,11 +1247,14 @@ public class CheckpointCoordinator { final long checkpointId = pendingCheckpoint.getCheckpointId(); - final String reason = (cause != null) ? 
cause.getMessage() : ""; + LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason); + if (cause != null) { + pendingCheckpoint.abortError(cause); + } else { + pendingCheckpoint.abortDeclined(); + } - pendingCheckpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); // we don't have to schedule another "dissolving" checkpoint any more because the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1b51ac4..1bc6b0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -433,25 +434,23 @@ public class PendingCheckpoint { } } + public void abortDeclined() { - try { - Exception cause = new Exception("Checkpoint was declined (tasks not ready)"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - } finally { - dispose(true); - } + abortWithCause(new Exception("Checkpoint was declined (tasks not ready)")); } /** * Aborts the pending checkpoint due to an error. * @param cause The error's exception. 
*/ - public void abortError(Throwable cause) { + public void abortError(@Nonnull Throwable cause) { + abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + } + + private void abortWithCause(@Nonnull Exception cause) { try { - Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause); - onCompletionPromise.completeExceptionally(failure); - reportFailedCheckpoint(failure); + onCompletionPromise.completeExceptionally(cause); + reportFailedCheckpoint(cause); } finally { dispose(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index aba8bda..918fa50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; +import
[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c4d366c9abeeb18caf51c3a5a3272c3c9987738f Author: Stefan Richter AuthorDate: Fri Nov 9 12:56:02 2018 +0100 [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer --- .../api/java/typeutils/runtime/PojoSerializer.java | 38 -- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index a4d3839..f1dd8fc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -18,29 +18,17 @@ package org.apache.flink.api.java.typeutils.runtime; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Objects; - import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -52,6 +40,18 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; + import static org.apache.flink.util.Preconditions.checkNotNull; @Internal @@ -181,11 +181,13 @@ public final class PojoSerializer extends TypeSerializer { } } - if (stateful) { - return new PojoSerializer(clazz, duplicateFieldSerializers, fields, executionConfig); - } else { - return this; + if (!stateful) { + // as a small memory optimization, we can share the same object between instances + duplicateFieldSerializers = fieldSerializers; } + + // we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems + return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig); }
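The core of the fix above: `duplicate()` may no longer return `this`, because `PojoSerializer` carries a mutable per-instance cache (`subclassSerializerCache`) that is unsafe to share across threads, even when all field serializers are stateless. A minimal sketch of the principle, with assumed names rather than Flink's real fields:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (assumed names): a serializer with a mutable per-instance
// cache must hand out a fresh instance from duplicate(), even if its delegate
// serializers are stateless and could be shared as a memory optimization.
public class DuplicateDemo {
    static class CachingSerializer {
        // Mutable state: two threads racing on this map is the bug the commit fixes.
        final Map<Class<?>, String> subclassSerializerCache = new HashMap<>();

        CachingSerializer duplicate() {
            // Stateless parts may be shared; the cache must be private to the copy.
            return new CachingSerializer();
        }
    }

    public static void main(String[] args) {
        CachingSerializer s = new CachingSerializer();
        CachingSerializer dup = s.duplicate();
        // Distinct instance, distinct cache: no cross-thread sharing.
        System.out.println(s != dup);
        System.out.println(s.subclassSerializerCache != dup.subclassSerializerCache);
    }
}
```

Note the diff still keeps the memory optimization: when no field serializer is stateful, the new instance reuses the original `fieldSerializers` array, and only the outer serializer (with its cache) is freshly allocated.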
[flink] branch master updated (1ad17e0 -> 2db59cf)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 1ad17e0 [FLINK-10821] E2E now uses externalized checkpoint new c4d366c [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer new 2db59cf [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../api/java/typeutils/runtime/PojoSerializer.java | 38 +++--- .../api/common/typeutils/SerializerTestBase.java | 128 ++--- 2 files changed, 132 insertions(+), 34 deletions(-)
[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 2db59cf365a84bdbcd69f6f5e57bc6e639da4930 Author: Stefan Richter AuthorDate: Thu Nov 8 14:44:08 2018 +0100 [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase This closes #7061. --- .../api/common/typeutils/SerializerTestBase.java | 128 ++--- 1 file changed, 112 insertions(+), 16 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index a4908f9..151dafb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -18,32 +18,39 @@ package org.apache.flink.api.common.typeutils; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Arrays; - import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import 
org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; +import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Abstract test base for serializers. * @@ -436,6 +443,33 @@ public abstract class SerializerTestBase extends TestLogger { } } + @Test + public void testDuplicate() throws Exception { + final int numThreads = 10; + final TypeSerializer serializer = getSerializer(); + final CyclicBarrier startLatch = new CyclicBarrier(numThreads); + final List> concurrentRunners = new ArrayList<>(numThreads); + Assert.assertEquals(serializer, serializer.duplicate()); + + T[] testData = getData(); + + for (int i = 0; i < numThreads; ++i) { + SerializerRunner runner = new SerializerRunner<>( + startLatch, + serializer.duplicate(), + testData, + 120L); + + runner.start(); + concurrentRunners.add(runner); + } + + for (SerializerRunner concurrentRunner : concurrentRunners) { + concurrentRunner.join(); + concurrentRunner.checkResult(); + } + } + // protected void deepEquals(String message, T should, T is) { @@ -526,6 +560,68 @@ public abstract class SerializerTestBase extends TestLogger { } } + /** +* Runner to test serializer duplication via concurrency. +* @param type of the test elements. 
+*/ + static class SerializerRunner extends Thread { + final CyclicBarrier allReadyBarrier; + final TypeSerializer serializer; + final T[] testData; + final long durationLimitMillis; + Exception failure; + + SerializerRunner( + CyclicBarrier allReadyBarrier, + TypeSerializer serializer, + T[] testData, +
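The test added above uses a `CyclicBarrier` so that all runner threads begin exercising their own `duplicate()` at the same instant, which maximizes the chance of exposing shared mutable state between duplicates. A self-contained sketch of that harness pattern follows; the workload is a stand-in, not Flink's serialization round-trip:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

// Illustrative sketch of the CyclicBarrier test pattern: N threads rendezvous,
// then each hammers its own private state concurrently. Any cross-thread race
// would surface as an exception recorded in `failure` and rethrown on join.
public class ConcurrentDuplicateHarness {
    static class Runner extends Thread {
        final CyclicBarrier allReadyBarrier;
        final StringBuilder ownState = new StringBuilder(); // per-"duplicate" state
        volatile Exception failure;

        Runner(CyclicBarrier barrier) { this.allReadyBarrier = barrier; }

        @Override
        public void run() {
            try {
                allReadyBarrier.await(); // release all runners together
                for (int i = 0; i < 1000; i++) {
                    ownState.setLength(0);
                    ownState.append("value-").append(i); // stand-in for serialize/deserialize
                }
            } catch (Exception e) {
                failure = e; // checked by the main thread after join()
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final int numThreads = 10;
        CyclicBarrier startLatch = new CyclicBarrier(numThreads);
        List<Runner> runners = new ArrayList<>(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Runner r = new Runner(startLatch);
            r.start();
            runners.add(r);
        }
        for (Runner r : runners) {
            r.join();
            if (r.failure != null) {
                throw r.failure; // equivalent of checkResult() in the test base
            }
        }
        System.out.println("all runners finished without error");
    }
}
```

With the pre-fix `PojoSerializer` (which returned `this` from `duplicate()` in the stateless case), all ten runners would have shared one subclass-serializer cache, and a harness like this is what makes that race reproducible.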
[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git commit 8196a69f95af08ec829c77e190161d99f186aa57 Author: Stefan Richter AuthorDate: Fri Nov 9 12:56:02 2018 +0100 [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer --- .../api/java/typeutils/runtime/PojoSerializer.java | 36 -- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index a4d086d..f38a620 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -18,18 +18,6 @@ package org.apache.flink.api.java.typeutils.runtime; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Objects; - import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -39,6 +27,8 @@ import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import 
org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -50,6 +40,18 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; + import static org.apache.flink.util.Preconditions.checkNotNull; @Internal @@ -179,11 +181,13 @@ public final class PojoSerializer extends TypeSerializer { } } - if (stateful) { - return new PojoSerializer(clazz, duplicateFieldSerializers, fields, executionConfig); - } else { - return this; + if (!stateful) { + // as a small memory optimization, we can share the same object between instances + duplicateFieldSerializers = fieldSerializers; } + + // we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems + return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig); }
[flink] branch release-1.6 updated: [FLINK-10753] Improve propagation and logging of snapshot exceptions
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.6 by this push: new 6814e5f [FLINK-10753] Improve propagation and logging of snapshot exceptions 6814e5f is described below commit 6814e5fc64a289a10dbd509dd6055448fc6be71d Author: Stefan Richter AuthorDate: Thu Nov 8 15:51:16 2018 +0100 [FLINK-10753] Improve propagation and logging of snapshot exceptions This closes #7064. Signed-off-by: Stefan Richter --- .../runtime/checkpoint/CheckpointCoordinator.java | 9 ++--- .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++--- .../taskexecutor/rpc/RpcCheckpointResponder.java| 6 ++ .../api/operators/AbstractStreamOperator.java | 6 -- .../restore/AbstractOperatorRestoreTestBase.java| 1 + 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 57337b6..02b6fa4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1249,11 +1249,14 @@ public class CheckpointCoordinator { final long checkpointId = pendingCheckpoint.getCheckpointId(); - final String reason = (cause != null) ? 
cause.getMessage() : ""; + LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason); + if (cause != null) { + pendingCheckpoint.abortError(cause); + } else { + pendingCheckpoint.abortDeclined(); + } - pendingCheckpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); // we don't have to schedule another "dissolving" checkpoint any more because the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1b51ac4..1bc6b0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -433,25 +434,23 @@ public class PendingCheckpoint { } } + public void abortDeclined() { - try { - Exception cause = new Exception("Checkpoint was declined (tasks not ready)"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - } finally { - dispose(true); - } + abortWithCause(new Exception("Checkpoint was declined (tasks not ready)")); } /** * Aborts the pending checkpoint due to an error. * @param cause The error's exception. 
*/ - public void abortError(Throwable cause) { + public void abortError(@Nonnull Throwable cause) { + abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + } + + private void abortWithCause(@Nonnull Exception cause) { try { - Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause); - onCompletionPromise.completeExceptionally(failure); - reportFailedCheckpoint(failure); + onCompletionPromise.completeExceptionally(cause); + reportFailedCheckpoint(cause); } finally { dispose(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index aba8bda..918fa50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import
[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd75a06c7aecd3b3f2d6fcf64bd4dde4624828a6
Author: Stefan Richter
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

    [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

    This closes #7061.
---
 .../api/common/typeutils/SerializerTestBase.java | 128 ++---
 1 file changed, 112 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a4908f9..151dafb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,32 +18,39 @@
 package org.apache.flink.api.common.typeutils;

-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;

-import org.junit.Assert;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;

+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -436,6 +443,33 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 	}

+	@Test
+	public void testDuplicate() throws Exception {
+		final int numThreads = 10;
+		final TypeSerializer<T> serializer = getSerializer();
+		final CyclicBarrier startLatch = new CyclicBarrier(numThreads);
+		final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+		Assert.assertEquals(serializer, serializer.duplicate());
+
+		T[] testData = getData();
+
+		for (int i = 0; i < numThreads; ++i) {
+			SerializerRunner<T> runner = new SerializerRunner<>(
+				startLatch,
+				serializer.duplicate(),
+				testData,
+				120L);
+
+			runner.start();
+			concurrentRunners.add(runner);
+		}
+
+		for (SerializerRunner<T> concurrentRunner : concurrentRunners) {
+			concurrentRunner.join();
+			concurrentRunner.checkResult();
+		}
+	}
+
 	// 

 	protected void deepEquals(String message, T should, T is) {
@@ -526,6 +560,68 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 	}

+	/**
+	 * Runner to test serializer duplication via concurrency.
+	 *
+	 * @param <T> type of the test elements.
+	 */
+	static class SerializerRunner<T> extends Thread {
+		final CyclicBarrier allReadyBarrier;
+		final TypeSerializer<T> serializer;
+		final T[] testData;
+		final long durationLimitMillis;
+		Exception failure;
+
+		SerializerRunner(
+			CyclicBarrier allReadyBarrier,
+			TypeSerializer<T> serializer,
+			T[] testData,
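The testDuplicate() test above follows a common pattern for checking serializer thread-safety: give each thread its own duplicate() of the serializer, release all threads simultaneously with a CyclicBarrier, run round-trips for a while, and surface any failure after join(). A self-contained sketch of that pattern (plain Java, no Flink dependencies; the `StatefulSerializer` class with its per-instance scratch buffer is a hypothetical stand-in for a real stateful serializer):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

public class DuplicateTestSketch {

    /** Hypothetical serializer with mutable per-instance state (not thread-safe). */
    static final class StatefulSerializer {
        private final byte[] scratch = new byte[64]; // reused scratch buffer

        byte[] serialize(String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            System.arraycopy(bytes, 0, scratch, 0, bytes.length); // touches shared state
            byte[] out = new byte[bytes.length];
            System.arraycopy(scratch, 0, out, 0, bytes.length);
            return out;
        }

        String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }

        /** Each caller gets a fresh instance, so threads never share the scratch buffer. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Runs round-trips on its own duplicate; mirrors the SerializerRunner idea. */
    static final class Runner extends Thread {
        final CyclicBarrier barrier;
        final StatefulSerializer serializer;
        final String[] data;
        volatile Exception failure;

        Runner(CyclicBarrier barrier, StatefulSerializer serializer, String[] data) {
            this.barrier = barrier;
            this.serializer = serializer;
            this.data = data;
        }

        @Override
        public void run() {
            try {
                barrier.await(); // all runners start serializing at the same moment
                for (int round = 0; round < 1000; round++) {
                    for (String value : data) {
                        String copy = serializer.deserialize(serializer.serialize(value));
                        if (!value.equals(copy)) {
                            throw new IllegalStateException("corrupted round-trip: " + copy);
                        }
                    }
                }
            } catch (Exception e) {
                failure = e; // checked by the driver after join()
            }
        }
    }

    /** Returns true if all concurrent round-trips on duplicates succeeded. */
    static boolean runConcurrentDuplicateTest() throws InterruptedException {
        final int numThreads = 10;
        String[] data = {"a", "bb", "ccc"};
        StatefulSerializer original = new StatefulSerializer();
        CyclicBarrier barrier = new CyclicBarrier(numThreads); // parties == runner count
        List<Runner> runners = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            Runner r = new Runner(barrier, original.duplicate(), data);
            r.start();
            runners.add(r);
        }
        for (Runner r : runners) {
            r.join();
            if (r.failure != null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrentDuplicateTest()); // prints "true"
    }
}
```

Note the barrier has exactly as many parties as runner threads, as in the real test: the driver thread does not await, so the runners release each other the instant the last one is ready, maximizing contention.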
[flink] branch release-1.7 updated (7dc4c70 -> bd75a06)
srichter pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 7dc4c70  [FLINK-10821] E2E now uses externalized checkpoint
     new 19c34ba  [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
     new bd75a06  [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  38 +++---
 .../api/common/typeutils/SerializerTestBase.java   | 128 ++---
 2 files changed, 132 insertions(+), 34 deletions(-)
[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19c34baa6fd9554048108b58baa517addf483c2c
Author: Stefan Richter
AuthorDate: Fri Nov 9 12:56:02 2018 +0100

    [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 38 --
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d3839..f1dd8fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,29 +18,17 @@
 package org.apache.flink.api.java.typeutils.runtime;

-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +40,18 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;

+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;

 @Internal
@@ -181,11 +181,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			}
 		}

-		if (stateful) {
-			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-		} else {
-			return this;
+		if (!stateful) {
+			// as a small memory optimization, we can share the same object between instances
+			duplicateFieldSerializers = fieldSerializers;
 		}
+
+		// we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+		return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
 	}
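The fix above changes duplicate() so it always returns a new PojoSerializer instance: even when all field serializers are stateless, the old code returned `this`, which let concurrent tasks share the serializer's internal subclass-serializer cache (a plain map that is unsafe under concurrent mutation). A minimal sketch of the resulting pattern (plain Java; `CachingSerializer` and its cache are hypothetical stand-ins for PojoSerializer and its subclassSerializerCache):

```java
import java.util.HashMap;
import java.util.Map;

public class DuplicateSketch {

    /** Hypothetical serializer with a lazily filled, non-thread-safe cache. */
    static final class CachingSerializer {
        // immutable configuration: safe to share between duplicates
        private final String[] fieldNames;
        // mutable cache: a plain HashMap, unsafe under concurrent writes,
        // so it must NOT be shared between duplicates
        private final Map<Class<?>, String> subclassCache = new HashMap<>();

        CachingSerializer(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        String describe(Class<?> subclass) {
            // lazy cache fill: racy if two threads shared this map
            return subclassCache.computeIfAbsent(subclass, Class::getSimpleName);
        }

        /**
         * Always return a new instance so every thread gets its own cache;
         * the immutable configuration is shared as a small memory optimization.
         */
        CachingSerializer duplicate() {
            return new CachingSerializer(fieldNames);
        }
    }

    public static void main(String[] args) {
        CachingSerializer original = new CachingSerializer(new String[]{"id", "name"});
        CachingSerializer copy = original.duplicate();
        System.out.println(copy != original);             // prints "true": a fresh instance
        System.out.println(copy.describe(Integer.class)); // prints "Integer"
    }
}
```

The design point is that "stateless" is decided per component: immutable parts may be aliased freely, but any lazily mutated structure forces a fresh instance per duplicate, which is exactly why returning `this` was wrong here.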
[flink] branch release-1.6 updated (4b0a941 -> fffe6e1)
srichter pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 4b0a941  [FLINK-10821] E2E now uses externalized checkpoint
     new 8196a69  [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
     new fffe6e1  [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  34 +++---
 .../api/common/typeutils/SerializerTestBase.java   | 130 ++---
 2 files changed, 131 insertions(+), 33 deletions(-)
[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fffe6e18bec2361001c8b852aec465d9fe4fc718
Author: Stefan Richter
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

    [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

    This closes #7061.
---
 .../api/java/typeutils/runtime/PojoSerializer.java |   2 -
 .../api/common/typeutils/SerializerTestBase.java   | 130 ++---
 2 files changed, 113 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index f38a620..021bc24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -27,8 +27,6 @@ import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 1997866..61aee94 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,33 +18,40 @@
 package org.apache.flink.api.common.typeutils;

-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;

-import org.junit.Assert;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;

+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -437,6 +444,33 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 	}

+	@Test
+	public void testDuplicate() throws Exception {
+		final int numThreads = 10;
+		final TypeSerializer<T> serializer = getSerializer();
+		final CyclicBarrier startLatch = new CyclicBarrier(numThreads);
+		final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+		Assert.assertEquals(serializer, serializer.duplicate());
+
+		T[] testData = getData();
+
+		for (int i = 0; i < numThreads; ++i) {
+			SerializerRunner<T> runner = new SerializerRunner<>(
+				startLatch,
+				serializer.duplicate(),
+				testData,
[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f1a3764e44410231ad013e3bf069118494e6eab
Author: Stefan Richter
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

    [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

    This closes #7061.
---
 .../api/common/typeutils/SerializerTestBase.java | 254 ++---
 1 file changed, 175 insertions(+), 79 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 57015c7..49b80d3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,32 +18,39 @@
 package org.apache.flink.api.common.typeutils;

-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;

-import org.junit.Assert;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;

+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -53,23 +60,23 @@ import org.junit.Test;
  * internal state would be corrupt, which becomes evident when toString is called.
  */
 public abstract class SerializerTestBase<T> extends TestLogger {
-	
+
 	protected abstract TypeSerializer<T> createSerializer();

 	/**
 	 * Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
-	 * 
+	 *
 	 * The expected length should be positive, for fix-length data types, or {@code -1} for
 	 * variable-length types.
 	 */
 	protected abstract int getLength();
-	
+
 	protected abstract Class<T> getTypeClass();
-	
+
 	protected abstract T[] getTestData();

 	// 
-	
+
 	@Test
 	public void testInstantiate() {
 		try {
@@ -80,7 +87,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 		T instance = serializer.createInstance();
 		assertNotNull("The created instance must not be null.", instance);
-		
+
 		Class<T> type = getTypeClass();
 		assertNotNull("The test is corrupt: type class is null.", type);
@@ -127,7 +134,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
 		assertTrue(strategy.isRequiresMigration());
 	}
-	
+
 	@Test
 	public void testGetLength() {
 		final int len = getLength();
@@ -146,13 +153,13 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCopy() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[]
[flink] branch release-1.5 updated (c400273 -> 4f1a376)
srichter pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from c400273  [FLINK-10821] E2E now uses externalized checkpoint
     new 7ba80b0  [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
     new 4f1a376  [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  34 +--
 .../api/common/typeutils/SerializerTestBase.java   | 254 ++---
 2 files changed, 193 insertions(+), 95 deletions(-)