[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5737


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176676947
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+    * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+    * {@code valueSupplier} has thrown a {@link RuntimeException}.
+    */
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
--- End diff --

Oops, it was pulled in here by accident.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515090
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+    * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+    * {@code valueSupplier} has thrown a {@link RuntimeException}.
+    */
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
--- End diff --

Not sure whether we should catch the `RuntimeException` here. To me, a 
`Supplier` should not throw a `RuntimeException`; if it does, the call should 
not produce an `OptionalFailure` but instead fail with that `RuntimeException`.
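
To make the alternative concrete, here is a minimal sketch, not the PR's code. It assumes a hypothetical `Attempt` class and uses `java.util.concurrent.Callable` instead of `Supplier`, so that checked failures are captured as values while a `RuntimeException` propagates to the caller.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; Attempt is not a Flink class.
final class Attempt<T> {
    private final T value;           // null when the computation failed
    private final Exception failure; // null when the computation succeeded

    private Attempt(T value, Exception failure) {
        this.value = value;
        this.failure = failure;
    }

    boolean isFailure() {
        return failure != null;
    }

    T getValueOrNull() {
        return value;
    }

    Exception getFailureOrNull() {
        return failure;
    }

    static <T> Attempt<T> createFrom(Callable<T> callable) {
        try {
            return new Attempt<>(callable.call(), null);
        } catch (RuntimeException ex) {
            // Treated as a programming error: rethrown, not wrapped.
            throw ex;
        } catch (Exception ex) {
            // Checked failures become part of the returned value.
            return new Attempt<>(null, ex);
        }
    }
}
```

Whether unchecked exceptions should be wrapped or rethrown is exactly the open design question here; the sketch only shows what the "rethrow" option could look like.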


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517944
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
 ---
@@ -21,83 +21,98 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the SerializedJobExecutionResult
  */
 public class SerializedJobExecutionResultTest {
--- End diff --

`extends TestLogger` missing


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515453
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+    * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+    * {@code valueSupplier} has thrown a {@link RuntimeException}.
+    */
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
--- End diff --

I think `get` should throw a checked exception and not an unchecked 
exception. Otherwise users won't be aware of it. We could provide a method 
`getUnchecked` where we throw an unchecked exception.
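
A minimal sketch of this suggestion, under stated assumptions: `ResultOrFailure` is a hypothetical stand-in for the class under review, and the plain `Exception` and `IllegalStateException` types are placeholders for whatever checked and unchecked types the project would choose.

```java
import java.util.Objects;

// Hypothetical stand-in for the reviewed class; names are illustrative only.
final class ResultOrFailure<T> {
    private final T value;                // null when this instance holds a failure
    private final Throwable failureCause; // null when this instance holds a value

    private ResultOrFailure(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    static <T> ResultOrFailure<T> of(T value) {
        return new ResultOrFailure<>(Objects.requireNonNull(value), null);
    }

    static <T> ResultOrFailure<T> ofFailure(Throwable failureCause) {
        return new ResultOrFailure<>(null, Objects.requireNonNull(failureCause));
    }

    /** Checked variant: the compiler forces callers to handle the failure. */
    T get() throws Exception {
        if (failureCause != null) {
            throw new Exception("Wrapped computation failed", failureCause);
        }
        return value;
    }

    /** Unchecked variant for callers that knowingly opt out of handling. */
    T getUnchecked() {
        if (failureCause != null) {
            throw new IllegalStateException("Wrapped computation failed", failureCause);
        }
        return value;
    }
}
```

With this split, a call to `get()` does not compile unless the caller catches or declares the exception, which is the visibility the comment asks for.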


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515734
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+    * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+    * {@code valueSupplier} has thrown a {@link RuntimeException}.
+    */
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
+   if (value != null) {
+   return value;
+   }
+   checkNotNull(failureCause);
+   throw new FlinkRuntimeException(failureCause);
+   }
+
+   public Throwable getFailureCause() {
+   return checkNotNull(failureCause);
+   }
+
+   public boolean isFailure() {
+   return failureCause != null;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(value, failureCause);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object == null) {
+   return false;
+   }
+   if (object == this) {
+   return true;
+   }
+   if (!(object instanceof OptionalFailure)) {
+   return false;
+   }
+   OptionalFailure other = (OptionalFailure) object;
--- End diff --

Let's cast to `OptionalFailure<?>`.
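
For illustration, and assuming the suggestion refers to a wildcard cast instead of a raw type, an `equals` along these lines could look as follows. `Outcome` is a hypothetical class, not the one from the PR; the parameter is named `obj` to match `Object#equals`.

```java
import java.util.Objects;

// Hypothetical illustration only; Outcome is not the class from the PR.
final class Outcome<T> {
    private final T value;
    private final Throwable failureCause;

    Outcome(T value, Throwable failureCause) {
        this.value = value;
        this.failureCause = failureCause;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, failureCause);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof Outcome)) {
            return false;
        }
        // Wildcard cast: no unchecked warning and no raw type.
        Outcome<?> other = (Outcome<?>) obj;
        return Objects.equals(value, other.value)
            && Objects.equals(failureCause, other.failureCause);
    }
}
```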


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176515638
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+    * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+    * {@code valueSupplier} has thrown a {@link RuntimeException}.
+    */
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
+   return OptionalFailure.ofFailure(ex);
+   }
+   }
+
+   /**
+* @return stored value or throw a {@link FlinkRuntimeException} with 
{@code failureCause}.
+*/
+   public T get() throws FlinkRuntimeException {
+   if (value != null) {
+   return value;
+   }
+   checkNotNull(failureCause);
+   throw new FlinkRuntimeException(failureCause);
+   }
+
+   public Throwable getFailureCause() {
+   return checkNotNull(failureCause);
+   }
+
+   public boolean isFailure() {
+   return failureCause != null;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(value, failureCause);
+   }
+
+   @Override
+   public boolean equals(Object object) {
--- End diff --

Why deviate from the superclass's parameter name `obj`?


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176518316
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
 ---
@@ -45,84 +47,87 @@
  *  b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
-
-   private static LocalFlinkMiniCluster cluster;
-
-   private static ExecutionEnvironment env;
-
-   @BeforeClass
-   public static void startCluster() {
+   private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+   private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+   private static final String INCOMPATIBLE_ACCUMULATORS_NAME = 
"incompatible-accumulators";
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   getConfiguration(),
+   2,
+   3));
+
+   public static Configuration getConfiguration() {
Configuration config = new Configuration();
-   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-   cluster = new LocalFlinkMiniCluster(config, false);
-
-   cluster.start();
-
-   env = new TestEnvironment(cluster, 6, false);
-   }
-
-   @AfterClass
-   public static void shutdownCluster() {
-   cluster.stop();
-   cluster = null;
+   return config;
}
 
@Test
public void testFaultyAccumulator() throws Exception {
-
+   TestEnvironment env = 
MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --

a no-op sink?


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176516910
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
+
+   @Nullable
+   private Throwable failureCause;
+
+   private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+   this.value = value;
+   this.failureCause = failureCause;
+   }
+
+   public static <T> OptionalFailure<T> of(T value) {
+   return new OptionalFailure<>(value, null);
+   }
+
+   public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+   return new OptionalFailure<>(null, failureCause);
+   }
+
+   /**
+    * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+    * {@code valueSupplier} has thrown a {@link RuntimeException}.
+    */
+   public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+   try {
+   return OptionalFailure.of(valueSupplier.get());
+   }
+   catch (RuntimeException ex) {
+   LOG.error("Failed to archive accumulators", ex);
--- End diff --

The message indicates that `OptionalFailure` was implemented with the 
accumulators in mind, but I think it should be more generic. I guess that 
`AccumulatorHelper#67` is also the reason why we catch the `RuntimeException`: 
to make the merge supplier as smooth as possible.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517725
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -50,13 +51,13 @@
 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
@JsonSerialize(contentUsing = SerializedValueSerializer.class)
-   private Map serializedUserAccumulators;
+   private Map> 
serializedUserAccumulators;
 
@JsonCreator
public JobAccumulatorsInfo(
-   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List jobAccumulators,
-   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
-   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) {
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List 
jobAccumulators,
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
+   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map> serializedUserAccumulators) {
--- End diff --

indentation is wrong


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176517187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
 ---
@@ -48,8 +49,9 @@
 * @param netRuntime The net runtime of the job (excluding pre-flight 
phase like the optimizer) in milliseconds
 * @param accumulators A map of all accumulator results produced by the 
job, in serialized form
 */
-   public SerializedJobExecutionResult(JobID jobID, long netRuntime,
-   
Map accumulators) {
+   public SerializedJobExecutionResult(JobID jobID,
+   
long netRuntime,
+   
Map> accumulators) {
--- End diff --

Something is off with the indentation here.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176514312
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+   @Nullable
+   private T value;
--- End diff --

This type is not serializable. I think you should mark it `transient` and 
then override `readObject` and `writeObject` similar to how `ArrayList` does it.
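
A minimal sketch of the `transient` plus custom `writeObject`/`readObject` pattern, loosely following what `java.util.ArrayList` does for its backing array. `SerializableHolder` and its `roundTrip` helper are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration only; not the class from the PR.
final class SerializableHolder<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // T is not bound to Serializable, so the field is excluded from
    // default serialization and written by hand below.
    private transient T value;

    SerializableHolder(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Throws NotSerializableException at runtime if value is not serializable.
        out.writeObject(value);
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        value = (T) in.readObject();
    }

    /** Demo helper: serializes and deserializes an object in memory. */
    @SuppressWarnings("unchecked")
    static <S> S roundTrip(S original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (S) in.readObject();
        }
    }
}
```

The failure for a non-serializable payload still happens at runtime, but it is now raised explicitly inside `writeObject` rather than by the default field serialization.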


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-22 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176454726
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
 ---
@@ -45,84 +47,87 @@
  *  b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
-
-   private static LocalFlinkMiniCluster cluster;
-
-   private static ExecutionEnvironment env;
-
-   @BeforeClass
-   public static void startCluster() {
+   private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+   private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+   private static final String INCOMPATIBLE_ACCUMULATORS_NAME = 
"incompatible-accumulators";
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   getConfiguration(),
+   2,
+   3));
+
+   public static Configuration getConfiguration() {
Configuration config = new Configuration();
-   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-   cluster = new LocalFlinkMiniCluster(config, false);
-
-   cluster.start();
-
-   env = new TestEnvironment(cluster, 6, false);
-   }
-
-   @AfterClass
-   public static void shutdownCluster() {
-   cluster.stop();
-   cluster = null;
+   return config;
}
 
@Test
public void testFaultyAccumulator() throws Exception {
-
+   TestEnvironment env = 
MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --

Is there an equivalent to `.output(new DiscardingOutputFormat<>());` in the 
 `StreamExecutionEnvironment`?


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176207009
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
 ---
@@ -67,22 +72,29 @@ public String getValue() {
 
int i = 0;
for (Map.Entry entry : 
accs.entrySet()) {
-   StringifiedAccumulatorResult result;
-   Accumulator accumulator = 
entry.getValue();
-   if (accumulator != null) {
-   Object localValue = 
accumulator.getLocalValue();
-   if (localValue != null) {
-   result = new 
StringifiedAccumulatorResult(entry.getKey(), 
accumulator.getClass().getSimpleName(), localValue.toString());
-   } else {
-   result = new 
StringifiedAccumulatorResult(entry.getKey(), 
accumulator.getClass().getSimpleName(), "null");
-   }
-   } else {
-   result = new 
StringifiedAccumulatorResult(entry.getKey(), "null", "null");
-   }
-
-   results[i++] = result;
+   results[i++] = 
stringifyAccumulatorResult(entry.getKey(), entry.getValue());
}
return results;
}
}
+
+   private static StringifiedAccumulatorResult 
stringifyAccumulatorResult(String name, Accumulator accumulator) {
--- End diff --

`@Nullable` missing for `accumulator`


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176208188
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
 ---
@@ -45,84 +47,87 @@
  *  b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
-
-   private static LocalFlinkMiniCluster cluster;
-
-   private static ExecutionEnvironment env;
-
-   @BeforeClass
-   public static void startCluster() {
+   private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+   private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+   private static final String INCOMPATIBLE_ACCUMULATORS_NAME = 
"incompatible-accumulators";
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   getConfiguration(),
+   2,
+   3));
+
+   public static Configuration getConfiguration() {
Configuration config = new Configuration();
-   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-   cluster = new LocalFlinkMiniCluster(config, false);
-
-   cluster.start();
-
-   env = new TestEnvironment(cluster, 6, false);
-   }
-
-   @AfterClass
-   public static void shutdownCluster() {
-   cluster.stop();
-   cluster = null;
+   return config;
}
 
@Test
public void testFaultyAccumulator() throws Exception {
-
+   TestEnvironment env = 
MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --

You could also write `StreamExecutionEnvironment.getExecutionEnvironment()`.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176207225
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
 ---
@@ -67,22 +72,29 @@ public String getValue() {
 
int i = 0;
for (Map.Entry entry : 
accs.entrySet()) {
-   StringifiedAccumulatorResult result;
-   Accumulator accumulator = 
entry.getValue();
-   if (accumulator != null) {
-   Object localValue = 
accumulator.getLocalValue();
-   if (localValue != null) {
-   result = new 
StringifiedAccumulatorResult(entry.getKey(), 
accumulator.getClass().getSimpleName(), localValue.toString());
-   } else {
-   result = new 
StringifiedAccumulatorResult(entry.getKey(), 
accumulator.getClass().getSimpleName(), "null");
-   }
-   } else {
-   result = new 
StringifiedAccumulatorResult(entry.getKey(), "null", "null");
-   }
-
-   results[i++] = result;
+   results[i++] = 
stringifyAccumulatorResult(entry.getKey(), entry.getValue());
}
return results;
}
}
+
+   private static StringifiedAccumulatorResult 
stringifyAccumulatorResult(String name, Accumulator accumulator) {
+   if (accumulator == null) {
+   return new StringifiedAccumulatorResult(name, "null", 
"null");
+   } else {
+   Object localValue;
+   try {
+   localValue = accumulator.getLocalValue();
+   }
+   catch (RuntimeException exception) {
+   LOG.error("Failed to stringify accumulator", 
exception);
--- End diff --

Maybe add `name` to the log statement.
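
A minimal sketch of what that could look like. It uses `java.util.logging` as a stand-in for the SLF4J logger in the diff so that it runs on its own; the class `AccumulatorLogSketch` and both helper methods are hypothetical and not part of this PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper, only to illustrate the suggested log message. */
final class AccumulatorLogSketch {
    // Stand-in for the SLF4J LOG field used in StringifiedAccumulatorResult.
    private static final Logger LOG =
            Logger.getLogger(AccumulatorLogSketch.class.getName());

    /** Builds the message; the accumulator name identifies which accumulator failed. */
    static String failureMessage(String name) {
        return "Failed to stringify accumulator [" + name + "].";
    }

    /** Logs the failure together with the accumulator name and the exception. */
    static void logFailure(String name, RuntimeException exception) {
        LOG.log(Level.SEVERE, failureMessage(name), exception);
    }
}
```

With SLF4J the same idea would presumably be a one-line change to the existing `LOG.error(...)` call.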


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176212439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -763,48 +764,32 @@ public Executor getFutureExecutor() {
return userAccumulators;
}
 
-   /**
-* Gets the accumulator results.
-*/
-   public Map getAccumulators() {
-
-   Map accumulatorMap = 
aggregateUserAccumulators();
-
-   Map result = new HashMap<>();
-   for (Map.Entry entry : 
accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), 
entry.getValue().getLocalValue());
-   }
-
-   return result;
-   }
-
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
 */
@Override
public Map getAccumulatorsSerialized() 
{
+   return aggregateUserAccumulators()
+   .entrySet()
+   .stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
serializeAccumulator(entry.getKey(), entry.getValue())));
+   }
 
-   Map accumulatorMap = 
aggregateUserAccumulators();
-
-   Map result = new 
HashMap<>(accumulatorMap.size());
-   for (Map.Entry entry : 
accumulatorMap.entrySet()) {
-
+   private static SerializedValue serializeAccumulator(String 
name, Accumulator accumulator) {
+   try {
+   if (accumulator instanceof FailedAccumulator) {
+   return new SerializedValue<>(accumulator);
+   }
+   return new 
SerializedValue<>(accumulator.getLocalValue());
+   } catch (IOException ioe) {
+   LOG.error("Could not serialize accumulator " + name + 
'.', ioe);
try {
-   final SerializedValue serializedValue = 
new SerializedValue<>(entry.getValue().getLocalValue());
-   result.put(entry.getKey(), serializedValue);
-   } catch (IOException ioe) {
-   LOG.error("Could not serialize accumulator " + 
entry.getKey() + '.', ioe);
-
-   try {
-   result.put(entry.getKey(), new 
SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
-   } catch (IOException e) {
-   throw new RuntimeException("It should 
never happen that we cannot serialize the accumulator serialization 
exception.", e);
-   }
+   return new SerializedValue<>(new 
FailedAccumulator(ioe));
--- End diff --

Hmm, the problem I see here is that in the success case we store the 
accumulator value, whereas in the failure case we store an `Accumulator` 
instance. The user will expect the accumulator value and cast it accordingly, 
so they will never call the `Accumulator` methods that throw the exceptions 
(see `JobExecutionResult` for how the user interacts with the accumulator 
values). In that sense the previous solution of storing a 
`FailedAccumulatorSerialization` was also flawed.

What we would actually have to store in the `SerializedValue` is something 
like an `Either`. On the client side, when accessing the 
`accumulatorsValueMap`, we should check whether the entry is left or right 
and, in the left case, throw the exception.
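
A rough sketch of that idea, under stated assumptions: `AccumulatorOutcome` and `AccumulatorClientSketch` are hypothetical names and not existing Flink classes, and the choice of `IllegalStateException` for the client-side rethrow is arbitrary.

```java
import java.io.Serializable;
import java.util.Map;

/** Hypothetical Either-like holder: a failure cause ("left") or a value ("right"). */
final class AccumulatorOutcome<V> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Throwable failure; // non-null only in the failure case
    private final V value;           // the accumulator value in the success case

    private AccumulatorOutcome(Throwable failure, V value) {
        this.failure = failure;
        this.value = value;
    }

    static <V> AccumulatorOutcome<V> success(V value) {
        return new AccumulatorOutcome<>(null, value);
    }

    static <V> AccumulatorOutcome<V> failure(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("cause");
        }
        return new AccumulatorOutcome<>(cause, null);
    }

    boolean isFailure() {
        return failure != null;
    }

    /** Returns the value, or rethrows the stored failure wrapped in an unchecked exception. */
    V getOrThrow() {
        if (failure != null) {
            throw new IllegalStateException("Accumulator failed", failure);
        }
        return value;
    }
}

/** Hypothetical client-side lookup that surfaces failures instead of hiding them. */
final class AccumulatorClientSketch {
    @SuppressWarnings("unchecked")
    static <V> V getAccumulatorResult(Map<String, AccumulatorOutcome<?>> results, String name) {
        AccumulatorOutcome<?> outcome = results.get(name);
        if (outcome == null) {
            return null; // no accumulator registered under this name
        }
        return (V) outcome.getOrThrow();
    }
}
```

The point of the wrapper is that the user can no longer cast the stored object straight to the expected value type; every access goes through `getOrThrow()`, so a failed accumulator cannot be missed.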

Alternatively, we say that an accumulator failure always results in a job 
failure. This means that in `JobMaster#jobStatusChanged` we generate a failed 
`ArchivedExecutionGraph` in case of an accumulator failure.


---


[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

2018-03-21 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5737

[FLINK-8721][flip6] Handle archiving failures for accumulators

During archiving, wrap errors thrown by users' accumulators in a 
`FailedAccumulator` instead of failing the job.
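
As an illustration only, the wrapping could be sketched as below. `SimpleAccumulator`, `FailedAccumulatorSketch` and `ArchivingSketch` are hypothetical, cut-down stand-ins and do not mirror the actual classes in this PR.

```java
import java.util.function.Supplier;

/** Hypothetical, cut-down accumulator interface (not Flink's Accumulator). */
interface SimpleAccumulator<R> {
    R getLocalValue();
}

/** Placeholder stored in place of an accumulator whose user code threw. */
final class FailedAccumulatorSketch<R> implements SimpleAccumulator<R> {
    private final Throwable cause;

    FailedAccumulatorSketch(Throwable cause) {
        this.cause = cause;
    }

    @Override
    public R getLocalValue() {
        // The original error only surfaces when the value is actually requested.
        throw new IllegalStateException("Accumulator failed during archiving", cause);
    }
}

final class ArchivingSketch {
    /** Runs user accumulator code; a failure yields a placeholder instead of propagating. */
    static <R> SimpleAccumulator<R> archive(Supplier<SimpleAccumulator<R>> userCode) {
        try {
            return userCode.get();
        } catch (RuntimeException e) {
            return new FailedAccumulatorSketch<>(e);
        }
    }
}
```

In this sketch the job itself is unaffected by the failure; the exception is deferred until someone reads the accumulator through the placeholder.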

## Verifying this change

This change is covered by the existing `AccumulatorErrorITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8721

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5737


commit c61014ae458767926c4f5b13a9df7bc70135d13c
Author: Piotr Nowojski 
Date:   2018-03-21T09:54:49Z

[hotfix][runtime] Remove unused method

commit 655644322f7c61ddd89bcc3e5aab636047b97d55
Author: Piotr Nowojski 
Date:   2018-03-21T12:08:36Z

[FLINK-8721][flip6] Handle archiving failures for accumulators




---