[flink] 01/02: [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f393fb70b27cd2a64cb2c010d7d2e902f75c519f
Author: Stefan Richter 
AuthorDate: Thu Nov 8 15:08:35 2018 +0100

[FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis

This closes #7066.
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index 895e4a7..b4646fc 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -25,7 +25,7 @@ TEST=flink-heavy-deployment-stress-test
 TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
 
-set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap
+set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
@@ -35,12 +35,12 @@ set_conf "taskmanager.memory.segment-size" "8kb"
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
 
 start_cluster # this also starts 1TM
-start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots
+start_taskmanagers 9 # 1TM + 9TM = 10TM a 10 slots = 100 slots
 
-# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries.
+# This call will result in a deployment with state meta data of 100 x 100 x 50 union states x each 100 entries.
 # We can scale up the numbers to make the test even heavier.
 $FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
---environment.max_parallelism 1024 --environment.parallelism 200 \
+--environment.max_parallelism 1024 --environment.parallelism 100 \
 --environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \
 --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
---heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75
+--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 100
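
For reference, the resource arithmetic behind the new settings can be checked with a minimal bash sketch (standalone; variable names are hypothetical, not part of the test script):

    tm_heap_mb=512    # taskmanager.heap.mb
    num_tms=10        # 1 TM from start_cluster + 9 from start_taskmanagers
    slots_per_tm=10   # taskmanager.numberOfTaskSlots
    echo "total heap:  $(( tm_heap_mb * num_tms )) MB"   # 5120 MB, i.e. ~5 GB
    echo "total slots: $(( num_tms * slots_per_tm ))"    # 100 slots >= parallelism 100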



[flink] branch master updated (17a6e7a -> 6115b05)

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 17a6e7a  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
 new f393fb7  [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis
 new 6115b05  [FLINK-10826] [e2e] Increase network timeout

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)



[flink] 02/02: [FLINK-10826] [e2e] Increase network timeout

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6115b05c87e387c301b21758a6d29e300d138a84
Author: Timo Walther 
AuthorDate: Mon Nov 12 10:28:51 2018 +0100

[FLINK-10826] [e2e] Increase network timeout
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index b4646fc..8613470 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -30,6 +30,7 @@ set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
 set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.network.request-backoff.max" "6"
 set_conf "taskmanager.memory.segment-size" "8kb"
 
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM



[flink] branch master updated: [FLINK-10626] [docs] [table] Add documentation for temporal table joins

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 17a6e7a  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
17a6e7a is described below

commit 17a6e7a2be756883f9109ba04febe1f6743944a3
Author: Timo Walther 
AuthorDate: Thu Nov 8 17:32:35 2018 +0100

[FLINK-10626] [docs] [table] Add documentation for temporal table joins

This closes #7065.
---
 docs/dev/table/streaming/joins.md | 33 +++--
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 24c33b9..f293406 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -136,14 +136,14 @@ SELECT
   o.amount * r.rate AS amount
 FROM
   Orders AS o,
-  LATERAL TABLE (Rates(o.proctime)) AS r
+  LATERAL TABLE (Rates(o.rowtime)) AS r
 WHERE r.currency = o.currency
 {% endhighlight %}
 
-Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. 
+Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record.
 In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
 
-In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example.
+In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation. 
 
 In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join.
 This again allows Flink to limit the number of elements that must be kept in the state.
@@ -189,14 +189,35 @@ val result = orders
 
 
 
+**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet implemented for temporal joins.
+This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.
+
 ### Processing-time Temporal Joins
 
 With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function.
-By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table
+By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table
 and any updates in the underlying history table will also immediately overwrite the current values.
 
-Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on previously emitted/processed records from the probe side.
+Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state.
+New updates will have no effect on previously emitted/processed records from the probe side.
 
-One can think about processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
+One can think about a processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous record, the old value is just simply overwritten.
 Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
+
+### Event-time Temporal Joins
+
+With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function.
+This allows for joining the two tables at a common point in time.
+
+Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records in the 
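
The `HashMap` analogy above can be made concrete; a minimal Java sketch of the processing-time mental model (simplified types and hypothetical names, not Flink's implementation):

    import java.util.HashMap;
    import java.util.Map;

    // Build side: latest rate per currency (the primary key); probe side: orders.
    final class ProcTimeTemporalJoinModel {
        private final Map<String, Double> latestRates = new HashMap<>();

        // A new build-side record with an existing key simply overwrites the old value.
        void onRate(String currency, double rate) {
            latestRates.put(currency, rate);
        }

        // Every probe-side record is evaluated against the current state of the map.
        Double onOrder(String currency, double amount) {
            Double rate = latestRates.get(currency);
            return rate == null ? null : amount * rate;
        }
    }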

[flink] 01/02: [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d9fcbaf1408b09ee1d409ec1568a5af51d726d0
Author: Stefan Richter 
AuthorDate: Thu Nov 8 15:08:35 2018 +0100

[FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis

This closes #7066.
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index 895e4a7..b4646fc 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -25,7 +25,7 @@ TEST=flink-heavy-deployment-stress-test
 TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
 
-set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap
+set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
@@ -35,12 +35,12 @@ set_conf "taskmanager.memory.segment-size" "8kb"
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
 
 start_cluster # this also starts 1TM
-start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots
+start_taskmanagers 9 # 1TM + 9TM = 10TM a 10 slots = 100 slots
 
-# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries.
+# This call will result in a deployment with state meta data of 100 x 100 x 50 union states x each 100 entries.
 # We can scale up the numbers to make the test even heavier.
 $FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
---environment.max_parallelism 1024 --environment.parallelism 200 \
+--environment.max_parallelism 1024 --environment.parallelism 100 \
 --environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \
 --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
---heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75
+--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 100



[flink] 02/02: [FLINK-10826] [e2e] Increase network timeout

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78ea3923782fddfd4fbb414c754d4db7701e24ca
Author: Timo Walther 
AuthorDate: Mon Nov 12 10:28:51 2018 +0100

[FLINK-10826] [e2e] Increase network timeout
---
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index b4646fc..8613470 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -30,6 +30,7 @@ set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap
 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
 set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.network.request-backoff.max" "6"
 set_conf "taskmanager.memory.segment-size" "8kb"
 
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM



[flink] branch release-1.7 updated (096918c -> 78ea392)

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 096918c  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
 new 7d9fcba  [FLINK-10826] [e2e] Decrease deployment size of heavy deployment e2e test for Travis
 new 78ea392  [FLINK-10826] [e2e] Increase network timeout

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)



[flink] branch release-1.7 updated: [FLINK-10626] [docs] [table] Add documentation for temporal table joins

2018-11-12 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 096918c  [FLINK-10626] [docs] [table] Add documentation for temporal table joins
096918c is described below

commit 096918c6577b06bb9ac6250063c6c99a04907f77
Author: Timo Walther 
AuthorDate: Thu Nov 8 17:32:35 2018 +0100

[FLINK-10626] [docs] [table] Add documentation for temporal table joins

This closes #7065.
---
 docs/dev/table/streaming/joins.md | 33 +++--
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 24c33b9..f293406 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -136,14 +136,14 @@ SELECT
   o.amount * r.rate AS amount
 FROM
   Orders AS o,
-  LATERAL TABLE (Rates(o.proctime)) AS r
+  LATERAL TABLE (Rates(o.rowtime)) AS r
 WHERE r.currency = o.currency
 {% endhighlight %}
 
-Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. 
+Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record.
 In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
 
-In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example.
+In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation. 
 
 In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join.
 This again allows Flink to limit the number of elements that must be kept in the state.
@@ -189,14 +189,35 @@ val result = orders
 
 
 
+**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet implemented for temporal joins.
+This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.
+
 ### Processing-time Temporal Joins
 
 With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function.
-By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table
+By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table
 and any updates in the underlying history table will also immediately overwrite the current values.
 
-Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on previously emitted/processed records from the probe side.
+Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state.
+New updates will have no effect on previously emitted/processed records from the probe side.
 
-One can think about processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
+One can think about a processing-time temporal join as a simple `HashMap` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous record, the old value is just simply overwritten.
 Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
+
+### Event-time Temporal Joins
+
+With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function.
+This allows for joining the two tables at a common point in time.
+
+Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records 

[flink] branch release-1.5 updated: [FLINK-10821] E2E now uses externalized checkpoint

2018-11-12 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
 new c400273  [FLINK-10821] E2E now uses externalized checkpoint
c400273 is described below

commit c400273530687f952345922045d1124bf66fd65a
Author: Igal Shilman 
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

[FLINK-10821] E2E now uses externalized checkpoint

This commit fixes the test_resume_externalized_checkpoints.sh script,
by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh| 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 29c3786..6e8f462 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -67,8 +67,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
 --test.semantics exactly-once \
 --environment.parallelism $dop \
 --environment.externalize_checkpoint true \
@@ -126,9 +127,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
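
The essence of the fix is that the resume invocation must pass the externalized checkpoint path via the -s flag; a minimal sketch of the resulting command (the parallelism value is a placeholder):

    # resume from the externalized checkpoint, as buildBaseJobCmd now assembles it
    $FLINK_DIR/bin/flink run -d -s "file://${CHECKPOINT_PATH}" -p 2 ${TEST_PROGRAM_JAR} \
      --environment.parallelism 2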



[flink] branch master updated: [FLINK-10821] E2E now uses externalized checkpoint

2018-11-12 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1ad17e0  [FLINK-10821] E2E now uses externalized checkpoint
1ad17e0 is described below

commit 1ad17e04fe11f7eb170e06d0a076b1b9e31249d7
Author: Igal Shilman 
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

[FLINK-10821] E2E now uses externalized checkpoint

This commit fixes the test_resume_externalized_checkpoints.sh script,
by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh| 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e1ba65d..fe23190 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -66,8 +66,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
 --test.semantics exactly-once \
 --environment.parallelism $dop \
 --environment.externalize_checkpoint true \
@@ -125,9 +126,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 wait_oper_metric_num_in_records SemanticsCheckMapper.0 200



[flink] branch release-1.6 updated: [FLINK-10821] E2E now uses externalized checkpoint

2018-11-12 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new 4b0a941  [FLINK-10821] E2E now uses externalized checkpoint
4b0a941 is described below

commit 4b0a941c2aface840819c8ff974113bf2b2221d9
Author: Igal Shilman 
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

[FLINK-10821] E2E now uses externalized checkpoint

This commit fixes the test_resume_externalized_checkpoints.sh script,
by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh| 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e1ba65d..fe23190 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -66,8 +66,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
 --test.semantics exactly-once \
 --environment.parallelism $dop \
 --environment.externalize_checkpoint true \
@@ -125,9 +126,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 wait_oper_metric_num_in_records SemanticsCheckMapper.0 200



[flink] branch release-1.7 updated: [FLINK-10821] E2E now uses externalized checkpoint

2018-11-12 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 7dc4c70  [FLINK-10821] E2E now uses externalized checkpoint
7dc4c70 is described below

commit 7dc4c70e2f749f0b0888940cbb1c43d93747d677
Author: Igal Shilman 
AuthorDate: Fri Nov 9 13:59:24 2018 +0100

[FLINK-10821] E2E now uses externalized checkpoint

This commit fixes the test_resume_externalized_checkpoints.sh script,
by providing the path to the externalized checkpoint taken.
---
 .../test_resume_externalized_checkpoints.sh| 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index e1ba65d..fe23190 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -66,8 +66,9 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStr
 
 function buildBaseJobCmd {
   local dop=$1
+  local flink_args="$2"
 
-  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+  echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $dop $TEST_PROGRAM_JAR \
 --test.semantics exactly-once \
 --environment.parallelism $dop \
 --environment.externalize_checkpoint true \
@@ -125,9 +126,20 @@ fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
 
-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  JOB_CMD="$BASE_JOB_CMD \
+--test.simulate_failure true \
+--test.simulate_failure.num_records 0 \
+--test.simulate_failure.num_checkpoints 0 \
+--test.simulate_failure.max_failures 0 \
+--environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
 
-DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 wait_oper_metric_num_in_records SemanticsCheckMapper.0 200



[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ba80b0a7bceb6af2e36b8408cf28a72e53a610c
Author: Stefan Richter 
AuthorDate: Fri Nov 9 12:56:02 2018 +0100

[FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

Signed-off-by: Stefan Richter 
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 --
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d086d..021bc24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +38,18 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
@@ -179,11 +179,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
 
-   if (stateful) {
-   return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-   } else {
-   return this;
+   if (!stateful) {
+   // as a small memory optimization, we can share the same object between instances
+   duplicateFieldSerializers = fieldSerializers;
}
+
+   // we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+   return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
}
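
The rule the fix enforces generalizes to any serializer that holds mutable state; a minimal self-contained Java sketch (hypothetical class, not Flink's API):

    import java.util.HashMap;
    import java.util.Map;

    // A serializer with a per-instance cache must hand each caller of duplicate()
    // its own instance; sharing one instance lets threads race on the cache.
    final class CachingSerializer {
        private final Map<Class<?>, Object> subclassCache = new HashMap<>(); // mutable state

        CachingSerializer duplicate() {
            return new CachingSerializer(); // fresh instance, so the cache is never shared
        }
    }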
 




[flink] branch master updated: [FLINK-10753] Improve propagation and logging of snapshot exceptions

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new dc8e27f  [FLINK-10753] Improve propagation and logging of snapshot exceptions
dc8e27f is described below

commit dc8e27fb07ec8593037ce0205b5ecb4a5bc16a40
Author: Stefan Richter 
AuthorDate: Thu Nov 8 15:51:16 2018 +0100

[FLINK-10753] Improve propagation and logging of snapshot exceptions

This closes #7064.
---
 .../runtime/checkpoint/CheckpointCoordinator.java   |  9 ++---
 .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++---
 .../taskexecutor/rpc/RpcCheckpointResponder.java|  6 ++
 .../api/operators/AbstractStreamOperator.java   |  6 --
 .../restore/AbstractOperatorRestoreTestBase.java|  1 +
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6..02b6fa4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1249,11 +1249,14 @@ public class CheckpointCoordinator {
 
final long checkpointId = pendingCheckpoint.getCheckpointId();
 
-   final String reason = (cause != null) ? cause.getMessage() : "";
+   LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
-   LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
+   if (cause != null) {
+   pendingCheckpoint.abortError(cause);
+   } else {
+   pendingCheckpoint.abortDeclined();
+   }
 
-   pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);
 
// we don't have to schedule another "dissolving" checkpoint any more because the
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1b51ac4..1bc6b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -433,25 +434,23 @@ public class PendingCheckpoint {
}
}
 
+
public void abortDeclined() {
-   try {
-   Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
-   onCompletionPromise.completeExceptionally(cause);
-   reportFailedCheckpoint(cause);
-   } finally {
-   dispose(true);
-   }
+   abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
}
 
/**
 * Aborts the pending checkpoint due to an error.
 * @param cause The error's exception.
 */
-   public void abortError(Throwable cause) {
+   public void abortError(@Nonnull Throwable cause) {
+   abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+   }
+
+   private void abortWithCause(@Nonnull Exception cause) {
try {
-   Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
-   onCompletionPromise.completeExceptionally(failure);
-   reportFailedCheckpoint(failure);
+   onCompletionPromise.completeExceptionally(cause);
+   reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index aba8bda..918fa50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
+import 
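
The abort refactoring in PendingCheckpoint boils down to one pattern: complete the promise exceptionally with the root cause and always dispose; a minimal Java sketch (simplified names, not Flink's classes):

    import java.util.concurrent.CompletableFuture;

    final class PendingWork {
        private final CompletableFuture<Void> onCompletionPromise = new CompletableFuture<>();

        void abortWithCause(Exception cause) {
            try {
                // waiters observe the real snapshot exception instead of a generic one
                onCompletionPromise.completeExceptionally(cause);
            } finally {
                dispose(); // resources are released no matter how reporting goes
            }
        }

        private void dispose() { /* release held state */ }
    }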

[flink] branch release-1.7 updated: [FLINK-10809][state] Include keyed state that is not from head operators in state assignment

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 45ad36f  [FLINK-10809][state] Include keyed state that is not from head operators in state assignment
45ad36f is described below

commit 45ad36fd752f53b1fa17d7226d3c93614fd24b3f
Author: Stefan Richter 
AuthorDate: Wed Nov 7 14:05:07 2018 +0100

[FLINK-10809][state] Include keyed state that is not from head operators in state assignment

This closes #7048.

Signed-off-by: Stefan Richter 
---
 .../checkpoint/StateAssignmentOperation.java   | 29 +++
 .../ReinterpretDataStreamAsKeyedStreamITCase.java  | 99 --
 2 files changed, 102 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index b017388..02fc201 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -71,13 +71,12 @@ public class StateAssignmentOperation {
this.allowNonRestoredState = allowNonRestoredState;
}
 
-   public boolean assignStates() throws Exception {
+   public void assignStates() {
Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);
-   Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
 
checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
 
-   for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
+   for (Map.Entry<JobVertexID, ExecutionJobVertex> task : this.tasks.entrySet()) {
final ExecutionJobVertex executionJobVertex = task.getValue();
 
// find the states of all operators belonging to this task
@@ -108,7 +107,6 @@ public class StateAssignmentOperation {
assignAttemptState(task.getValue(), operatorStates);
}
 
-   return true;
}
 
private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
@@ -254,10 +252,6 @@ public class StateAssignmentOperation {
new StateObjectCollection<>(subRawKeyedState.getOrDefault(instanceID, Collections.emptyList())));
}
 
-   private static boolean isHeadOperator(int opIdx, List<OperatorID> operatorIDs) {
-   return opIdx == operatorIDs.size() - 1;
-   }
-
public void checkParallelismPreconditions(List<OperatorState> operatorStates, ExecutionJobVertex executionJobVertex) {
for (OperatorState operatorState : operatorStates) {
checkParallelismPreconditions(operatorState, executionJobVertex);
@@ -278,19 +272,16 @@ public class StateAssignmentOperation {
for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
OperatorState operatorState = oldOperatorStates.get(operatorIndex);
int oldParallelism = operatorState.getParallelism();
-
for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
-   if (isHeadOperator(operatorIndex, newOperatorIDs)) {
-   Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
-   operatorState,
-   newKeyGroupPartitions,
-   subTaskIndex,
-   newParallelism,
-   oldParallelism);
-   newManagedKeyedState.put(instanceID, subKeyedStates.f0);
-   newRawKeyedState.put(instanceID, subKeyedStates.f1);
-   }
+   Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
+   operatorState,
+   newKeyGroupPartitions,
+   subTaskIndex,
+   newParallelism,
+   oldParallelism);
+   newManagedKeyedState.put(instanceID, subKeyedStates.f0);
+   newRawKeyedState.put(instanceID, subKeyedStates.f1);
}
}
}
diff --git 

[flink] branch master updated: [FLINK-10809][state] Include keyed state that is not from head operators in state assignment

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new bf760f9  [FLINK-10809][state] Include keyed state that is not from head operators in state assignment
bf760f9 is described below

commit bf760f9312c547493620955dc57f57fd5da12596
Author: Stefan Richter 
AuthorDate: Wed Nov 7 14:05:07 2018 +0100

[FLINK-10809][state] Include keyed state that is not from head operators in state assignment

This closes #7048.

Signed-off-by: Stefan Richter 
---
 .../checkpoint/StateAssignmentOperation.java   | 29 +++
 .../ReinterpretDataStreamAsKeyedStreamITCase.java  | 99 --
 2 files changed, 102 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index b017388..02fc201 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -71,13 +71,12 @@ public class StateAssignmentOperation {
this.allowNonRestoredState = allowNonRestoredState;
}
 
-   public boolean assignStates() throws Exception {
+   public void assignStates() {
Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);
-   Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
 
checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
 
-   for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
+   for (Map.Entry<JobVertexID, ExecutionJobVertex> task : this.tasks.entrySet()) {
final ExecutionJobVertex executionJobVertex = task.getValue();
 
// find the states of all operators belonging to this task
@@ -108,7 +107,6 @@ public class StateAssignmentOperation {
assignAttemptState(task.getValue(), operatorStates);
}
 
-   return true;
}
 
private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
@@ -254,10 +252,6 @@ public class StateAssignmentOperation {
new StateObjectCollection<>(subRawKeyedState.getOrDefault(instanceID, Collections.emptyList())));
}
 
-   private static boolean isHeadOperator(int opIdx, List<OperatorID> operatorIDs) {
-   return opIdx == operatorIDs.size() - 1;
-   }
-
public void checkParallelismPreconditions(List<OperatorState> operatorStates, ExecutionJobVertex executionJobVertex) {
for (OperatorState operatorState : operatorStates) {
checkParallelismPreconditions(operatorState, executionJobVertex);
@@ -278,19 +272,16 @@ public class StateAssignmentOperation {
for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
OperatorState operatorState = oldOperatorStates.get(operatorIndex);
int oldParallelism = operatorState.getParallelism();
-
for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
-   if (isHeadOperator(operatorIndex, newOperatorIDs)) {
-   Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
-   operatorState,
-   newKeyGroupPartitions,
-   subTaskIndex,
-   newParallelism,
-   oldParallelism);
-   newManagedKeyedState.put(instanceID, subKeyedStates.f0);
-   newRawKeyedState.put(instanceID, subKeyedStates.f1);
-   }
+   Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
+   operatorState,
+   newKeyGroupPartitions,
+   subTaskIndex,
+   newParallelism,
+   oldParallelism);
+   newManagedKeyedState.put(instanceID, subKeyedStates.f0);
+   newRawKeyedState.put(instanceID, subKeyedStates.f1);
}
}
}
diff --git 

[flink] branch release-1.5 updated: [FLINK-10753] Improve propagation and logging of snapshot exceptions

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
 new 1725591  [FLINK-10753] Improve propagation and logging of snapshot exceptions
1725591 is described below

commit 1725591ccf6dea55b7ab85da0753b0a635434eee
Author: Stefan Richter 
AuthorDate: Thu Nov 8 15:51:16 2018 +0100

[FLINK-10753] Improve propagation and logging of snapshot exceptions

This closes #7064.
---
 .../runtime/checkpoint/CheckpointCoordinator.java   |  9 ++---
 .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++---
 .../taskexecutor/rpc/RpcCheckpointResponder.java|  6 ++
 .../api/operators/AbstractStreamOperator.java   |  6 --
 .../restore/AbstractOperatorRestoreTestBase.java|  1 +
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 1ee2ece..51adeae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1247,11 +1247,14 @@ public class CheckpointCoordinator {
 
final long checkpointId = pendingCheckpoint.getCheckpointId();
 
-   final String reason = (cause != null) ? cause.getMessage() : "";
+   LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
-   LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
+   if (cause != null) {
+   pendingCheckpoint.abortError(cause);
+   } else {
+   pendingCheckpoint.abortDeclined();
+   }
 
-   pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);
 
// we don't have to schedule another "dissolving" checkpoint any more because the
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1b51ac4..1bc6b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -433,25 +434,23 @@ public class PendingCheckpoint {
}
}
 
+
public void abortDeclined() {
-   try {
-   Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
-   onCompletionPromise.completeExceptionally(cause);
-   reportFailedCheckpoint(cause);
-   } finally {
-   dispose(true);
-   }
+   abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
}
 
/**
 * Aborts the pending checkpoint due to an error.
 * @param cause The error's exception.
 */
-   public void abortError(Throwable cause) {
+   public void abortError(@Nonnull Throwable cause) {
+   abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+   }
+
+   private void abortWithCause(@Nonnull Exception cause) {
try {
-   Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
-   onCompletionPromise.completeExceptionally(failure);
-   reportFailedCheckpoint(failure);
+   onCompletionPromise.completeExceptionally(cause);
+   reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index aba8bda..918fa50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
+import 

[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c4d366c9abeeb18caf51c3a5a3272c3c9987738f
Author: Stefan Richter 
AuthorDate: Fri Nov 9 12:56:02 2018 +0100

[FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 38 --
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d3839..f1dd8fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,29 +18,17 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +40,18 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
@@ -181,11 +181,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
 
-   if (stateful) {
-   return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-   } else {
-   return this;
+   if (!stateful) {
+   // as a small memory optimization, we can share the same object between instances
+   duplicateFieldSerializers = fieldSerializers;
}
+
+   // we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+   return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
}
 




[flink] branch master updated (1ad17e0 -> 2db59cf)

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 1ad17e0  [FLINK-10821] E2E now uses externalized checkpoint
 new c4d366c  [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer
 new 2db59cf  [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  38 +++---
 .../api/common/typeutils/SerializerTestBase.java   | 128 ++---
 2 files changed, 132 insertions(+), 34 deletions(-)



[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2db59cf365a84bdbcd69f6f5e57bc6e639da4930
Author: Stefan Richter 
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

[FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

This closes #7061.
---
 .../api/common/typeutils/SerializerTestBase.java   | 128 ++---
 1 file changed, 112 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a4908f9..151dafb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,32 +18,39 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -436,6 +443,33 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
}
 
+   @Test
+   public void testDuplicate() throws Exception {
+   final int numThreads = 10;
+   final TypeSerializer<T> serializer = getSerializer();
+   final CyclicBarrier startLatch = new CyclicBarrier(numThreads);
+   final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+   Assert.assertEquals(serializer, serializer.duplicate());
+
+   T[] testData = getData();
+
+   for (int i = 0; i < numThreads; ++i) {
+   SerializerRunner runner = new SerializerRunner<>(
+   startLatch,
+   serializer.duplicate(),
+   testData,
+   120L);
+
+   runner.start();
+   concurrentRunners.add(runner);
+   }
+
+   for (SerializerRunner concurrentRunner : concurrentRunners) {
+   concurrentRunner.join();
+   concurrentRunner.checkResult();
+   }
+   }
+
	// --------------------------------------------------------------------------------------------
 
protected void deepEquals(String message, T should, T is) {
@@ -526,6 +560,68 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
}
 
+	/**
+	 * Runner to test serializer duplication via concurrency.
+	 * @param <T> type of the test elements.
+	 */
+	static class SerializerRunner<T> extends Thread {
+		final CyclicBarrier allReadyBarrier;
+		final TypeSerializer<T> serializer;
+		final T[] testData;
+		final long durationLimitMillis;
+		Exception failure;
+
+		SerializerRunner(
+			CyclicBarrier allReadyBarrier,
+			TypeSerializer<T> serializer,
+			T[] testData,
+
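The archive cuts the SerializerRunner off at its constructor. For readers following along, here is a minimal self-contained sketch of the pattern the test exercises, assuming only what the hunk above shows (the Serializer interface below is an illustrative stand-in, not Flink's TypeSerializer):

import java.util.concurrent.CyclicBarrier;

public final class DuplicationStressSketch {

	// Illustrative stand-in: only duplicate() and a serialize/deserialize round trip matter here.
	interface Serializer<T> {
		byte[] serialize(T value) throws Exception;
		T deserialize(byte[] bytes) throws Exception;
		Serializer<T> duplicate();
	}

	static final class Runner<T> extends Thread {
		private final CyclicBarrier allReady;
		private final Serializer<T> serializer; // each thread owns its own duplicate
		private final T[] data;
		private final long budgetMillis;
		private volatile Exception failure;

		Runner(CyclicBarrier allReady, Serializer<T> serializer, T[] data, long budgetMillis) {
			this.allReady = allReady;
			this.serializer = serializer;
			this.data = data;
			this.budgetMillis = budgetMillis;
		}

		@Override
		public void run() {
			try {
				allReady.await(); // release all runners at once to maximize interleaving
				final long deadline = System.currentTimeMillis() + budgetMillis;
				while (System.currentTimeMillis() < deadline) {
					for (T value : data) {
						T copy = serializer.deserialize(serializer.serialize(value));
						if (!value.equals(copy)) {
							throw new IllegalStateException("corrupt round trip: " + copy);
						}
					}
				}
			} catch (Exception e) {
				failure = e; // surfaced to the main thread by checkResult()
			}
		}

		void checkResult() throws Exception {
			if (failure != null) {
				throw failure;
			}
		}
	}
}

A duplicate() that wrongly returns a shared stateful instance tends to fail the round-trip check under this load, because two threads end up racing on the same internal buffers.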

[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8196a69f95af08ec829c77e190161d99f186aa57
Author: Stefan Richter 
AuthorDate: Fri Nov 9 12:56:02 2018 +0100

[FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() 
w.r.t. subclass serializer
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 36 --
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d086d..f38a620 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -39,6 +27,8 @@ import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -50,6 +40,18 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
@@ -179,11 +181,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
 
-		if (stateful) {
-			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-		} else {
-			return this;
+		if (!stateful) {
+			// as a small memory optimization, we can share the same object between instances
+			duplicateFieldSerializers = fieldSerializers;
 		}
+
+		// we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+		return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
}
 




[flink] branch release-1.6 updated: [FLINK-10753] Improve propagation and logging of snapshot exceptions

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new 6814e5f  [FLINK-10753] Improve propagation and logging of snapshot 
exceptions
6814e5f is described below

commit 6814e5fc64a289a10dbd509dd6055448fc6be71d
Author: Stefan Richter 
AuthorDate: Thu Nov 8 15:51:16 2018 +0100

[FLINK-10753] Improve propagation and logging of snapshot exceptions

This closes #7064.

Signed-off-by: Stefan Richter 
---
 .../runtime/checkpoint/CheckpointCoordinator.java   |  9 ++---
 .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++---
 .../taskexecutor/rpc/RpcCheckpointResponder.java|  6 ++
 .../api/operators/AbstractStreamOperator.java   |  6 --
 .../restore/AbstractOperatorRestoreTestBase.java|  1 +
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6..02b6fa4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1249,11 +1249,14 @@ public class CheckpointCoordinator {
 
final long checkpointId = pendingCheckpoint.getCheckpointId();
 
-		final String reason = (cause != null) ? cause.getMessage() : "";
+		LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
-		LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
+		if (cause != null) {
+			pendingCheckpoint.abortError(cause);
+		} else {
+			pendingCheckpoint.abortDeclined();
+		}
 
-		pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);
 
		// we don't have to schedule another "dissolving" checkpoint any more because the
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1b51ac4..1bc6b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -433,25 +434,23 @@ public class PendingCheckpoint {
}
}
 
+
public void abortDeclined() {
-		try {
-			Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
-			onCompletionPromise.completeExceptionally(cause);
-			reportFailedCheckpoint(cause);
-		} finally {
-			dispose(true);
-		}
+		abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
}
 
/**
 * Aborts the pending checkpoint due to an error.
 * @param cause The error's exception.
 */
-	public void abortError(Throwable cause) {
+	public void abortError(@Nonnull Throwable cause) {
+		abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+	}
+
+	private void abortWithCause(@Nonnull Exception cause) {
 		try {
-			Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
-			onCompletionPromise.completeExceptionally(failure);
-			reportFailedCheckpoint(failure);
+			onCompletionPromise.completeExceptionally(cause);
+			reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index aba8bda..918fa50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import 
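One detail worth calling out in the CheckpointCoordinator change above: SLF4J treats a trailing Throwable argument that has no matching {} placeholder as the log entry's exception, so the new LOG.info call keeps the full stack trace without string concatenation. A standalone illustration of that contract (class name arbitrary):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
	private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

	public static void main(String[] args) {
		long checkpointId = 42L;
		String job = "job-1";
		Exception cause = new Exception("tasks not ready");
		// Two placeholders, three arguments: the trailing Throwable is logged
		// with its stack trace instead of being formatted into the message.
		LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
	}
}

Passing cause == null is also harmless here: SLF4J simply logs the message without an exception, which matches the abortDeclined path.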

[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd75a06c7aecd3b3f2d6fcf64bd4dde4624828a6
Author: Stefan Richter 
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

[FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

This closes #7061.
---
 .../api/common/typeutils/SerializerTestBase.java   | 128 ++---
 1 file changed, 112 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a4908f9..151dafb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,32 +18,39 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -436,6 +443,33 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
}
 
+	@Test
+	public void testDuplicate() throws Exception {
+		final int numThreads = 10;
+		final TypeSerializer<T> serializer = getSerializer();
+		final CyclicBarrier startLatch = new CyclicBarrier(numThreads);
+		final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+		Assert.assertEquals(serializer, serializer.duplicate());
+
+		T[] testData = getData();
+
+		for (int i = 0; i < numThreads; ++i) {
+			SerializerRunner<T> runner = new SerializerRunner<>(
+				startLatch,
+				serializer.duplicate(),
+				testData,
+				120L);
+
+			runner.start();
+			concurrentRunners.add(runner);
+		}
+
+		for (SerializerRunner<T> concurrentRunner : concurrentRunners) {
+			concurrentRunner.join();
+			concurrentRunner.checkResult();
+		}
+	}
+
	// --------------------------------------------------------------------------------------------
 
protected void deepEquals(String message, T should, T is) {
@@ -526,6 +560,68 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
}
 
+	/**
+	 * Runner to test serializer duplication via concurrency.
+	 * @param <T> type of the test elements.
+	 */
+	static class SerializerRunner<T> extends Thread {
+		final CyclicBarrier allReadyBarrier;
+		final TypeSerializer<T> serializer;
+		final T[] testData;
+		final long durationLimitMillis;
+		Exception failure;
+
+		SerializerRunner(
+			CyclicBarrier allReadyBarrier,
+			TypeSerializer<T> serializer,
+			T[] testData,
+   
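A brief aside on the CyclicBarrier in this test, since it is what turns the runners into a genuine concurrency probe: await() blocks every thread until the configured number of parties has arrived, so all duplicated serializers start working at the same instant. A minimal JDK-only illustration:

import java.util.concurrent.CyclicBarrier;

public class BarrierSketch {
	public static void main(String[] args) {
		final int parties = 3;
		final CyclicBarrier barrier = new CyclicBarrier(parties);
		for (int i = 0; i < parties; i++) {
			final int id = i;
			new Thread(() -> {
				try {
					barrier.await(); // blocks until all 3 threads reach this point
					System.out.println("thread " + id + " released");
				} catch (Exception e) {
					Thread.currentThread().interrupt();
				}
			}).start();
		}
	}
}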

[flink] branch release-1.7 updated (7dc4c70 -> bd75a06)

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 7dc4c70  [FLINK-10821] E2E now uses externalized checkpoint
 new 19c34ba  [FLINK-10839][serializer] Fix implementation of 
PojoSerializer.duplicate() w.r.t. subclass serializer
 new bd75a06  [FLINK-10827][tests] Add test for duplicate() to 
SerializerTestBase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  38 +++---
 .../api/common/typeutils/SerializerTestBase.java   | 128 ++---
 2 files changed, 132 insertions(+), 34 deletions(-)



[flink] 01/02: [FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19c34baa6fd9554048108b58baa517addf483c2c
Author: Stefan Richter 
AuthorDate: Fri Nov 9 12:56:02 2018 +0100

[FLINK-10839][serializer] Fix implementation of PojoSerializer.duplicate() 
w.r.t. subclass serializer
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 38 --
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d3839..f1dd8fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,29 +18,17 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +40,18 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
@@ -181,11 +181,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
 
-		if (stateful) {
-			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-		} else {
-			return this;
+		if (!stateful) {
+			// as a small memory optimization, we can share the same object between instances
+			duplicateFieldSerializers = fieldSerializers;
 		}
+
+		// we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+		return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
}
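Context for the stateful flag in this hunk: it is computed earlier in duplicate() by duplicating each field serializer and checking whether any child handed back a fresh object; reference equality means the child is stateless and safe to share. A hedged sketch of that probe with an illustrative stand-in interface (not Flink's exact code):

final class DuplicationProbe {
	interface Serializer<T> {
		Serializer<T> duplicate();
	}

	static boolean duplicateChildren(Serializer<?>[] children, Serializer<?>[] duplicated) {
		boolean stateful = false;
		for (int i = 0; i < children.length; i++) {
			duplicated[i] = children[i].duplicate();
			if (duplicated[i] != children[i]) {
				stateful = true; // a fresh instance signals per-instance state in the child
			}
		}
		return stateful;
	}
}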
 




[flink] branch release-1.6 updated (4b0a941 -> fffe6e1)

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 4b0a941  [FLINK-10821] E2E now uses externalized checkpoint
 new 8196a69  [FLINK-10839][serializer] Fix implementation of 
PojoSerializer.duplicate() w.r.t. subclass serializer
 new fffe6e1  [FLINK-10827][tests] Add test for duplicate() to 
SerializerTestBase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  34 +++---
 .../api/common/typeutils/SerializerTestBase.java   | 130 ++---
 2 files changed, 131 insertions(+), 33 deletions(-)



[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fffe6e18bec2361001c8b852aec465d9fe4fc718
Author: Stefan Richter 
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

[FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

This closes #7061.
---
 .../api/java/typeutils/runtime/PojoSerializer.java |   2 -
 .../api/common/typeutils/SerializerTestBase.java   | 130 ++---
 2 files changed, 113 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index f38a620..021bc24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -27,8 +27,6 @@ import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 1997866..61aee94 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,33 +18,40 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -437,6 +444,33 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
}
 
+	@Test
+	public void testDuplicate() throws Exception {
+		final int numThreads = 10;
+		final TypeSerializer<T> serializer = getSerializer();
+		final CyclicBarrier startLatch = new CyclicBarrier(numThreads);
+		final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+		Assert.assertEquals(serializer, serializer.duplicate());
+
+		T[] testData = getData();
+
+		for (int i = 0; i < numThreads; ++i) {
+			SerializerRunner<T> runner = new SerializerRunner<>(
+				startLatch,
+				serializer.duplicate(),
+				testData,
+ 

[flink] 02/02: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f1a3764e44410231ad013e3bf069118494e6eab
Author: Stefan Richter 
AuthorDate: Thu Nov 8 14:44:08 2018 +0100

[FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

This closes #7061.
---
 .../api/common/typeutils/SerializerTestBase.java   | 254 ++---
 1 file changed, 175 insertions(+), 79 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 57015c7..49b80d3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,32 +18,39 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -53,23 +60,23 @@ import org.junit.Test;
 * internal state would be corrupt, which becomes evident when toString is called.
  */
 public abstract class SerializerTestBase<T> extends TestLogger {
-	
+
 	protected abstract TypeSerializer<T> createSerializer();
 
 	/**
 	 * Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
-	 * 
+	 *
 	 * The expected length should be positive, for fix-length data types, or {@code -1} for
 	 * variable-length types.
 	 */
 	protected abstract int getLength();
-	
+
 	protected abstract Class<T> getTypeClass();
-	
+
 	protected abstract T[] getTestData();
 
 	// --------------------------------------------------------------------------------------------
-	
+
@Test
public void testInstantiate() {
try {
@@ -80,7 +87,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", 
instance);
-   
+
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
 
@@ -127,7 +134,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
assertTrue(strategy.isRequiresMigration());
}
-   
+
@Test
public void testGetLength() {
final int len = getLength();
@@ -146,13 +153,13 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-   
+
@Test
public void testCopy() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] 
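The archive truncates this diff mid-hunk. For orientation, here is a hedged sketch of how a concrete test plugs into the abstract methods shown above — a hypothetical subclass wired to Flink's StringSerializer; only the four overrides visible in the hunk are assumed:

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class StringSerializerDuplicationTest extends SerializerTestBase<String> {

	@Override
	protected TypeSerializer<String> createSerializer() {
		return StringSerializer.INSTANCE;
	}

	@Override
	protected int getLength() {
		return -1; // variable-length type, per the javadoc above
	}

	@Override
	protected Class<String> getTypeClass() {
		return String.class;
	}

	@Override
	protected String[] getTestData() {
		return new String[] {"", "a", "abc", "flink"};
	}
}

With those four hooks in place, every test in the base class, including the new testDuplicate, runs against the serializer automatically.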

[flink] branch release-1.5 updated (c400273 -> 4f1a376)

2018-11-12 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c400273  [FLINK-10821] E2E now uses externalized checkpoint
 new 7ba80b0  [FLINK-10839][serializer] Fix implementation of 
PojoSerializer.duplicate() w.r.t. subclass serializer
 new 4f1a376  [FLINK-10827][tests] Add test for duplicate() to 
SerializerTestBase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |  34 +--
 .../api/common/typeutils/SerializerTestBase.java   | 254 ++---
 2 files changed, 193 insertions(+), 95 deletions(-)