Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-29 Thread Ufuk Celebi
Hey Aaron,

I'm glad to hear that you resolved the issue.

I think a docs contribution for this would be very helpful and could
update this page:
https://github.com/apache/flink/blob/master/docs/monitoring/debugging_classloading.md.

If you want to create a separate JIRA ticket for this, ping me with
your JIRA username and I'll add you to the list of contributors (which
gives you permissions to create tickets).

I'll think a bit more about the other points you mentioned and get
back to you if I have another idea.

Best,

Ufuk

On Tue, Jan 29, 2019 at 10:48 PM Aaron Levin  wrote:
>
> Hi Ufuk,
>
> I'll answer your question, but first I'll give you an update on how we 
> resolved the issue:
>
> * adding `org.apache.hadoop.io.compress.SnappyCodec` to 
> `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, 
> putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
> * putting a jar with `hadoop-common` + it's transitive dependencies, then 
> using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its 
> transitive dependencies). So we end up with jar that has `SnappyCodec` and 
> whatever it needs to call transitively. We put this jar on the task manager 
> classpath.
>
> I believe `SnappyCodec` was being called via our code. This worked the first 
> time but deploying a second time caused `libhadoop.so` to be loaded in a 
> second class loader. By putting a jar with `SnappyCodec` and it's transitive 
> dependencies on the task manager classpath and specifying that `SnappyCodec` 
> needs to be loaded from the parent classloader, we ensure that only one 
> classloader loads `libhadoop.so`. I don't think this is the best way to 
> achieve what we want, but it works for now.
>
> Next steps: if no one is on it, I can take a stab at updating the 
> documentation to clarify how to debug and resolve Native library loading. 
> This was a nice learning experience and I think it'll be helpful to have this 
> in the docs for those who aren't well-versed in how classloading on the JVM 
> works!
>
> To answer your questions:
>
> 1. We install hadoop on our machines and tell flink task managers to access 
> it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in 
> `flink-conf.yaml`
> 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job 
> manager classpath (I believe this is only used by the Job Managers when they 
> interact with S3 for checkpoints etc. I don't believe any user code is using 
> this).
> 3. Our flink applications consist of a "fat jar" that has some 
> `org.apache.hadoop` dependencies bundled with it. I believe this is the 
> source of why we're loading `SnappyCodec` twice and triggering this issue.
> 4. For example code: we have a small wrapper around 
> `org.apache.flink.api.common.io.FileInputFormat` which does the work with 
> sequence files. It looks like (after removing some stuff to make it more 
> clear):
>
> ```
> abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
> typeInformation: TypeInformation[T]
> ) extends FileInputFormat[T]
> with ResultTypeQueryable[T] {
>   @transient private var bufferedNextRecord: T = _
>   @transient private var hadoopStream: HadoopFSDataInputStream = _
>   @transient private var sequenceFileReader: SequenceFile.Reader = _
>
>   unsplittable = true
>   enumerateNestedFiles = true
>
>   // *
>   // This is where we'd see exceptions.
>   // *
>   override def open(fileSplit: FileInputSplit): Unit = {
> super.open(fileSplit)
> val config = new Configuration()
> hadoopStream = WrappedHadoopInputStream.wrap(stream)
> sequenceFileReader = new SequenceFile.Reader(config, 
> SequenceFile.Reader.stream(hadoopStream))
> bufferNextRecord()
>   }
> ...
> }
>
> // AND
>
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
> extends InputStream
> with Seekable
> with PositionedReadable {
>
>   def read(): Int = underlying.read()
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
> }
> ...
> ```
>
> Thanks for all your help, I appreciate it! I wouldn't have been able to debug 
> and resolve this if it wasn't for you filing the ticket. Thank you so much!
>
> [0] https://github.com/pantsbuild/jarjar
>
> Aaron Levin
>
> On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi  wrote:
>>
>> Hey Aaron,
>>
>> sorry for the late reply (again).
>>
>> (1) I think that your final result is in line with what I have
>> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>>
>> (2) I think renaming the file would not help as it will still be
>> loaded multiple times when the jobs restarts (as it happens in
>> FLINK-11402).
>>
>> (3) I'll try to check whether Flink's shading of Hadoop is related to
>> this. I don't think so though. @Chesnay (cc'd): What do you think?
>>
>> (4) @Aaron: Can you tell me which 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-29 Thread Aaron Levin
Hi Ufuk,

I'll answer your question, but first I'll give you an update on how we
resolved the issue:

* adding `org.apache.hadoop.io.compress.SnappyCodec` to
`classloader.parent-first-patterns.additional` in `flink-conf.yaml`
(though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
* putting a jar with `hadoop-common` + it's transitive dependencies, then
using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and
its transitive dependencies). So we end up with jar that has `SnappyCodec`
and whatever it needs to call transitively. We put this jar on the task
manager classpath.

I believe `SnappyCodec` was being called via our code. This worked the
first time but deploying a second time caused `libhadoop.so` to be loaded
in a second class loader. By putting a jar with `SnappyCodec` and it's
transitive dependencies on the task manager classpath and specifying that
`SnappyCodec` needs to be loaded from the parent classloader, we ensure
that only one classloader loads `libhadoop.so`. I don't think this is the
best way to achieve what we want, but it works for now.

Next steps: if no one is on it, I can take a stab at updating the
documentation to clarify how to debug and resolve Native library loading.
This was a nice learning experience and I think it'll be helpful to have
this in the docs for those who aren't well-versed in how classloading on
the JVM works!

To answer your questions:

1. We install hadoop on our machines and tell flink task managers to access
it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in
`flink-conf.yaml`
2. We put flink's shaded hadoop-fs-s3 on both the task manager and job
manager classpath (I believe this is only used by the Job Managers when
they interact with S3 for checkpoints etc. I don't believe any user code is
using this).
3. Our flink applications consist of a "fat jar" that has some
`org.apache.hadoop` dependencies bundled with it. I believe this is the
source of why we're loading `SnappyCodec` twice and triggering this issue.
4. For example code: we have a small wrapper around
`org.apache.flink.api.common.io.FileInputFormat` which does the work with
sequence files. It looks like (after removing some stuff to make it more
clear):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <:
Writable](
typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
with ResultTypeQueryable[T] {
  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true

  // *
  // This is where we'd see exceptions.
  // *
  override def open(fileSplit: FileInputSplit): Unit = {
super.open(fileSplit)
val config = new Configuration()
hadoopStream = WrappedHadoopInputStream.wrap(stream)
sequenceFileReader = new SequenceFile.Reader(config,
SequenceFile.Reader.stream(hadoopStream))
bufferNextRecord()
  }
...
}

// AND

class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
extends InputStream
with Seekable
with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}
...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to
debug and resolve this if it wasn't for you filing the ticket. Thank you so
much!

[0] https://github.com/pantsbuild/jarjar

Aaron Levin

On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi  wrote:

> Hey Aaron,
>
> sorry for the late reply (again).
>
> (1) I think that your final result is in line with what I have
> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>
> (2) I think renaming the file would not help as it will still be
> loaded multiple times when the jobs restarts (as it happens in
> FLINK-11402).
>
> (3) I'll try to check whether Flink's shading of Hadoop is related to
> this. I don't think so though. @Chesnay (cc'd): What do you think?
>
> (4) @Aaron: Can you tell me which Hadoop libraries you use and share
> some code so I can try to reproduce this exactly on my side? Judging
> from the earlier stack traces you have shared, I'm assuming you are
> trying to read Snappy-compressed sequence files.
>
> – Ufuk
>
> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin  wrote:
> >
> > I don't control the code calling `System.loadLibrary("hadoop")` so
> that's not an option for me, unfortunately.
> >
> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma  wrote:
> >>
> >> This may be caused by a  jvm process can only load a so once.So a triky
> way is to rename it。
> >>
> >> 发自我的 iPhone
> >>
> >> 在 2019年1月25日,上午7:12,Aaron Levin  写道:
> >>
> >> Hi Ufuk,
> >>
> >> Update: I've pinned down the issue. It's multiple classloaders loading
> `libhadoop.so`:
> >>
> >> ```
> >> failed to load native 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-28 Thread Ufuk Celebi
Hey Aaron,

