[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-09-01 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151162#comment-16151162
 ] 

Kenneth Knowles commented on BEAM-2831:
---

I think for the most part there are no implications, and I have no immediate 
objection based on any principles. The choice between wrapping an IOException 
in a CoderException versus not doing so is unclear in many cases. In 
particular, EOF and parse errors are not really distinct, since malformed 
data causes the EOF.

This may violate our rigid backwards-compatibility requirements, though it 
probably affects zero actual users, especially since they have to handle all 
IOExceptions anyhow and, because of the semantic ambiguity above, basically 
have to take the same action either way.

We have 100% rigidity on type errors and on leaving working code working, but 
for bugs and places where the behavior is not well-defined (like what kind of 
exception a particular coder throws in a particular situation) maybe there's 
flexibility...
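
To make the ambiguity concrete, here is a minimal, self-contained sketch 
(illustrative only, not Beam code): truncating a Java-serialized record makes 
{{ObjectInputStream}} report the malformed data as an EOF, so "EOF" and 
"parse error" collapse into the same signal.

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

public class EofAmbiguityDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject("hello");
    }
    byte[] bytes = bos.toByteArray();
    // Chop off the tail: the record is now malformed.
    byte[] truncated = Arrays.copyOf(bytes, bytes.length - 4);
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(truncated))) {
      ois.readObject();
    } catch (IOException e) {
      // On a typical JRE this prints java.io.EOFException: the parse
      // failure surfaces as an EOF, not as a distinct error type.
      System.out.println(e.getClass().getName());
    }
  }
}
{code}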

> Possible bug in Beam+Flink memory management, disk spillover
> 
>
> Key: BEAM-2831
> URL: https://issues.apache.org/jira/browse/BEAM-2831
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
> Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 
> 10.12.6 and unknown Linux
>Reporter: Reinier Kip
>Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and 
> the heap memory configuration of the jobmanager and taskmanager, I may run 
> into an EOFException, which causes the job to fail.
> As [discussed on Flink's mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-related-to-memory-segments-during-run-of-Beam-pipeline-on-Flink-td15255.html]
>  (stack trace enclosed), Flink catches these EOFExceptions and activates disk 
> spillover. Because Beam wraps these exceptions, this mechanism fails, the 
> exception travels up the stack, and the job aborts.
> Hopefully this is enough information and this is something that can be 
> addressed in Beam. I'd be glad to provide more information where needed.
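
A minimal, self-contained sketch of the failure mode described above 
(hypothetical names, not actual Flink or Beam code): a Flink-style handler 
treats an {{EOFException}} from the serializer as the cue to spill to disk, 
so an exception that arrives wrapped sails past the handler and aborts the 
job.

{code}
import java.io.EOFException;
import java.io.IOException;

public class SpilloverSignalDemo {

  // Stand-in for a coder that wraps the low-level EOFException,
  // the way SerializableCoder wraps it in a CoderException.
  static void wrappingEncode() throws IOException {
    try {
      throw new EOFException("out of memory segments");
    } catch (IOException e) {
      throw new IOException("unable to serialize record", e);
    }
  }

  public static void main(String[] args) {
    try {
      wrappingEncode();
    } catch (EOFException e) {
      // Flink-style handler: spill to disk and retry the write.
      System.out.println("caught EOF, spilling to disk");
    } catch (IOException e) {
      // The wrapped exception misses the handler above; in a real job
      // it travels up the stack and the pipeline fails.
      System.out.println("EOF hidden inside: " + e);
    }
  }
}
{code}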





[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-09-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150656#comment-16150656
 ] 

Aljoscha Krettek commented on BEAM-2831:


[~kenn] Do you think we can change {{SerializableCoder}} to pass through the 
exception instead of wrapping it in a {{CoderException}}? Not sure what 
implications this could have for other runners.



[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-08-31 Thread Reinier Kip (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149496#comment-16149496
 ] 

Reinier Kip commented on BEAM-2831:
---

[~aljoscha] Yes! The pipeline consistently succeeds with this change. Thanks 
for your effort. I suspect we will integrate this replacement coder while we 
wait for the fix to come out. Do you foresee any consequences due to the 
considerations you wrote near the bottom?
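
For anyone integrating the workaround the same way, a hedged sketch of wiring 
a project-local copy of the modified coder into a pipeline 
({{PatchedSerializableCoder}} and {{MyRecord}} are hypothetical placeholder 
names):

{code}
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class PatchedCoderExample {

  // Placeholder element type for the sketch.
  static class MyRecord implements Serializable {}

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<MyRecord> records =
        p.apply(Create.of(new MyRecord()))
            // Override the SerializableCoder Beam would otherwise infer
            // with the project-local copy posted in this thread:
            .setCoder(PatchedSerializableCoder.of(MyRecord.class));
    p.run().waitUntilFinish();
  }
}
{code}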



[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-08-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148985#comment-16148985
 ] 

Aljoscha Krettek commented on BEAM-2831:


Could you try running it with this modified {{SerializableCoder}}:
{code}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {

    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
          + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  @Override
  public T decode(InputStream inStream)
      throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   *         deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
        && type == ((SerializableCoder<?>) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}

The only change is in {{encode()}}, where we no longer wrap the 
{{EOFException}}. I think this should fix the problem, and if it indeed does 
we should include this change in Beam.
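
For comparison, the released {{encode()}} wraps every {{IOException}} in a 
{{CoderException}} (reconstructed from memory here, so the exact wording may 
differ slightly):

{code}
  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException, CoderException {
    try {
      ObjectOutputStream oos = new ObjectOutputStream(outStream);
      oos.writeObject(value);
      oos.flush();
    } catch (IOException exn) {
      // This wrapping turns the EOFException Flink expects into a
      // CoderException, which defeats the spill-to-disk retry.
      throw new CoderException("unable to serialize record " + value, exn);
    }
  }
{code}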
