As another data point, here’s an excerpt from a stack dump for the task manager:

"heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)-EventThread" #94 daemon prio=5 os_prio=0 tid=0x00007f48c04d4800 nid=0x68ef waiting on condition [0x00007f48470eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000cd6121c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
        - None

"heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)" #93 daemon prio=5 os_prio=0 tid=0x00007f48c04e1800 nid=0x68ee waiting on condition [0x00007f48471ec000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1051)

   Locked ownable synchronizers:
        - None

"Sink: ES (5/10)-EventThread" #68 daemon prio=5 os_prio=0 tid=0x00007f48c80a4800 nid=0x6829 waiting on condition [0x00007f4851e08000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000cc4f7950> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
        - None

"Sink: ES (ip-10-80-53-99.us-west-2.compute.internal:2181)" #67 daemon prio=5 os_prio=0 tid=0x00007f48c80aa000 nid=0x6827 runnable [0x00007f4851f09000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000cc4f7c18> (a sun.nio.ch.Util$3)
        - locked <0x00000000cc4f7c08> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000cc4f7bc0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

   Locked ownable synchronizers:
        - None
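
The ZooKeeper EventThread/SendThread entries above are the kind of thread that can outlive a task. As a rough sketch of how such leftovers could be listed from inside the JVM instead of via a stack dump, the snippet below collects live threads whose context class loader is a given loader. The names `LeftoverThreads` and `liveThreadsUsing` are made up for illustration, and the sketch assumes the leftover threads inherited the user-code class loader as their context class loader, which may not hold in every setup.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink, Curator or ZooKeeper.
final class LeftoverThreads {

    /**
     * Returns the names of all live threads whose context class loader is
     * exactly the given loader (identity comparison).
     */
    static List<String> liveThreadsUsing(ClassLoader loader) {
        List<String> names = new ArrayList<>();
        // getAllStackTraces() returns a snapshot keyed by every live thread.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getContextClassLoader() == loader) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumption for the demo: inspect the current thread's own loader.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String name : liveThreadsUsing(loader)) {
            System.out.println("still alive: " + name);
        }
    }
}
```

Calling something like this after the last operator has been closed, with the user-code class loader as the argument, would show whether any client threads are still running at that point.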

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Jan 11, 2018, at 10:07 AM, Jared Stehler 
> <jared.steh...@intellifylearning.com> wrote:
> 
> I’m seeing sporadic issues where it appears that curator (or other) user 
> threads are left running after a stream shutdown, and then the user class 
> loader goes away and I get spammed with ClassNotFoundExceptions… I’m 
> wondering if this might have something to do with perhaps the UserClassLoader 
> being shut down before close is invoked on all operators?
> 
> Here’s a stack trace I see from an attempt at closing an Elasticsearch sink:
> 
> java.lang.ClassNotFoundException: com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
>     at com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
>     at com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
>     at com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
>     at com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
>     at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
>     at com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
>     at com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
>     at com.intellify.flink.shared.elasticsearch.LiveResolvingEs1ApiCallBridge.cleanup(LiveResolvingEs1ApiCallBridge.java:105)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:323)
>     at com.intellify.flink.shared.tracer.TracingSink.close(TracingSink.java:50)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> 
> I’m using a curator connection for archaius, and closing it in the call 
> bridge’s cleanup method. I’m ensuring that I’m not reaching up into the 
> parent class loader by shading curator and zookeeper. 
> 
> I also see the following on repeat in my task manager log:
> 
> 2018-01-11 14:53:13.313 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x3c00002d8a7603de for server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, closing socket connection and attempting reconnect
> java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
>       at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
>       at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
>       at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 
> 
> Does anyone have any insight into what might be happening here? Does this 
> seem like I’m not closing a thread properly, or something else entirely?
> 
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 