sorry for the late reply (again).

(1) I think that your final result is in line with what I have
reproduced in https://issues.apache.org/jira/browse/FLINK-11402.

(2) I think renaming the file would not help as it will still be
loaded multiple times when the jobs restarts (as it happens in
FLINK-11402).

(3) I'll try to check whether Flink's shading of Hadoop is related to
this. I don't think so though. @Chesnay (cc'd): What do you think?

(4) @Aaron: Can you tell me which Hadoop libraries you use and share
some code so I can try to reproduce this exactly on my side? Judging
from the earlier stack traces you have shared, I'm assuming you are
trying to read Snappy-compressed sequence files.

– Ufuk

On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin  wrote:
>
> I don't control the code calling `System.loadLibrary("hadoop")` so that's not 
> an option for me, unfortunately.
>
> On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma  wrote:
>>
>> This may be caused by a  jvm process can only load a so once.So a triky way 
>> is to rename it。
>>
>> 发自我的 iPhone
>>
>> 在 2019年1月25日,上午7:12,Aaron Levin  写道:
>>
>> Hi Ufuk,
>>
>> Update: I've pinned down the issue. It's multiple classloaders loading 
>> `libhadoop.so`:
>>
>> ```
>> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: 
>> Native Library /usr/lib/libhadoop.so already loaded in another classloader
>> ```
>>
>> I'm not quite sure what the solution is. Ideally flink would destroy a 
>> classloader when a job is canceled, but perhaps there's a jvm limitation 
>> there? Putting the libraries into `/usr/lib` or `/lib` does not work (as 
>> suggested by Chesnay in the ticket) as I get the same error. I might see if 
>> I can put a jar with `org.apache.hadoop.common.io.compress` in 
>> `/flink/install/lib` and then remove it from my jar. It's not an ideal 
>> solution but I can't think of anything else.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin  wrote:
>>>
>>> Hi Ufuk,
>>>
>>> I'm starting to believe the bug is much deeper than the originally reported 
>>> error because putting the libraries in `/usr/lib` or `/lib` does not work. 
>>> This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't 
>>> work, despite that being in the `java.library.path` at the call site of the 
>>> error. I wrote a small program to test the loading of native libraries, and 
>>> it was able to successfully load `libhadoop.so`. I'm very perplexed. Could 
>>> this be related to the way flink shades hadoop stuff?
>>>
>>> Here is my program and its output:
>>>
>>> ```
>>> $ cat LibTest.scala
>>> package com.redacted.flink
>>>
>>> object LibTest {
>>>   def main(args: Array[String]): Unit = {
>>> val library = args(0)
>>> 
>>> System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>>> System.out.println(s"Attempting to load $library")
>>> System.out.flush()
>>> System.loadLibrary(library)
>>> System.out.println(s"Successfully loaded ")
>>> System.out.flush()
>>> }
>>> ```
>>>
>>> I then tried running that on one of the task managers with `hadoop` as an 
>>> argument:
>>>
>>> ```
>>> $ java -jar lib_test_deploy.jar hadoop
>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>> Attempting to load hadoop
>>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in 
>>> java.library.path
>>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>> at java.lang.System.loadLibrary(System.java:1122)
>>> at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>>> at com.stripe.flink.LibTest.main(LibTest.scala)
>>> ```
>>>
>>> I then copied the native libraries into `/usr/lib/` and ran it again:
>>>
>>> ```
>>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>>> $ java -jar lib_test_deploy.jar hadoop
>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>> Attempting to load hadoop
>>> Successfully loaded
>>> ```
>>>
>>> Any ideas?
>>>
>>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:

 Hi Ufuk,

 One more update: I tried copying all the hadoop native `.so` files (mainly 
 `libhadoop.so`) into `/lib` and am I still experiencing the issue I 
 reported. I also tried naively adding the `.so` files to the jar with the 
 flink application and am still experiencing the issue I reported (however, 
 I'm going to investigate this further as I might not have done it 
 correctly).

 Best,

 Aaron Levin

 On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
