[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover
[ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151162#comment-16151162 ]

Kenneth Knowles commented on BEAM-2831:
---------------------------------------

I think for the most part there are no implications. I have no immediate objection based on any principles. The choice between wrapping an IOException in a CoderException and not doing so is unclear in many cases. In particular, EOF and parse errors are not really distinct, since malformed data causes the EOF.

This may violate our rigid backwards-compatibility requirements, though it probably affects zero actual users, especially since they have to handle all IOExceptions anyhow and, because of the semantic ambiguity above, basically have to take the same action either way. We have 100% rigidity on type errors and on leaving working code working, but for bugs and places where the behavior is not well defined (like which kind of exception a particular coder throws in particular situations) maybe there's flexibility...

> Possible bug in Beam+Flink memory management, disk spillover
> ------------------------------------------------------------
>
>                 Key: BEAM-2831
>                 URL: https://issues.apache.org/jira/browse/BEAM-2831
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 10.12.6 and unknown Linux
>            Reporter: Reinier Kip
>            Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail.
> As [discussed on Flink's mailinglist|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-related-to-memory-segments-during-run-of-Beam-pipeline-on-Flink-td15255.html] (stacktrace enclosed), Flink catches these EOFExceptions and activates disk spillover. Because Beam wraps these exceptions, this mechanism fails, the exception travels up the stack, and the job aborts.
> Hopefully this is enough information and this is something that can be adjusted for in Beam. I'd be glad to provide more information where needed.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
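To illustrate why the wrapping matters: Flink treats a bare EOFException from record serialization as "in-memory buffers full, spill to disk", while other exceptions fail the job. The sketch below is a hypothetical simplification, not Flink's actual API; the {{RecordWriter}} interface and {{process}} method are illustrative stand-ins. (Note that Beam's {{CoderException}} extends {{IOException}}, so a wrapped EOF falls into the generic failure branch.)

```java
import java.io.EOFException;
import java.io.IOException;

public class SpilloverSketch {

  // Illustrative stand-in for a coder writing one record into Flink's
  // fixed-size in-memory buffers (not Flink's real interface).
  interface RecordWriter {
    void write(Object record) throws IOException;
  }

  // Sketch of Flink-style handling: a bare EOFException is the signal
  // that the in-memory buffers are full and spilling should start.
  static String process(Object record, RecordWriter writer) {
    try {
      writer.write(record);
      return "written in-memory";
    } catch (EOFException e) {
      return "spill to disk";   // recoverable: activate spillover
    } catch (IOException e) {
      return "job aborts";      // anything else propagates as a failure
    }
  }

  public static void main(String[] args) {
    // A coder that lets the EOFException through: spillover can kick in.
    RecordWriter passThrough = r -> { throw new EOFException("segments full"); };
    System.out.println(process("record", passThrough));  // spill to disk

    // A coder that wraps it (CoderException extends IOException): the
    // EOFException catch no longer matches, and the job fails instead.
    RecordWriter wrapping = r -> {
      throw new IOException("CoderException: unable to serialize record",
          new EOFException("segments full"));
    };
    System.out.println(process("record", wrapping));     // job aborts
  }
}
```

This is exactly the failure mode described above: the spillover mechanism keys on the concrete exception type, so wrapping it defeats the recovery path.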
[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover
[ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150656#comment-16150656 ]

Aljoscha Krettek commented on BEAM-2831:
----------------------------------------

[~kenn] Do you think we can change {{SerializableCoder}} to pass through the exception instead of wrapping it in a {{CoderException}}? Not sure what implications this could have for other runners.
[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover
[ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149496#comment-16149496 ]

Reinier Kip commented on BEAM-2831:
-----------------------------------

[~aljoscha] Yes! The pipeline consistently succeeds with this change. Thanks for your effort. I suspect we will integrate this replacement coder while we wait for the fix to come out. Do you foresee any consequences due to the considerations you wrote near the bottom?
[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover
[ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148985#comment-16148985 ]

Aljoscha Krettek commented on BEAM-2831:
----------------------------------------

Could you try running it with this modified {{SerializableCoder}}:

{code:java}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {

    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
              + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

  @Override
  public void encode(T value, OutputStream outStream) throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  @Override
  public T decode(InputStream inStream) throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   *         deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
        && type == ((SerializableCoder) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}

The only change is in {{encode()}}, where we don't wrap the {{EOFException}} anymore. I think this should fix the problem and, if it indeed does, we should include this change in Beam.
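A quick way to see the intent of that change: with no try/catch in {{encode()}}, an {{EOFException}} raised by the underlying stream reaches the caller unchanged, which is what Flink's spilling logic keys on. Below is a minimal self-contained sketch; the {{encode}} stand-in mirrors the method above, and {{LimitedStream}} is an illustrative class (not a Beam or Flink class) that mimics a full memory segment by running out of byte budget.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

public class EofPassThroughDemo {

  // Stand-in for the modified SerializableCoder.encode(): no try/catch,
  // so IOExceptions (including EOFException) propagate unchanged.
  static void encode(Serializable value, OutputStream outStream) throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  // An OutputStream with a fixed byte budget, loosely mimicking a full
  // Flink memory segment: once exhausted, it signals EOFException.
  static class LimitedStream extends OutputStream {
    private int budget;

    LimitedStream(int budget) {
      this.budget = budget;
    }

    @Override
    public void write(int b) throws IOException {
      if (budget-- <= 0) {
        throw new EOFException("memory segment full");
      }
    }
  }

  static String run() {
    try {
      encode("some record", new LimitedStream(4));
      return "written";
    } catch (EOFException e) {
      return "EOFException reached caller";  // Flink could now spill to disk
    } catch (IOException e) {
      return "other IOException";            // a wrapping coder would land here
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Running this prints "EOFException reached caller": the serialized record does not fit in the 4-byte budget, and the EOFException surfaces from {{encode()}} with its concrete type intact instead of being swallowed into a {{CoderException}}.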