Ok, I think I have an understanding of what happens - somehow. Flink switched to a RocksDB fork in the 1.8 release; this is why the dependency must now be explicitly added to a project. [0] I did both, actually: I added this dependency to my project's pom (resulting in beam_pipelines.jar) and to the lib directory of the Flink Docker image used to execute the pipeline [1]:

FROM flink:1.8.0-scala_2.11
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar
ADD --chown=flink:flink target/di-beam-bundled.jar /opt/flink/lib/beam_pipelines.jar

Now everything works up to the point when I hit the "Stop" button in the Flink web interface. I think the RocksDB dependency that the Beam Flink Runner has is wrong, as Flink switched to FRocksDB in 1.8 [2]. I guess that's why the runner then hits this error:

java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;

But I might also be wrong; I am still investigating.

Best,
Tobi

[0] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend
[1] https://hub.docker.com/_/flink
[2] https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471

On Tue, Aug 13, 2019 at 2:50 PM Kaymak, Tobias <[email protected]> wrote:
> This is a major issue for us, as we are no longer able to do a
> clean shutdown of the pipelines right now - only cancelling them hard is
> possible.
>
> On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias <[email protected]>
> wrote:
>
>> I just rolled out the upgraded and working 1.8.0/2.14.0 combination to
>> production and noticed that when I try to cleanly shut down a pipeline via
>> the stop button in the web interface of Flink 1.8.0, I get exactly the same
>> error:
>>
>> java.lang.NoSuchMethodError:
>> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> The pipeline then restores from the last snapshot and continues to run;
>> it does not shut down as expected.
>>
>> Any idea why this could happen?
>>
>> On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> * each time :)
>>>
>>> On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias <[email protected]>
>>> wrote:
>>>
>>>> I've checked multiple times now and it breaks just as with the 1.8.1 image -
>>>> I've completely rebuilt the Docker image and torn down the testing
>>>> cluster.
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Tobias!
>>>>>
>>>>> I've checked if there were any relevant changes to the RocksDB state
>>>>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>>>>> version of RocksDB is still in the Flink cluster path?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 06.08.19 16:43, Kaymak, Tobias wrote:
>>>>> > And of course the moment I click "send" I find that: 😂
>>>>> >
>>>>> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam
>>>>> > project's pom.xml, it *does* work:
>>>>> >
>>>>> > <dependency>
>>>>> >   <groupId>org.apache.flink</groupId>
>>>>> >   <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>>>> >   <version>1.8.0</version>
>>>>> > </dependency>
>>>>> >
>>>>> > However, if you want to use 1.8.1 - it *does not*.
>>>>> >
>>>>> > I still found it confusing, as I am using the official Flink Docker
>>>>> > images, which are currently at version 1.8.1. It would have helped me if
>>>>> > Beam bundled the state backend dependency (as already mentioned, Beam
>>>>> > allows the user to set a state backend via parameters of the Flink Runner).
>>>>> >
>>>>> > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <[email protected]
>>>>> > <mailto:[email protected]>> wrote:
>>>>> >
>>>>> >     Hello,
>>>>> >
>>>>> >     Flink 1.8 requires that, if one wants to use RocksDB as a state
>>>>> >     backend, the dependency be added to the pom.xml file. [0]
>>>>> >
>>>>> >     My cluster stopped working with RocksDB, so I added this
>>>>> >     dependency to the pom.xml of my Beam project (I've tried 1.8.1 and
>>>>> >     1.8.0):
>>>>> >
>>>>> >     <dependency>
>>>>> >       <groupId>org.apache.flink</groupId>
>>>>> >       <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>>>> >       <version>1.8.0</version>
>>>>> >     </dependency>
>>>>> >
>>>>> >     I also tried adding the flink-statebackend-rocksdb_2.11-1.8.0.jar
>>>>> >     to the lib directory of the Flink cluster (TaskManagers and
>>>>> >     JobManager) instead. In all cases I get this error:
>>>>> >
>>>>> >     2019-08-06 14:14:15,670 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator
>>>>> >     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>>>>> >         at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>>>>> >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>>>>> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>>>>> >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>>>>> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>>>> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>>>> >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>> >         at java.lang.Thread.run(Thread.java:748)
>>>>> >
>>>>> >     This looks like a version mismatch to me, but I don't know how to
>>>>> >     solve it. Could Beam maybe include the dependency for the RocksDB
>>>>> >     backend for Flink 1.8 or higher, as it allows setting this value via
>>>>> >     parameters for the Flink Runner? [1]
>>>>> >
>>>>> >     [0] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
>>>>> >     [1] https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
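Max's question in this thread was whether an old RocksDB version is still in the Flink cluster path. That can be checked directly. Below is a minimal sketch of a hypothetical helper, RocksDbClasspathCheck, which is not part of Flink or Beam. It does two things. First, it lists every classpath entry that contains org/rocksdb/ColumnFamilyHandle.class. Second, it reports whether the copy that actually gets loaded has the getDescriptor() method named in the NoSuchMethodError. The sketch assumes it is run against the same jars the TaskManager sees, for example everything in /opt/flink/lib. That only approximates Flink's own class loading.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical diagnostic helper (not part of Flink or Beam). For a given
 * class loader it answers:
 *  1. Which classpath entries contain org.rocksdb.ColumnFamilyHandle?
 *  2. Does the copy that is actually loaded have getDescriptor(), the method
 *     named in the NoSuchMethodError?
 */
public class RocksDbClasspathCheck {

    static final String HANDLE_CLASS = "org.rocksdb.ColumnFamilyHandle";
    static final String HANDLE_RESOURCE = "org/rocksdb/ColumnFamilyHandle.class";

    /** Every classpath entry containing the class file; more than one hints at a conflict. */
    static List<String> findCopies(ClassLoader loader) {
        List<String> copies = new ArrayList<>();
        try {
            for (URL url : Collections.list(loader.getResources(HANDLE_RESOURCE))) {
                copies.add(url.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copies;
    }

    /** One-line report on the copy the loader resolves; does not throw if RocksDB is absent. */
    static String check(ClassLoader loader) {
        Class<?> handle;
        try {
            // initialize = false: load the class without running its static initializers.
            handle = Class.forName(HANDLE_CLASS, false, loader);
        } catch (ClassNotFoundException e) {
            return HANDLE_CLASS + " not found on classpath";
        }
        CodeSource src = handle.getProtectionDomain().getCodeSource();
        String location = (src == null || src.getLocation() == null)
                ? "unknown location" : src.getLocation().toString();
        try {
            // The NoSuchMethodError refers to a public no-arg getDescriptor().
            Method m = handle.getMethod("getDescriptor");
            return HANDLE_CLASS + " loaded from " + location
                    + " has getDescriptor() returning " + m.getReturnType().getName();
        } catch (NoSuchMethodException e) {
            return HANDLE_CLASS + " loaded from " + location
                    + " has no getDescriptor(); likely an older RocksDB build than Flink 1.8 expects";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = RocksDbClasspathCheck.class.getClassLoader();
        for (String copy : findCopies(loader)) {
            System.out.println("copy: " + copy);
        }
        System.out.println(check(loader));
    }
}
```

Compile it with javac and run it with the class file in the working directory, for example: java -cp "/opt/flink/lib/*:." RocksDbClasspathCheck. It prints one line per copy it finds, then a summary. Two or more copies from different jars would point to the kind of version mismatch suspected in this thread. Listing all copies, rather than only the one that loads, is deliberate. Java does not specify the order in which a classpath wildcard is expanded, so the copy this tool loads first is not necessarily the one Flink loads.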