>
> Hi Ufuk,
>
> Two updates:
>
> 1. As suggested in the ticket, I naively copied the every `.so` in 
> `hadoop-3.0.0/lib/native/` into `/lib/` and 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-24 Thread Guowei Ma
This may be caused by a  jvm process can only load a so once.So a triky way is 
to rename it。

发自我的 iPhone

> 在 2019年1月25日,上午7:12,Aaron Levin  写道:
> 
> Hi Ufuk,
> 
> Update: I've pinned down the issue. It's multiple classloaders loading 
> `libhadoop.so`:
> 
> ```
> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: 
> Native Library /usr/lib/libhadoop.so already loaded in another classloader
> ```
> 
> I'm not quite sure what the solution is. Ideally flink would destroy a 
> classloader when a job is canceled, but perhaps there's a jvm limitation 
> there? Putting the libraries into `/usr/lib` or `/lib` does not work (as 
> suggested by Chesnay in the ticket) as I get the same error. I might see if I 
> can put a jar with `org.apache.hadoop.common.io.compress` in 
> `/flink/install/lib` and then remove it from my jar. It's not an ideal 
> solution but I can't think of anything else.
> 
> Best,
> 
> Aaron Levin
> 
>> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin  wrote:
>> Hi Ufuk,
>> 
>> I'm starting to believe the bug is much deeper than the originally reported 
>> error because putting the libraries in `/usr/lib` or `/lib` does not work. 
>> This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't 
>> work, despite that being in the `java.library.path` at the call site of the 
>> error. I wrote a small program to test the loading of native libraries, and 
>> it was able to successfully load `libhadoop.so`. I'm very perplexed. Could 
>> this be related to the way flink shades hadoop stuff? 
>> 
>> Here is my program and its output:
>> 
>> ```
>> $ cat LibTest.scala
>> package com.redacted.flink
>> 
>> object LibTest {
>>   def main(args: Array[String]): Unit = {
>> val library = args(0)
>> 
>> System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>> System.out.println(s"Attempting to load $library")
>> System.out.flush()
>> System.loadLibrary(library)
>> System.out.println(s"Successfully loaded ")
>> System.out.flush()
>> }
>> ```
>> 
>> I then tried running that on one of the task managers with `hadoop` as an 
>> argument:
>> 
>> ```
>> $ java -jar lib_test_deploy.jar hadoop
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> Attempting to load hadoop
>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in 
>> java.library.path
>>  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>  at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>  at java.lang.System.loadLibrary(System.java:1122)
>>  at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>>  at com.stripe.flink.LibTest.main(LibTest.scala)
>> ```
>> 
>> I then copied the native libraries into `/usr/lib/` and ran it again:
>> 
>> ```
>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>> $ java -jar lib_test_deploy.jar hadoop
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> Attempting to load hadoop
>> Successfully loaded
>> ```
>> 
>> Any ideas? 
>> 
>>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:
>>> Hi Ufuk,
>>> 
>>> One more update: I tried copying all the hadoop native `.so` files (mainly 
>>> `libhadoop.so`) into `/lib` and am I still experiencing the issue I 
>>> reported. I also tried naively adding the `.so` files to the jar with the 
>>> flink application and am still experiencing the issue I reported (however, 
>>> I'm going to investigate this further as I might not have done it 
>>> correctly).
>>> 
>>> Best,
>>> 
>>> Aaron Levin
>>> 
 On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
 Hi Ufuk,
 
 Two updates:
 
 1. As suggested in the ticket, I naively copied the every `.so` in 
 `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My 
 knowledge of how shared libs get picked up is hazy, so I'm not sure if 
 blindly copying them like that should work. I did check what 
 `System.getProperty("java.library.path")` returns at the call-site and 
 it's: 
 java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
 2. The exception I see comes from 
 `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). 
 This uses `System.loadLibrary("hadoop")`.
 
 [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: 
 org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
 [2019-01-23 19:52:33.081376]  at 
 org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
 [2019-01-23 19:52:33.081406]  at 
 org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
 [2019-01-23 19:52:33.081429]  at 
 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-24 Thread Aaron Levin
Hi Ufuk,

I'm starting to believe the bug is much deeper than the originally reported
error because putting the libraries in `/usr/lib` or `/lib` does not work.
This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't
work, despite that being in the `java.library.path` at the call site of the
error. I wrote a small program to test the loading of native libraries, and
it was able to successfully load `libhadoop.so`. I'm very perplexed. Could
this be related to the way flink shades hadoop stuff?

Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
val library = args(0)

System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
System.out.println(s"Attempting to load $library")
System.out.flush()
System.loadLibrary(library)
System.out.println(s"Successfully loaded ")
System.out.flush()
}
```

I then tried running that on one of the task managers with `hadoop` as an
argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.stripe.flink.LibTest$.main(LibTest.scala:11)
at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas?

On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:

> Hi Ufuk,
>
> One more update: I tried copying all the hadoop native `.so` files (mainly
> `libhadoop.so`) into `/lib` and am I still experiencing the issue I
> reported. I also tried naively adding the `.so` files to the jar with the
> flink application and am still experiencing the issue I reported (however,
> I'm going to investigate this further as I might not have done it
> correctly).
>
> Best,
>
> Aaron Levin
>
> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
>
>> Hi Ufuk,
>>
>> Two updates:
>>
>> 1. As suggested in the ticket, I naively copied the every `.so` in
>> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
>> knowledge of how shared libs get picked up is hazy, so I'm not sure if
>> blindly copying them like that should work. I did check what
>> `System.getProperty("java.library.path")` returns at the call-site and
>> it's: 
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> 2. The exception I see comes from
>> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
>> This uses `System.loadLibrary("hadoop")`.
>>
>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>> [2019-01-23 19:52:33.081376]  at
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>> [2019-01-23 19:52:33.081406]  at
>> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>> [2019-01-23 19:52:33.081429]  at
>> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>> [2019-01-23 19:52:33.081457]  at
>> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>> [2019-01-23 19:52:33.081494]  at
>> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>> [2019-01-23 19:52:33.081517]  at
>> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>> [2019-01-23 19:52:33.081549]  at
>> org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872)
>> ... (redacted) ...
>> [2019-01-23 19:52:33.081728]  at
>> scala.collection.immutable.List.foreach(List.scala:392)
>> ... (redacted) ...
>> [2019-01-23 19:52:33.081832]  at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>> [2019-01-23 19:52:33.081854]  at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> [2019-01-23 19:52:33.081882]  at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>> [2019-01-23 19:52:33.081904]  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> [2019-01-23 19:52:33.081946]  at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-23 Thread Aaron Levin
Hi Ufuk,

One more update: I tried copying all the hadoop native `.so` files (mainly
`libhadoop.so`) into `/lib` and am I still experiencing the issue I
reported. I also tried naively adding the `.so` files to the jar with the
flink application and am still experiencing the issue I reported (however,
I'm going to investigate this further as I might not have done it
correctly).

Best,

Aaron Levin

On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:

> Hi Ufuk,
>
> Two updates:
>
> 1. As suggested in the ticket, I naively copied the every `.so` in
> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
> knowledge of how shared libs get picked up is hazy, so I'm not sure if
> blindly copying them like that should work. I did check what
> `System.getProperty("java.library.path")` returns at the call-site and
> it's: 
> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
> 2. The exception I see comes from
> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
> This uses `System.loadLibrary("hadoop")`.
>
> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
> [2019-01-23 19:52:33.081376]  at
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
> [2019-01-23 19:52:33.081406]  at
> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
> [2019-01-23 19:52:33.081429]  at
> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
> [2019-01-23 19:52:33.081457]  at
> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
> [2019-01-23 19:52:33.081494]  at
> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
> [2019-01-23 19:52:33.081517]  at
> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
> [2019-01-23 19:52:33.081549]  at
> org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872)
> ... (redacted) ...
> [2019-01-23 19:52:33.081728]  at
> scala.collection.immutable.List.foreach(List.scala:392)
> ... (redacted) ...
> [2019-01-23 19:52:33.081832]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> [2019-01-23 19:52:33.081854]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> [2019-01-23 19:52:33.081882]  at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> [2019-01-23 19:52:33.081904]  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> [2019-01-23 19:52:33.081946]  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>
> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:
>
>> Hey Ufuk,
>>
>> So, I looked into this a little bit:
>>
>> 1. clarification: my issues are with the hadoop-related snappy libraries
>> and not libsnappy itself (this is my bad for not being clearer, sorry!). I
>> already have `libsnappy` on my classpath, but I am looking into including
>> the hadoop snappy libraries.
>> 2. exception: I don't see the class loading error. I'm going to try to
>> put some more instrumentation and see if I can get a clearer stacktrace
>> (right now I get an NPE on closing a sequence file in a finalizer - when I
>> last logged the exception it was something deep in hadoop's snappy libs -
>> I'll get clarification soon).
>> 3. I'm looking into including hadoop's snappy libs in my jar and we'll
>> see if that resolves the problem.
>>
>> Thanks again for your help!
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin 
>> wrote:
>>
>>> Hey,
>>>
>>> Thanks so much for the help! This is awesome. I'll start looking into
>>> all of this right away and report back.
>>>
>>> Best,
>>>
>>> Aaron Levin
>>>
>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>>>
 Hey Aaron,

 sorry for the late reply.

 (1) I think I was able to reproduce this issue using snappy-java. I've
 filed a ticket here:
 https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
 ticket description whether it's in line with what you are
 experiencing? Most importantly, do you see the same Exception being
 reported after cancelling and re-starting the job?

 (2) I don't think it's caused by the environment options not being
 picked up. You can check the head of the log files of the JobManager
 or TaskManager to verify that your provided option is picked up as
 expected. You should see something similar to this:

 2019-01-21 22:53:49,863 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

 
 2019-01-21 22:53:49,864 INFO
 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-23 Thread Aaron Levin
Hi Ufuk,

Two updates:

1. As suggested in the ticket, I naively copied the every `.so` in
`hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
knowledge of how shared libs get picked up is hazy, so I'm not sure if
blindly copying them like that should work. I did check what
`System.getProperty("java.library.path")` returns at the call-site and
it's: 
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2. The exception I see comes from
`hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
This uses `System.loadLibrary("hadoop")`.

[2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
[2019-01-23 19:52:33.081376]  at
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
[2019-01-23 19:52:33.081406]  at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
[2019-01-23 19:52:33.081429]  at
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
[2019-01-23 19:52:33.081457]  at
org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
[2019-01-23 19:52:33.081494]  at
org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
[2019-01-23 19:52:33.081517]  at
org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
[2019-01-23 19:52:33.081549]  at
org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872)
... (redacted) ...
[2019-01-23 19:52:33.081728]  at
scala.collection.immutable.List.foreach(List.scala:392)
... (redacted) ...
[2019-01-23 19:52:33.081832]  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
[2019-01-23 19:52:33.081854]  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
[2019-01-23 19:52:33.081882]  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
[2019-01-23 19:52:33.081904]  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
[2019-01-23 19:52:33.081946]  at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
[2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)

On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:

> Hey Ufuk,
>
> So, I looked into this a little bit:
>
> 1. clarification: my issues are with the hadoop-related snappy libraries
> and not libsnappy itself (this is my bad for not being clearer, sorry!). I
> already have `libsnappy` on my classpath, but I am looking into including
> the hadoop snappy libraries.
> 2. exception: I don't see the class loading error. I'm going to try to put
> some more instrumentation and see if I can get a clearer stacktrace (right
> now I get an NPE on closing a sequence file in a finalizer - when I last
> logged the exception it was something deep in hadoop's snappy libs - I'll
> get clarification soon).
> 3. I'm looking into including hadoop's snappy libs in my jar and we'll see
> if that resolves the problem.
>
> Thanks again for your help!
>
> Best,
>
> Aaron Levin
>
> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin 
> wrote:
>
>> Hey,
>>
>> Thanks so much for the help! This is awesome. I'll start looking into all
>> of this right away and report back.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>>
>>> Hey Aaron,
>>>
>>> sorry for the late reply.
>>>
>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>>> filed a ticket here:
>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>>> ticket description whether it's in line with what you are
>>> experiencing? Most importantly, do you see the same Exception being
>>> reported after cancelling and re-starting the job?
>>>
>>> (2) I don't think it's caused by the environment options not being
>>> picked up. You can check the head of the log files of the JobManager
>>> or TaskManager to verify that your provided option is picked up as
>>> expected. You should see something similar to this:
>>>
>>> 2019-01-21 22:53:49,863 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>
>>> 
>>> 2019-01-21 22:53:49,864 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>> ...
>>> 2019-01-21 22:53:49,865 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
>>> Options:
>>> 2019-01-21 22:53:49,865 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> -Xms1024m
>>> 2019-01-21 22:53:49,865 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> -Xmx1024m
>>> You are looking for this line > 2019-01-21 22:53:49,865 INFO
>>> 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-22 Thread Aaron Levin
Hey Ufuk,

So, I looked into this a little bit:

1. clarification: my issues are with the hadoop-related snappy libraries
and not libsnappy itself (this is my bad for not being clearer, sorry!). I
already have `libsnappy` on my classpath, but I am looking into including
the hadoop snappy libraries.
2. exception: I don't see the class loading error. I'm going to try to put
some more instrumentation and see if I can get a clearer stacktrace (right
now I get an NPE on closing a sequence file in a finalizer - when I last
logged the exception it was something deep in hadoop's snappy libs - I'll
get clarification soon).
3. I'm looking into including hadoop's snappy libs in my jar and we'll see
if that resolves the problem.

Thanks again for your help!

Best,

Aaron Levin

On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin  wrote:

> Hey,
>
> Thanks so much for the help! This is awesome. I'll start looking into all
> of this right away and report back.
>
> Best,
>
> Aaron Levin
>
> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>
>> Hey Aaron,
>>
>> sorry for the late reply.
>>
>> (1) I think I was able to reproduce this issue using snappy-java. I've
>> filed a ticket here:
>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>> ticket description whether it's in line with what you are
>> experiencing? Most importantly, do you see the same Exception being
>> reported after cancelling and re-starting the job?
>>
>> (2) I don't think it's caused by the environment options not being
>> picked up. You can check the head of the log files of the JobManager
>> or TaskManager to verify that your provided option is picked up as
>> expected. You should see something similar to this:
>>
>> 2019-01-21 22:53:49,863 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>
>> 
>> 2019-01-21 22:53:49,864 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>> ...
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
>> Options:
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Xms1024m
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Xmx1024m
>> You are looking for this line > 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>> ...
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> Program Arguments:
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> --configDir
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> /.../flink-1.7.0/conf
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> --executionMode
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> cluster
>> ...
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>
>> 
>>
>> Can you verify that you see the log messages as expected?
>>
>> (3) As noted FLINK-11402, is it possible to package the snappy library
>> as part of your user code instead of loading the library via
>> java.library.path? In my example, that seems to work fine.
>>
>> – Ufuk
>>
>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin 
>> wrote:
>> >
>> > Hello!
>> >
>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a
>> job is canceled or fails and then is restarted (with or without
>> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts`
>> seem to start having impact again and our job will run without failure.
>> More below.
>> >
>> > We use consume Snappy-compressed sequence files in our flink job. This
>> requires access to the hadoop native libraries. In our `flink-conf.yaml`
>> for both the task manager and the job manager, we put:
>> >
>> > ```
>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
>> > ```
>> >
>> > If I launch our job on freshly-restarted task managers, the job
>> operates fine. If at some point I cancel the job or if the job restarts for
>> some other reason, the job will begin to crashloop because it tries to open
>> a Snappy-compressed file but doesn't have access to the codec from the
>> native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then
>> restart the 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-22 Thread Aaron Levin
Hey,

