Thank you Stephan for spotting the problem. In hindsight it’s obvious that this 
can never work. I’ll figure something out :)
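
For the archive, one possible workaround, as a minimal sketch only: have the gauge read a plain thread-safe field that the processing thread refreshes, so the metrics thread never touches the keyed state. The sketch below has no Flink dependency. The local Gauge interface is a stand-in for org.apache.flink.metrics.Gauge, and Processor, process() and headSeenCount are hypothetical names, not the code from my job.

```java
import java.util.concurrent.atomic.AtomicLong;

public class GaugeSnapshotSketch {

    /** Stand-in for org.apache.flink.metrics.Gauge so the sketch compiles without Flink. */
    public interface Gauge<T> {
        T getValue();
    }

    /** Hypothetical stand-in for the rich function; only the caching pattern matters here. */
    public static class Processor {
        // Written by the processing thread, read by the metrics thread.
        private final AtomicLong headSeenCount = new AtomicLong();

        // The gauge only reads the cached number and never touches the state backend.
        public final Gauge<Long> gauge = headSeenCount::get;

        /**
         * Would be called from flatMap() on the task thread, where reading the
         * keyed state is safe. countFromState stands for headSeenState.value().count().
         */
        public void process(long countFromState) {
            headSeenCount.set(countFromState);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Processor processor = new Processor();
        processor.process(3L);

        // Simulates the metrics system polling the gauge from another thread.
        Thread metricsThread = new Thread(
                () -> System.out.println("head-seen = " + processor.gauge.getValue()));
        metricsThread.start();
        metricsThread.join();
    }
}
```

Note that the cached number still belongs to whichever key was processed last, so it is not a total across all keys. It only avoids the cross-thread access to RocksDB.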

> On 24.03.2017 at 10:28, Stephan Ewen <se...@apache.org> wrote:
> 
> The code will not work properly, sorry.
> 
> The value returned by the state is whatever is stored under the key for which 
> the function was called the last time.
> In addition, the unsynchronized access is most likely causing the RocksDB 
> fault.
> 
> TL;DR - The "ValueState" / "ListState" / etc. in Flink are not intended to be 
> used across threads.
> 
> 
> 
> On Fri, Mar 24, 2017 at 8:28 AM, Florian König <florian.koe...@micardo.com> 
> wrote:
> Hi,
> 
> @Robert: I have uploaded all the log files that I could get my hands on to 
> https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I 
> tried to remove all unrelated messages logged by the job itself. In 
> flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and 
> the last half hour before the segfault.
> 
> @Stefan: Your theory could be the key. In the stack trace I see a call to 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge 
> that results in a call to 
> com.micardo.backend.tail.TransitionProcessor$2.getValue()Ljava/lang/Long 
> later. Here’s a slimmed-down snippet of the relevant code:
> 
> class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {
> 
>         transient ValueState<IdSet> headSeenState;
> 
>         public void open(final Configuration parameters) throws Exception {
>                 headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);
> 
>                 getRuntimeContext()
>                         .getMetricGroup()
>                         .gauge("head-seen", new Gauge<Long>() {
>                                 public Long getValue() {
>                                         try {
>                                                 return headSeenState.value().count();
>                                         } catch (IOException e) {
>                                                 e.printStackTrace();
>                                                 return 0L;
>                                         }
>                                 }
>                         });
>         }
> …
> }
> 
> FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a 
> reference to that state via the RuntimeContext of the RichFunction passed as 
> 'this' in the above code.
> 
> Further along in the stack trace I see that headSeenState.value() results in 
> a call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() 
> and then to org.rocksdb.RocksDB.get(J[BIJ).
> 
> It looks like part of the metrics system asynchronously reads the value of 
> the gauge and needs RocksDB for that. Is it possible that this thread does 
> not hold the checkpointing lock you were talking about?
> 
> Best regards
> Florian
> 
> 
> > On 22.03.2017 at 18:19, Stefan Richter <s.rich...@data-artisans.com> wrote:
> >
> > Hi,
> >
> > for the first checkpoint, from the stacktrace I assume that the backend is 
> > not accessed as part of processing an element, but by another thread. Is 
> > that correct? RocksDB requires accessing threads to hold the task’s 
> > checkpointing lock, otherwise they might call methods on an instance that 
> > is already disposed. However, this should only happen when the task was 
> > already about to shut down anyway. Is that a plausible explanation for the 
> > behaviour you observed? I also cannot rule out that segfaults can happen 
> > inside RocksDB or in the JNI bridge.
> >
> > Best,
> > Stefan
> >
> >> On 22.03.2017 at 16:53, Florian König <florian.koe...@micardo.com> wrote:
> >>
> >> Hi Stephan,
> >>
> >> you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
> >> Sorry, my bad.
> >>
> >> That leaves us with the first trace of a segfault for which I can 
> >> guarantee that it brought down a 1.2.0 instance. Unfortunately I cannot 
> >> reproduce the problem. It has happened twice so far, but I can’t see any 
> >> pattern. Is there anything in the stack trace that could point us to a 
> >> probable cause?
> >>
> >> Florian
> >>
> >>> On 22.03.2017 at 16:00, Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>> Hi!
> >>>
> >>> It looks like you are running the RocksDB state backend 1.1 (is an old 
> >>> version still packaged into your JAR file?)
> >>>
> >>> This line indicates that: 
> >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
> >>>  (the method does not exist in 1.2 any more)
> >>>
> >>> Can you try and run 1.2 and see if that still occurs? In general, I 
> >>> cannot vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it 
> >>> was stable.
> >>>
> >>> Stephan
> >>>
> >>>
> >>>
> >>> On Wed, Mar 22, 2017 at 3:13 PM, Florian König 
> >>> <florian.koe...@micardo.com> wrote:
> >>> Hi,
> >>>
> >>> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by 
> >>> something in RocksDB. What is the preferred way to report them? All I have 
> >>> at the moment are two hs_err_pid12345.log files. They are over 4000 lines 
> >>> long each. Is there anything significant that I should extract to help 
> >>> you guys and/or put into a JIRA ticket?
> >>>
> >>> The first thing that came to my mind was the stack traces (see below). 
> >>> Anything else?
> >>>
> >>> Thanks
> >>> Florian
> >>>
> >>> ----
> >>>
> >>> Stack: [0x00007fec04341000,0x00007fec04442000],  sp=0x00007fec0443ff48,  
> >>> free space=1019k
> >>> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> >>> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x00007fec925887cc 
> >>> [0x00007fec92588780+0x4c]
> >>> J 27241 C2 
> >>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
> >>>  (78 bytes) @ 0x00007fec94010ca4 [0x00007fec940109c0+0x2e4]
> >>> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
> >>> j  
> >>> com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
> >>> J 38483 C2 
> >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
> >>>  (114 bytes) @ 0x00007fec918eabf0 [0x00007fec918eabc0+0x30]
> >>> J 38522 C2 
> >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
> >>>  (471 bytes) @ 0x00007fec94eb6260 [0x00007fec94eb57a0+0xac0]
> >>> J 47531 C2 
> >>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
> >>>  (453 bytes) @ 0x00007fec95ca57a0 [0x00007fec95ca4da0+0xa00]
> >>> J 5815 C2 
> >>> akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
> >>>  (7 bytes) @ 0x00007fec91e3ae6c [0x00007fec91e3adc0+0xac]
> >>> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 
> >>> bytes) @ 0x00007fec91d5bc44 [0x00007fec91d5b9a0+0x2a4]
> >>> J 6628 C2 
> >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
> >>>  (60 bytes) @ 0x00007fec9212d050 [0x00007fec9212ccc0+0x390]
> >>> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 
> >>> bytes) @ 0x00007fec923f8170 [0x00007fec923f7fc0+0x1b0]
> >>> v  ~StubRoutines::call_stub
> >>>
> >>> --------------------------------------
> >>>
> >>> Stack: [0x00007f167a5b7000,0x00007f167a6b8000],  sp=0x00007f167a6b5f40,  
> >>> free space=1019k
> >>> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> >>> code)
> >>> C  [libstdc++.so.6+0xc026b]  std::basic_string<char, 
> >>> std::char_traits<char>, std::allocator<char> >::basic_string(std::string 
> >>> const&)+0xb
> >>> C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
> >>> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions 
> >>> const&, rocksdb::BackupEngine**)+0x3a
> >>> C  [librocksdbjni8426686507832168508.so+0x180ad5]  
> >>> Java_org_rocksdb_BackupEngine_open+0x25
> >>> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 
> >>> 0x00007f16cb79aa36 [0x00007f16cb79a980+0xb6]
> >>> J 49809 C1 
> >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
> >>>  (416 bytes) @ 0x00007f16cd2733d4 [0x00007f16cd2719a0+0x1a34]
> >>> J 51766 C2 
> >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
> >>>  (40 bytes) @ 0x00007f16cb40a1fc [0x00007f16cb40a1a0+0x5c]
> >>> J 50547 C2 
> >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState;
> >>>  (206 bytes) @ 0x00007f16cb8be89c [0x00007f16cb8be7e0+0xbc]
> >>> J 52232 C2 
> >>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z
> >>>  (650 bytes) @ 0x00007f16cbfbbf60 [0x00007f16cbfbb540+0xa20]
> >>> J 52419 C2 
> >>> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V
> >>>  (25 bytes) @ 0x00007f16cbdd2624 [0x00007f16cbdd25c0+0x64]
> >>> J 41649 C2 
> >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z
> >>>  (439 bytes) @ 0x00007f16cc8aed5c [0x00007f16cc8add40+0x101c]
> >>> J 33374% C2 
> >>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V (42 
> >>> bytes) @ 0x00007f16cbdc21d0 [0x00007f16cbdc20c0+0x110]
> >>> j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+301
> >>> j  org.apache.flink.runtime.taskmanager.Task.run()V+700
> >>> j  java.lang.Thread.run()V+11
> >>> v  ~StubRoutines::call_stub
> >>> V  [libjvm.so+0x68c616]
> >>> V  [libjvm.so+0x68cb21]
> >>> V  [libjvm.so+0x68cfc7]
> >>> V  [libjvm.so+0x723d80]
> >>> V  [libjvm.so+0xa69dcf]
> >>> V  [libjvm.so+0xa69efc]
> >>> V  [libjvm.so+0x91d9d8]
> >>> C  [libpthread.so.0+0x8064]  start_thread+0xc4
> >>>
> >>> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> >>> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 
> >>> 0x00007f16cb79a9c4 [0x00007f16cb79a980+0x44]
> >>> J 49809 C1 
> >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
> >>>  (416 bytes) @ 0x00007f16cd2733d4 [0x00007f16cd2719a0+0x1a34]
> >>> J 51766 C2 
> >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
> >>>  (40 bytes) @ 0x00007f16cb40a1fc [0x00007f16cb40a1a0+0x5c]
> >>> J 50547 C2 
> >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState;
> >>>  (206 bytes) @ 0x00007f16cb8be89c [0x00007f16cb8be7e0+0xbc]
> >>> J 52232 C2 
> >>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z
> >>>  (650 bytes) @ 0x00007f16cbfbbf60 [0x00007f16cbfbb540+0xa20]
> >>> J 52419 C2 
> >>> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V
> >>>  (25 bytes) @ 0x00007f16cbdd2624 [0x00007f16cbdd25c0+0x64]
> >>> J 41649 C2 
> >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z
> >>>  (439 bytes) @ 0x00007f16cc8aed5c [0x00007f16cc8add40+0x101c]
> >>> J 33374% C2 
> >>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V (42 
> >>> bytes) @ 0x00007f16cbdc21d0 [0x00007f16cbdc20c0+0x110]
> >>> j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+301
> >>> j  org.apache.flink.runtime.taskmanager.Task.run()V+700
> >>> j  java.lang.Thread.run()V+11
> >>> v  ~StubRoutines::call_stub
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >
> 
> 
> 

