[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5737 ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176676947 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. 
+ */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static <T> OptionalFailure<T> of(T value) { + return new OptionalFailure<>(value, null); + } + + public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); --- End diff -- Oops, it was pulled in here by accident. ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515090 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. 
+ */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static <T> OptionalFailure<T> of(T value) { + return new OptionalFailure<>(value, null); + } + + public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { --- End diff -- Not sure whether we should catch the `RuntimeException` here. To me, a `supplier` should not throw `RuntimeExceptions`, and if it does, it should not produce an `OptionalFailure` but instead fail with a `RuntimeException`. ---
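One way to follow this suggestion, sketched under stated assumptions: `CheckedSupplier` and `Result` are hypothetical names used only for illustration (they are not the classes in this PR), and the supplier is assumed to declare its expected failures as checked exceptions. Only those checked exceptions become a failed result; a `RuntimeException` propagates unchanged. The factory also does no logging of its own, so it stays generic and the caller can log with context.

```java
import java.util.Objects;

/** Hypothetical supplier that may fail with a checked exception (not java.util.function.Supplier). */
@FunctionalInterface
interface CheckedSupplier<T> {
    T get() throws Exception;
}

/** Hypothetical, simplified stand-in for OptionalFailure: either a value or a failure cause. */
final class Result<T> {
    private final T value;
    private final Throwable failureCause;

    private Result(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    static <T> Result<T> of(T value) {
        return new Result<>(value, null);
    }

    static <T> Result<T> ofFailure(Throwable failureCause) {
        return new Result<>(null, Objects.requireNonNull(failureCause));
    }

    /**
     * Wraps checked exceptions thrown by the supplier. A RuntimeException is treated as a
     * bug and rethrown instead of being turned into a failed Result. No logging here:
     * only the caller knows the context (for example, which accumulator failed).
     */
    static <T> Result<T> createFrom(CheckedSupplier<T> supplier) {
        try {
            return of(supplier.get());
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            return ofFailure(e);
        }
    }

    boolean isFailure() {
        return failureCause != null;
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    T getValue() {
        return value;
    }
}
```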
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176517944 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java --- @@ -21,83 +21,98 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedValue; + import org.junit.Test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the SerializedJobExecutionResult */ public class SerializedJobExecutionResultTest { --- End diff -- `extends TestLogger` missing ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515453 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. 
+ */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static <T> OptionalFailure<T> of(T value) { + return new OptionalFailure<>(value, null); + } + + public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); + return OptionalFailure.ofFailure(ex); + } + } + + /** +* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. +*/ + public T get() throws FlinkRuntimeException { --- End diff -- I think `get` should throw a checked exception rather than an unchecked one. Otherwise, users won't be aware that it can fail. We could provide a method `getUnchecked` that throws an unchecked exception. ---
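A minimal sketch of the suggested split between a checked `get` and an unchecked `getUnchecked`. It assumes a hypothetical checked `FailureException` and uses `IllegalStateException` as the unchecked type (the PR's `FlinkRuntimeException` could take that role instead); `Outcome` is an illustrative stand-in, not Flink's `OptionalFailure`.

```java
import java.util.Objects;

/** Hypothetical checked exception carrying the stored failure cause. */
class FailureException extends Exception {
    private static final long serialVersionUID = 1L;

    FailureException(Throwable cause) {
        super(cause);
    }
}

/** Hypothetical value-or-failure holder illustrating checked vs. unchecked accessors. */
final class Outcome<T> {
    private final T value;
    private final Throwable failureCause;

    private Outcome(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    static <T> Outcome<T> of(T value) {
        return new Outcome<>(value, null);
    }

    static <T> Outcome<T> ofFailure(Throwable failureCause) {
        return new Outcome<>(null, Objects.requireNonNull(failureCause));
    }

    /** Checked: the compiler forces callers to handle or declare the failure. */
    T get() throws FailureException {
        // Test the failure cause rather than value != null, so a successful null value is allowed.
        if (failureCause != null) {
            throw new FailureException(failureCause);
        }
        return value;
    }

    /** Unchecked convenience for call sites that prefer to fail fast. */
    T getUnchecked() {
        try {
            return get();
        } catch (FailureException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Unlike the `get()` in this PR, which checks `value != null` first, this variant does not mistake a successful `null` value for a failure.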
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515734 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. 
+ */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static <T> OptionalFailure<T> of(T value) { + return new OptionalFailure<>(value, null); + } + + public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); + return OptionalFailure.ofFailure(ex); + } + } + + /** +* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. +*/ + public T get() throws FlinkRuntimeException { + if (value != null) { + return value; + } + checkNotNull(failureCause); + throw new FlinkRuntimeException(failureCause); + } + + public Throwable getFailureCause() { + return checkNotNull(failureCause); + } + + public boolean isFailure() { + return failureCause != null; + } + + @Override + public int hashCode() { + return Objects.hash(value, failureCause); + } + + @Override + public boolean equals(Object object) { + if (object == null) { + return false; + } + if (object == this) { + return true; + } + if (!(object instanceof OptionalFailure)) { + return false; + } + OptionalFailure other = (OptionalFailure) object; --- End diff -- Let's cast to `OptionalFailure<?>`. ---
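A sketch of `equals`/`hashCode` with the wildcard cast, on a hypothetical stand-in class `ValueOrFailure`. Because the type argument is erased at runtime, `ValueOrFailure<?>` is all the cast can actually check, and unlike a cast to `ValueOrFailure<T>` it compiles without an unchecked warning. The parameter is named `obj`, as in `Object#equals`.

```java
import java.util.Objects;

/** Hypothetical value-or-failure holder showing equals/hashCode with a wildcard cast. */
final class ValueOrFailure<T> {
    private final T value;
    private final Throwable failureCause;

    ValueOrFailure(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, failureCause);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof ValueOrFailure)) {
            return false;
        }
        // T is erased at runtime; the wildcard cast is honest and raises no unchecked warning.
        ValueOrFailure<?> other = (ValueOrFailure<?>) obj;
        return Objects.equals(value, other.value)
            && Objects.equals(failureCause, other.failureCause);
    }
}
```

Note that `Throwable` does not override `equals`, so two failed instances compare equal only when they hold the same cause instance.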
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176515638 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. 
+ */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static <T> OptionalFailure<T> of(T value) { + return new OptionalFailure<>(value, null); + } + + public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); + return OptionalFailure.ofFailure(ex); + } + } + + /** +* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. +*/ + public T get() throws FlinkRuntimeException { + if (value != null) { + return value; + } + checkNotNull(failureCause); + throw new FlinkRuntimeException(failureCause); + } + + public Throwable getFailureCause() { + return checkNotNull(failureCause); + } + + public boolean isFailure() { + return failureCause != null; + } + + @Override + public int hashCode() { + return Objects.hash(value, failureCause); + } + + @Override + public boolean equals(Object object) { --- End diff -- Why deviate from the superclass's parameter name `obj`? ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176518316 --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java --- @@ -45,84 +47,87 @@ * b) are not compatible with existing accumulator. */ public class AccumulatorErrorITCase extends TestLogger { - - private static LocalFlinkMiniCluster cluster; - - private static ExecutionEnvironment env; - - @BeforeClass - public static void startCluster() { + private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone"; + private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge"; + private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators"; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 2, + 3)); + + public static Configuration getConfiguration() { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); - cluster = new LocalFlinkMiniCluster(config, false); - - cluster.start(); - - env = new TestEnvironment(cluster, 6, false); - } - - @AfterClass - public static void shutdownCluster() { - cluster.stop(); - cluster = null; + return config; } @Test public void testFaultyAccumulator() throws Exception { - + TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment(); --- End diff -- a no-op sink? ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176516910 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. 
+ */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; + + @Nullable + private Throwable failureCause; + + private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) { + this.value = value; + this.failureCause = failureCause; + } + + public static <T> OptionalFailure<T> of(T value) { + return new OptionalFailure<>(value, null); + } + + public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) { + return new OptionalFailure<>(null, failureCause); + } + + /** +* @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if +* {@code valueSupplier} has thrown a {@link RuntimeException}. +*/ + public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) { + try { + return OptionalFailure.of(valueSupplier.get()); + } + catch (RuntimeException ex) { + LOG.error("Failed to archive accumulators", ex); --- End diff -- The message indicates that `OptionalFailure` was implemented with the accumulators in mind, but I think it should be more generic. I guess that `AccumulatorHelper#67` is also the reason why we catch the `RuntimeException`: to make the merge supplier as smooth as possible. ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176517725 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java --- @@ -50,13 +51,13 @@ @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) @JsonSerialize(contentUsing = SerializedValueSerializer.class) - private Map serializedUserAccumulators; + private Map > serializedUserAccumulators; @JsonCreator public JobAccumulatorsInfo( - @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List jobAccumulators, - @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List userAccumulators, - @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) { + @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List jobAccumulators, + @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List userAccumulators, + @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map > serializedUserAccumulators) { --- End diff -- The indentation is wrong. ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176517187 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java --- @@ -48,8 +49,9 @@ * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds * @param accumulators A map of all accumulator results produced by the job, in serialized form */ - public SerializedJobExecutionResult(JobID jobID, long netRuntime, - Map accumulators) { + public SerializedJobExecutionResult(JobID jobID, + long netRuntime, + Map > accumulators) { --- End diff -- Something is off with the indentation here. ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176514312 --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wrapper around an object representing either a success (with a given value) or a failure cause. + */ +public class OptionalFailure<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); + + @Nullable + private T value; --- End diff -- This type is not necessarily serializable. I think you should mark it `transient` and then implement `readObject` and `writeObject`, similar to how `ArrayList` does it. ---
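A minimal sketch of the `ArrayList`-style approach, using a hypothetical `SerializableOutcome<T>` in place of `OptionalFailure<T>`: the value field is `transient`, and private `writeObject`/`readObject` methods write and read it by hand after the default fields. This does not make an arbitrary `T` serializable (a non-serializable value still fails at runtime with `NotSerializableException`); the gain is mainly that the class no longer declares a non-transient field of an unbounded type. The `copyViaSerialization` helper is hypothetical and exists only to demonstrate a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical holder: T is not bounded by Serializable, so the value field is transient. */
final class SerializableOutcome<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // Excluded from default serialization; written by hand in writeObject below.
    private transient T value;

    // Throwable implements Serializable, so default serialization handles this field.
    private final Throwable failureCause;

    SerializableOutcome(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    T getValue() {
        return value;
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes the non-transient fields (failureCause)
        // Throws NotSerializableException if the runtime value is not serializable.
        out.writeObject(value);
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        value = (T) in.readObject();
    }

    /** Demo helper: round-trips an instance through Java serialization. */
    static <T> SerializableOutcome<T> copyViaSerialization(SerializableOutcome<T> original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                SerializableOutcome<T> copy = (SerializableOutcome<T>) in.readObject();
                return copy;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```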
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176454726 --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java --- @@ -45,84 +47,87 @@ * b) are not compatible with existing accumulator. */ public class AccumulatorErrorITCase extends TestLogger { - - private static LocalFlinkMiniCluster cluster; - - private static ExecutionEnvironment env; - - @BeforeClass - public static void startCluster() { + private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone"; + private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge"; + private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators"; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 2, + 3)); + + public static Configuration getConfiguration() { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); - cluster = new LocalFlinkMiniCluster(config, false); - - cluster.start(); - - env = new TestEnvironment(cluster, 6, false); - } - - @AfterClass - public static void shutdownCluster() { - cluster.stop(); - cluster = null; + return config; } @Test public void testFaultyAccumulator() throws Exception { - + TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment(); --- End diff -- Is there an equivalent to `.output(new DiscardingOutputFormat<>());` in the `StreamExecutionEnvironment`? ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176207009 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java --- @@ -67,22 +72,29 @@ public String getValue() { int i = 0; for (Map.Entry entry : accs.entrySet()) { - StringifiedAccumulatorResult result; - Accumulator accumulator = entry.getValue(); - if (accumulator != null) { - Object localValue = accumulator.getLocalValue(); - if (localValue != null) { - result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString()); - } else { - result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null"); - } - } else { - result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null"); - } - - results[i++] = result; + results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue()); } return results; } } + + private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator accumulator) { --- End diff -- `@Nullable` is missing for `accumulator`. ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176208188 --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java --- @@ -45,84 +47,87 @@ * b) are not compatible with existing accumulator. */ public class AccumulatorErrorITCase extends TestLogger { - - private static LocalFlinkMiniCluster cluster; - - private static ExecutionEnvironment env; - - @BeforeClass - public static void startCluster() { + private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone"; + private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge"; + private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators"; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 2, + 3)); + + public static Configuration getConfiguration() { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); - cluster = new LocalFlinkMiniCluster(config, false); - - cluster.start(); - - env = new TestEnvironment(cluster, 6, false); - } - - @AfterClass - public static void shutdownCluster() { - cluster.stop(); - cluster = null; + return config; } @Test public void testFaultyAccumulator() throws Exception { - + TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment(); --- End diff -- You could also write `StreamExecutionEnvironment.getExecutionEnvironment()`. ---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5737#discussion_r176207225 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java --- @@ -67,22 +72,29 @@ public String getValue() { int i = 0; for (Map.Entry entry : accs.entrySet()) { - StringifiedAccumulatorResult result; - Accumulator accumulator = entry.getValue(); - if (accumulator != null) { - Object localValue = accumulator.getLocalValue(); - if (localValue != null) { - result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString()); - } else { - result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null"); - } - } else { - result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null"); - } - - results[i++] = result; + results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue()); } return results; } } + + private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator accumulator) { + if (accumulator == null) { + return new StringifiedAccumulatorResult(name, "null", "null"); + } else { + Object localValue; + try { + localValue = accumulator.getLocalValue(); + } + catch (RuntimeException exception) { + LOG.error("Failed to stringify accumulator", exception); --- End diff -- Maybe add `name` to the log statement. ---
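A sketch combining both review suggestions for `stringifyAccumulatorResult`: the accumulator parameter is documented as nullable, and the log statement includes the accumulator name. `SimpleAccumulator`, `StringifiedResult`, and `AccumulatorStringifier` are hypothetical simplifications of Flink's `Accumulator` and `StringifiedAccumulatorResult`; `java.util.logging` replaces SLF4J to keep the sketch dependency-free, and `@Nullable` appears only in a comment because `javax.annotation.Nullable` is not part of the JDK. What the PR reports as the value on failure is not visible in the diff, so this sketch assumes the exception's `toString()`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, minimal accumulator interface (Flink's real Accumulator has more methods). */
@FunctionalInterface
interface SimpleAccumulator {
    Object getLocalValue();
}

/** Name, type, and value of an accumulator rendered as strings. */
final class StringifiedResult {
    final String name;
    final String type;
    final String value;

    StringifiedResult(String name, String type, String value) {
        this.name = name;
        this.type = type;
        this.value = value;
    }
}

final class AccumulatorStringifier {
    private static final Logger LOG = Logger.getLogger(AccumulatorStringifier.class.getName());

    /** @param accumulator may be null (the reviewer asks for {@code @Nullable} here). */
    static StringifiedResult stringify(String name, SimpleAccumulator accumulator) {
        if (accumulator == null) {
            return new StringifiedResult(name, "null", "null");
        }
        String type = accumulator.getClass().getSimpleName();
        Object localValue;
        try {
            localValue = accumulator.getLocalValue();
        } catch (RuntimeException e) {
            // Include the accumulator name so the log line identifies the failing accumulator.
            LOG.log(Level.SEVERE, "Failed to stringify accumulator " + name, e);
            return new StringifiedResult(name, type, e.toString());
        }
        // String.valueOf renders a null local value as "null".
        return new StringifiedResult(name, type, String.valueOf(localValue));
    }
}
```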
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176212439

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -763,48 +764,32 @@ public Executor getFutureExecutor() {
 		return userAccumulators;
 	}
 
-	/**
-	 * Gets the accumulator results.
-	 */
-	public Map getAccumulators() {
-
-		Map accumulatorMap = aggregateUserAccumulators();
-
-		Map result = new HashMap<>();
-		for (Map.Entry entry : accumulatorMap.entrySet()) {
-			result.put(entry.getKey(), entry.getValue().getLocalValue());
-		}
-
-		return result;
-	}
-
 	/**
 	 * Gets a serialized accumulator map.
 	 * @return The accumulator map with serialized accumulator values.
 	 */
 	@Override
 	public Map getAccumulatorsSerialized() {
+		return aggregateUserAccumulators()
+			.entrySet()
+			.stream()
+			.collect(Collectors.toMap(Map.Entry::getKey, entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
+	}
 
-		Map accumulatorMap = aggregateUserAccumulators();
-
-		Map result = new HashMap<>(accumulatorMap.size());
-		for (Map.Entry entry : accumulatorMap.entrySet()) {
-
+	private static SerializedValue serializeAccumulator(String name, Accumulator accumulator) {
+		try {
+			if (accumulator instanceof FailedAccumulator) {
+				return new SerializedValue<>(accumulator);
+			}
+			return new SerializedValue<>(accumulator.getLocalValue());
+		} catch (IOException ioe) {
+			LOG.error("Could not serialize accumulator " + name + '.', ioe);
 			try {
-				final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
-				result.put(entry.getKey(), serializedValue);
-			} catch (IOException ioe) {
-				LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);
-
-				try {
-					result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
-				} catch (IOException e) {
-					throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
-				}
+				return new SerializedValue<>(new FailedAccumulator(ioe));
--- End diff --

Hmm, the problem I see here is that in the success case we store the accumulator value, whereas in the failure case we store an `Accumulator` instance. The user will expect the accumulator value and cast it accordingly, so they will never call the `Accumulator` methods that would throw the exception (see `JobExecutionResult` for how the user interacts with the accumulator values). In that sense, the previous solution of storing a `FailedAccumulatorSerialization` was also flawed. What we would actually have to store in the `SerializedValue` is something like an `Either` (failure on the left, value on the right). On the client side, when accessing the `accumulatorsValueMap`, we should check whether it is left or right and, in the left case, throw the exception.

Alternatively, we could say that an accumulator failure always results in a job failure. This would mean that in `JobMaster#jobStatusChanged` we generate a failed `ArchivedExecutionGraph` whenever an accumulator fails.

---
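The `Either`-based alternative suggested in the comment can be sketched in plain Java. This is a minimal sketch under stated assumptions: `AccumulatorOutcome` and `AccumulatorOutcomeDemo` are hypothetical names, not Flink API, and plain Java serialization to a `byte[]` stands in for `SerializedValue`. A success carries the value (right), a failure carries the cause (left), and the client-side accessor rethrows the left case instead of handing the user an object they would blindly cast.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical Either-like holder (not Flink API): exactly one of value or failure is set. */
final class AccumulatorOutcome<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final T value;           // "right": the accumulator's local value
    private final Throwable failure; // "left": why producing the value failed

    private AccumulatorOutcome(T value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    static <T> AccumulatorOutcome<T> success(T value) {
        return new AccumulatorOutcome<>(value, null);
    }

    static <T> AccumulatorOutcome<T> failure(Throwable failure) {
        return new AccumulatorOutcome<>(null, failure);
    }

    boolean isFailure() {
        return failure != null;
    }

    /** Client-side access: the left case is rethrown rather than returned as a "value". */
    T getOrThrow() {
        if (failure != null) {
            throw new IllegalStateException("Accumulator failed", failure);
        }
        return value;
    }
}

final class AccumulatorOutcomeDemo {
    /** Assumed stand-in for SerializedValue: plain Java serialization. */
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, byte[]> archived = new HashMap<>();
        archived.put("records", serialize(AccumulatorOutcome.success(42L)));
        archived.put("broken", serialize(AccumulatorOutcome.failure(new IOException("not serializable"))));

        @SuppressWarnings("unchecked")
        AccumulatorOutcome<Long> ok = (AccumulatorOutcome<Long>) deserialize(archived.get("records"));
        System.out.println(ok.getOrThrow()); // 42

        AccumulatorOutcome<?> bad = (AccumulatorOutcome<?>) deserialize(archived.get("broken"));
        try {
            bad.getOrThrow();
        } catch (IllegalStateException e) {
            System.out.println("left: " + e.getCause().getMessage()); // left: not serializable
        }
    }
}
```

With this shape, the user-facing accessor never returns a failure object disguised as a value, which is the flaw pointed out for both `FailedAccumulator` and `FailedAccumulatorSerialization`.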
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5737

[FLINK-8721][flip6] Handle archiving failures for accumulators

During archiving, wrap errors thrown by users' Accumulators into a `FailedAccumulator` and do not fail the job because of them.

## Verifying this change

This change is covered by the existing `AccumulatorErrorITCase`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f8721

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5737.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5737

commit c61014ae458767926c4f5b13a9df7bc70135d13c
Author: Piotr Nowojski
Date: 2018-03-21T09:54:49Z

    [hotfix][runtime] Remove unused method

commit 655644322f7c61ddd89bcc3e5aab636047b97d55
Author: Piotr Nowojski
Date: 2018-03-21T12:08:36Z

    [FLINK-8721][flip6] Handle archiving failures for accumulators

---
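The approach described in the PR (wrap the user's error per accumulator and keep archiving instead of failing the job) can be sketched roughly as follows. This is an illustration under assumptions: `UserAccumulator`, `FailedUserAccumulator` and `AccumulatorArchiver` are hypothetical stand-ins, not Flink's `Accumulator`/`FailedAccumulator` classes. Only `RuntimeException`s are caught, so `Error`s still propagate, and a failed entry rethrows its cause when its value is read.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical minimal accumulator contract; not Flink's Accumulator interface. */
interface UserAccumulator<V> {
    V getLocalValue();
}

/** Hypothetical stand-in for a FailedAccumulator: remembers why archiving this entry failed. */
final class FailedUserAccumulator<V> implements UserAccumulator<V> {
    private final Throwable cause;

    FailedUserAccumulator(Throwable cause) {
        this.cause = cause;
    }

    Throwable getCause() {
        return cause;
    }

    @Override
    public V getLocalValue() {
        // Reading a failed accumulator surfaces the original error instead of a bogus value.
        throw new IllegalStateException("Accumulator failed during archiving", cause);
    }
}

final class AccumulatorArchiver {
    /**
     * Snapshots every accumulator's value. A RuntimeException thrown by user code is wrapped
     * into a FailedUserAccumulator for that entry only; other entries (and the job) are unaffected.
     */
    static Map<String, UserAccumulator<?>> archive(Map<String, UserAccumulator<?>> live) {
        Map<String, UserAccumulator<?>> archived = new LinkedHashMap<>();
        for (Map.Entry<String, UserAccumulator<?>> entry : live.entrySet()) {
            try {
                Object value = entry.getValue().getLocalValue(); // user code may throw here
                UserAccumulator<Object> snapshot = () -> value;  // frozen copy of the value
                archived.put(entry.getKey(), snapshot);
            } catch (RuntimeException e) {
                archived.put(entry.getKey(), new FailedUserAccumulator<Object>(e));
            }
        }
        return archived;
    }

    public static void main(String[] args) {
        Map<String, UserAccumulator<?>> live = new LinkedHashMap<>();
        live.put("count", () -> 3);
        live.put("buggy", () -> { throw new ArithmeticException("/ by zero"); });

        Map<String, UserAccumulator<?>> archived = archive(live);
        System.out.println(archived.get("count").getLocalValue());                // 3
        System.out.println(archived.get("buggy") instanceof FailedUserAccumulator); // true
    }
}
```

Note that this sketch inherits the caveat raised in the review: a caller that never reads the failed entry through `getLocalValue()` will not notice the failure.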