Thanks so much for the help! This is awesome. I'll start looking into all
of this right away and report back.

Best,

Aaron Levin

On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:

> Hey Aaron,
>
> sorry for the late reply.
>
> (1) I think I was able to reproduce this issue using snappy-java. I've
> filed a ticket here:
> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
> ticket description whether it's in line with what you are
> experiencing? Most importantly, do you see the same Exception being
> reported after cancelling and re-starting the job?
>
> (2) I don't think it's caused by the environment options not being
> picked up. You can check the head of the log files of the JobManager
> or TaskManager to verify that your provided option is picked up as
> expected. You should see something similar to this:
>
> 2019-01-21 22:53:49,863 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>
> 
> 2019-01-21 22:53:49,864 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
> ...
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
> Options:
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xms1024m
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xmx1024m
> You are looking for this line > 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
> ...
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Program Arguments:
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> --configDir
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> /.../flink-1.7.0/conf
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> --executionMode
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> cluster
> ...
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>
> 
>
> Can you verify that you see the log messages as expected?
>
> (3) As noted FLINK-11402, is it possible to package the snappy library
> as part of your user code instead of loading the library via
> java.library.path? In my example, that seems to work fine.
>
> – Ufuk
>
> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin  wrote:
> >
> > Hello!
> >
> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a
> job is canceled or fails and then is restarted (with or without
> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts`
> seem to start having impact again and our job will run without failure.
> More below.
> >
> > We use consume Snappy-compressed sequence files in our flink job. This
> requires access to the hadoop native libraries. In our `flink-conf.yaml`
> for both the task manager and the job manager, we put:
> >
> > ```
> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> > ```
> >
> > If I launch our job on freshly-restarted task managers, the job operates
> fine. If at some point I cancel the job or if the job restarts for some
> other reason, the job will begin to crashloop because it tries to open a
> Snappy-compressed file but doesn't have access to the codec from the native
> hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the
> task manager while the job is crashlooping, the job is start running
> without any codec failures.
> >
> > The only reason I can conjure that would cause the Snappy compression to
> fail is if the `env.java.opts` were not being passed through to the job on
> restart for some reason.
> >
> > Does anyone know what's going on? Am I missing some additional
> configuration? I really appreciate any help!
> >
> > About our setup:
> >
> > - Flink Version: 1.7.0
> > - Deployment: Standalone in HA
> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s
> shaded jars to access our files in S3. We do not use the
> `bundled-with-hadoop` distribution of Flink.
> >
> > Best,
> >
> > Aaron Levin
>


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-21 Thread Ufuk Celebi
Hey Aaron,

sorry for the late reply.

(1) I think I was able to reproduce this issue using snappy-java. I've
filed a ticket here:
https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
ticket description whether it's in line with what you are
experiencing? Most importantly, do you see the same Exception being
reported after cancelling and re-starting the job?

(2) I don't think it's caused by the environment options not being
picked up. You can check the head of the log files of the JobManager
or TaskManager to verify that your provided option is picked up as
expected. You should see something similar to this:

2019-01-21 22:53:49,863 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2019-01-21 22:53:49,864 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
...
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
Options:
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xms1024m
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xmx1024m
You are looking for this line > 2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Program Arguments:
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--configDir
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/.../flink-1.7.0/conf
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--executionMode
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
cluster
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -


Can you verify that you see the log messages as expected?

(3) As noted FLINK-11402, is it possible to package the snappy library
as part of your user code instead of loading the library via
java.library.path? In my example, that seems to work fine.

– Ufuk

On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin  wrote:
>
> Hello!
>
> *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is 
> canceled or fails and then is restarted (with or without 
> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` 
> seem to start having impact again and our job will run without failure. More 
> below.
>
> We use consume Snappy-compressed sequence files in our flink job. This 
> requires access to the hadoop native libraries. In our `flink-conf.yaml` for 
> both the task manager and the job manager, we put:
>
> ```
> env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> ```
>
> If I launch our job on freshly-restarted task managers, the job operates 
> fine. If at some point I cancel the job or if the job restarts for some other 
> reason, the job will begin to crashloop because it tries to open a 
> Snappy-compressed file but doesn't have access to the codec from the native 
> hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the 
> task manager while the job is crashlooping, the job is start running without 
> any codec failures.
>
> The only reason I can conjure that would cause the Snappy compression to fail 
> is if the `env.java.opts` were not being passed through to the job on restart 
> for some reason.
>
> Does anyone know what's going on? Am I missing some additional configuration? 
> I really appreciate any help!
>
> About our setup:
>
> - Flink Version: 1.7.0
> - Deployment: Standalone in HA
> - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded 
> jars to access our files in S3. We do not use the `bundled-with-hadoop` 
> distribution of Flink.
>
> Best,
>
> Aaron Levin