Exceptions from collector.collect after cancelling job
When I cancel a job, I get many exceptions that look like this:

java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    ... lots of Flink and user code (a flat map function) stack entries here
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 43 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
    ... 48 more

It looks like Flink is disabling the objects that the FlatMap collector relies on before disabling the operator itself. Is that expected/normal? Is there anything I should change in my FlatMap function or job code to account for it?

-Shannon
Re: Error while adding data to RocksDB: No more bytes left
Hi Stephan!

The failure appeared to occur every 10 minutes, which is also the interval for checkpointing. However, I agree with you that the stack trace appears to be independent. Could this perhaps be an issue with multithreading, where the checkpoint mechanism is somehow interfering with ongoing operation of the state backend? I've never seen this problem until now, so I am a little suspicious that it might be due to something in my code, but so far it's been difficult to figure out what that might be.

I am using the default, SemiAsync snapshot mode.

The classes of the data flow are a bit too large to put here in their entirety. We are using Scala case classes, Java classes generated by Avro, Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of these classes have been operational in other jobs before.

I added a unit test for the class which contains a mutable.Map to see whether that was causing a problem. Does this look like a reasonable unit test to verify Flink serializability to you?

it("roundtrip serializes in Flink") {
  val millis: Long = TimeUnit.DAYS.toMillis(2)
  val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
  original.add("a", TimestampedAirportCount(4, 6))
  original.add("b", TimestampedAirportCount(7, 8))
  val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)
  deserialized.timestamp shouldBe millis
  deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
  deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
}

def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
  val typeInfo = implicitly[TypeInformation[T]]
  val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)
  val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
  val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
  serializer.serialize(original, outputView)
  out.size() should be <= expectedMaxBytes
  val inputView: DataInputViewStreamWrapper = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val deserialized: T = serializer.deserialize(inputView)
  deserialized
}

I tried running my job in a local one-slot cluster with RocksDB enabled but checkpointing to the local filesystem. Similar errors occur, but are more sporadic. I have not yet been able to capture the error while debugging, but if I do I will provide additional information.

I noticed that locally, execution only reaches DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint completes. Also, the timing of checkpointing is a bit odd: in the example below the checkpoint takes 200s to complete after being triggered even though RocksDB reports that it only took ~100ms.

2016-09-29 12:56:17,619 INFO CheckpointCoordinator - Triggering checkpoint 2 @ 1475171777619
2016-09-29 12:59:38,079 INFO RocksDBStateBackend - RocksDB (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db) backup (synchronous part) took 7 ms.
2016-09-29 12:59:38,214 INFO RocksDBStateBackend - RocksDB materialization from /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous part) took 96 ms.
2016-09-29 12:59:38,333 INFO CheckpointCoordinator - Completed checkpoint 2 (in 200621 ms)

Do you have any other advice?

Exceptions from local execution:

java.lang.RuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at
Re: Flink-HBase connectivity through Kerberos keytab | Simple get/put sample implementation
Hi!

In Flink prior to 1.2, you can use Kerberos with HBase via Hadoop's mechanism: https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#kerberos

In Flink 1.2-SNAPSHOT, keytabs are installed in the Java security context, from which HBase can probably pick it up. This is quite new (docs are not yet updated), I'll let others chime in to add some more details.

@Max / @Vijay do you have any details?

Stephan

On Thu, Sep 29, 2016 at 7:47 PM, Stephan Ewen wrote:
> Hi!
>
> In Flink prior to 1.2, you can use Kerberos with HBase via Hadoop's
> mechanism: https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#kerberos
>
> In Flink 1.2-SNAPSHOT, keytabs are installed in the Java security context,
> from which HBase can probably pick it up. This is quite new, I'll let
> others chime in to add some more details.
>
> @Max / @
>
> On Thu, Sep 29, 2016 at 7:20 PM, Anchit Jatana <
> development.anc...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Right, I hope the committers take into account the kerberised access as
>> well. Thanks for the update!
>>
>> Regards,
>> Anchit
>>
>> On Thu, Sep 29, 2016 at 6:15 AM, Fabian Hueske wrote:
>>
>>> Hi Anchit,
>>>
>>> Flink does not yet have a streaming sink connector for HBase. Some
>>> members of the community are working on this though [1].
>>> I think we resolved a similar issue for the Kafka connector recently
>>> [2].
>>> Maybe the related commits contain some relevant code for your problem.
>>>
>>> Best, Fabian
>>>
>>> [1] https://github.com/apache/flink/pull/2332
>>> [2] https://issues.apache.org/jira/browse/FLINK-3239
>>>
>>> 2016-09-29 9:12 GMT+02:00 Anchit Jatana :
>>>
>>>> Hi All,
>>>>
>>>> I'm trying to link my flink application with HBase for simple read/write
>>>> operations. I need to implement Flink to HBase the connectivity through
>>>> Kerberos using the keytab.
>>>>
>>>> Can somebody share(or link me to some resource) a sample
>>>> code/implementation on how to achieve Flink to HBase connectivity through
>>>> Kerberos using keytab for simple read/write (get/put) operation.
>>>>
>>>> Thank you!
>>>>
>>>> Regards,
>>>> Anchit
Re: Flink-HBase connectivity through Kerberos keytab | Simple get/put sample implementation
Hi!

In Flink prior to 1.2, you can use Kerberos with HBase via Hadoop's mechanism: https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#kerberos

In Flink 1.2-SNAPSHOT, keytabs are installed in the Java security context, from which HBase can probably pick it up. This is quite new, I'll let others chime in to add some more details.

@Max / @

On Thu, Sep 29, 2016 at 7:20 PM, Anchit Jatana wrote:
> Hi Fabian,
>
> Right, I hope the committers take into account the kerberised access as
> well. Thanks for the update!
>
> Regards,
> Anchit
>
> On Thu, Sep 29, 2016 at 6:15 AM, Fabian Hueske wrote:
>
>> Hi Anchit,
>>
>> Flink does not yet have a streaming sink connector for HBase. Some
>> members of the community are working on this though [1].
>> I think we resolved a similar issue for the Kafka connector recently [2].
>> Maybe the related commits contain some relevant code for your problem.
>>
>> Best, Fabian
>>
>> [1] https://github.com/apache/flink/pull/2332
>> [2] https://issues.apache.org/jira/browse/FLINK-3239
>>
>> 2016-09-29 9:12 GMT+02:00 Anchit Jatana :
>>
>>> Hi All,
>>>
>>> I'm trying to link my flink application with HBase for simple read/write
>>> operations. I need to implement Flink to HBase the connectivity through
>>> Kerberos using the keytab.
>>>
>>> Can somebody share(or link me to some resource) a sample
>>> code/implementation on how to achieve Flink to HBase connectivity through
>>> Kerberos using keytab for simple read/write (get/put) operation.
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Anchit
Re: How to interact with a running flink application?
Hi Ufuk,

Thanks for your help, I'm working on using the suggested approach to address my use case.

Regards,
Anchit

On Wed, Sep 28, 2016 at 12:48 AM, Ufuk Celebi wrote:
> Hey Anchit,
>
> the usual recommendation for this is to use a CoMap/CoFlatMap
> operator, where the second input are the lookup location changes. You
> can then use this input to update the location.
>
> Search for CoMap/CoFlatMap here:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#datastream-transformations
>
> Best,
>
> Ufuk
>
> On Wed, Sep 28, 2016 at 9:37 AM, Anchit Jatana wrote:
> > Hi All,
> >
> > Brief: I have a use case where I need to interact with a running flink
> > application.
> >
> > Detail:
> >
> > My Flink application has a Kafka source, an operator processing on the
> > content received from the Kafka stream(this operator is using a lookup from
> > an external source file to accomplish the processing of the Kafka content).
> > If the content of the file kept at the same source location changes, I need
> > to notify the operator to update its lookup content loaded in the memory and
> > continue its processing of Kafka content with the new loaded lookup content
> > without stopping the Flink application.
> >
> > Is there a way where I can "interact with the running Flink Application"
> > through some event or something to notify the application to make some
> > changes in its operation without stopping the application.
> >
> > Thank you!
> >
> > Regards,
> > Anchit
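
For readers of this thread, here is a minimal, dependency-free sketch of the two-input pattern Ufuk describes. It does not use Flink itself: the class `LookupEnricher`, the `key=value` update format and the `key@location` output format are hypothetical, and only the method names `flatMap1`/`flatMap2` mirror Flink's `CoFlatMapFunction`. In a real job the two methods would be fed by two connected streams rather than called by hand.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a two-input operator; this is not Flink API.
class LookupEnricher {
    // In-memory lookup table, updated entry by entry as changes arrive on input 2.
    private final Map<String, String> lookup = new HashMap<>();

    // Input 1: events from the main stream (for example, keys read from Kafka).
    void flatMap1(String eventKey, List<String> out) {
        String location = lookup.get(eventKey);
        if (location != null) {
            out.add(eventKey + "@" + location);
        }
        // Events without a known lookup entry are dropped in this sketch.
    }

    // Input 2: lookup changes, assumed to be formatted as "key=value".
    void flatMap2(String update, List<String> out) {
        int idx = update.indexOf('=');
        if (idx > 0) {
            lookup.put(update.substring(0, idx), update.substring(idx + 1));
        }
        // Updates emit nothing; they only change how later events are enriched.
    }
}
```

Because both inputs are handled by the same operator object, a lookup change takes effect for the next event without restarting anything. With a parallelism above one, every parallel instance would presumably need to see each update (for example by broadcasting the update stream), and the lookup table would have to live in Flink state to survive a failure.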
Re: AW: Problem with CEPPatternOperator when taskmanager is killed
Great, thanks!

I gave you contributor permissions in JIRA. You can now also assign issues to yourself if you decide to continue to contribute.

Best, Fabian

2016-09-29 16:48 GMT+02:00 jaxbihani:
> Hi Fabian
>
> My JIRA user is: jaxbihani
> I have created a pull request for the fix:
> https://github.com/apache/flink/pull/2568
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024p9246.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: AW: Problem with CEPPatternOperator when taskmanager is killed
Hi Fabian

My JIRA user is: jaxbihani
I have created a pull request for the fix: https://github.com/apache/flink/pull/2568

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024p9246.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Using Flink and Cassandra with Scala
Hi,

Does the Cassandra sink support Scala and case classes? It looks like using Java is the best practice at the moment.

Cheers, Sanne
Re: Iterations vs. combo source/sink
Hi Ken,

you can certainly have partitioned sources and sinks. You can control the parallelism by calling the .setParallelism() method. If you need a partitioned sink, you can call .keyBy() to hash partition.

I did not completely understand the requirements of your program. Can you maybe provide pseudo code for what the program should look like?

Some general comments:
- Flink's fault tolerance mechanism does not work with iterative data flows yet. This is work in progress, see FLINK-3257 [1].
- Flink's fault tolerance mechanism only works if you expose all (!) internal operator state. So you would need to put your Java DB in Flink state to have a recoverable job.
- Is the DB essential in your application? Could you use Flink's key-partitioned state interface [2] instead? That would help to make your job fault-tolerant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3257
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface

2016-09-29 1:15 GMT+02:00 Ken Krugler:
> Hi all,
>
> I’ve got a very specialized DB (runs in the JVM) that I need to use to
> both keep track of state and generate new records to be processed by my
> Flink streaming workflow. Some of the workflow results are updates to be
> applied to the DB.
>
> And the DB needs to be partitioned.
>
> My initial approach is to wrap it in a regular operator, and have
> subsequent streams be inputs for updating state. So now I’ve got an
> IterativeDataStream, which should work.
>
> But I imagine I could also wrap this DB in a source and a sink, yes?
> Though I’m not sure how I could partition it as a source, in that case.
>
> If it is feasible to have a partitioned source/sink, are there general
> pros/cons to either approach?
>
> Thanks,
>
> — Ken
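
As a rough illustration of what "hash partition" means in Fabian's reply, the sketch below assigns keys to a fixed number of parallel instances so that the same key always lands on the same instance. It is plain Java with no Flink dependency; the class `HashPartitioner` and its methods are hypothetical, and Flink's own keyBy() uses its own internal hashing scheme rather than this formula.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of hash partitioning; not how Flink implements keyBy().
class HashPartitioner {
    // Maps a key to one of `parallelism` partitions; floorMod keeps the result non-negative
    // even when hashCode() is negative.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys into per-partition lists, one list per parallel instance.
    static List<List<String>> partition(List<String> keys, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, parallelism)).add(key);
        }
        return partitions;
    }
}
```

The property that matters for a partitioned DB is determinism: every record for a given key reaches the same instance, so each instance can own a disjoint slice of the data. Changing the parallelism changes the assignment, which is one reason keeping that data in Flink-managed keyed state is usually easier than managing the partitions by hand.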
Re: Flink-HBase connectivity through Kerberos keytab | Simple get/put sample implementation
Hi Anchit,

Flink does not yet have a streaming sink connector for HBase. Some members of the community are working on this though [1].
I think we resolved a similar issue for the Kafka connector recently [2].
Maybe the related commits contain some relevant code for your problem.

Best, Fabian

[1] https://github.com/apache/flink/pull/2332
[2] https://issues.apache.org/jira/browse/FLINK-3239

2016-09-29 9:12 GMT+02:00 Anchit Jatana:
> Hi All,
>
> I'm trying to link my flink application with HBase for simple read/write
> operations. I need to implement Flink to HBase the connectivity through
> Kerberos using the keytab.
>
> Can somebody share(or link me to some resource) a sample
> code/implementation on how to achieve Flink to HBase connectivity through
> Kerberos using keytab for simple read/write (get/put) operation.
>
> Thank you!
>
> Regards,
> Anchit
Re: flink 1.1.2 RichFunction not working
Sorry for that inconvenience. You are right about mentioning that in the release notes (adding it even after the release). We'll take that as feedback for the next release.

On Tue, Sep 27, 2016 at 9:42 PM, Chen Bekor wrote:
> thanks. worth mentioning in the release notes of 1.1.2 that file source is
> broken. we spent a substantial time on trying to figure out what's the
> root cause.
>
> On Sep 27, 2016 9:40 PM, "Stephan Ewen" wrote:
>
>> Sorry for the inconvenience. This is a known issue and being fixed for
>> Flink 1.1.3 - the problem is that the streaming File sources were reworked
>> to continuously monitor the File System, but the watermarks are not handled
>> correctly.
>>
>> https://issues.apache.org/jira/browse/FLINK-4329
>>
>> So far, 2/3 parts of the fix are in, the last part is an open pull
>> request.
>> Once that is in, we can look into getting Flink 1.1.3 out.
>>
>> Best,
>> Stephan
>>
>> On Tue, Sep 27, 2016 at 8:17 PM, sunny patel wrote:
>>
>>> Hi Chen,
>>>
>>> Please upload your Flink scala library dependencies.
>>>
>>> Regards
>>> Sunny.
>>>
>>> On Tue, Sep 27, 2016 at 5:56 PM, Chen Bekor wrote:
>>>
>>> > Hi,
>>> >
>>> > Since upgrading to flink 1.1.2 from 1.0.2 - I'm experiencing regression in
>>> > my code. In order to Isolate the issue I have written a small flink job
>>> > that demonstrates that.
>>> >
>>> > The job does some time based window operations with an input csv file (in
>>> > the example below - count the number of events on sliding window of 3600
>>> > seconds every 10 seconds) using event time instead of wall clock time, and
>>> > writes results down to an output csv file.
>>> >
>>> > When I upload the jar via flink admin UI (local cluster) and submit the
>>> > job, it returns as finished after a couple of seconds but the RichFunction
>>> > is not applied (I do not see any log messages and the output csv is empty)
>>> >
>>> > same results when running in local mode (through the IDE).
>>> >
>>> > Here is sample code (fully working example) to demonstrate what I'm trying
>>> > to achieve. Please note - it works on flink 1.0.2 but doesn't work on flink
>>> > 1.1.2. No error thrown. The output CSV file is simply empty.
>>> >
>>> >
>>> > *Job class:*
>>> >
>>> > import com.firelayers.spike.functions.sink.FileSinkFunction
>>> > import org.apache.flink.api.common.functions.RichMapFunction
>>> > import org.apache.flink.api.common.typeinfo.TypeInformation
>>> > import org.apache.flink.api.java.utils.ParameterTool
>>> > import org.apache.flink.streaming.api.TimeCharacteristic
>>> > import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>>> > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> > import org.apache.flink.streaming.api.scala.function.WindowFunction
>>> > import org.apache.flink.streaming.api.watermark.Watermark
>>> > import org.apache.flink.streaming.api.windowing.time.Time
>>> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>> > import org.apache.flink.util.Collector
>>> > import org.slf4j.{Logger, LoggerFactory}
>>> > import play.api.libs.json.Json
>>> >
>>> > import scala.language.postfixOps
>>> >
>>> > object JobTest {
>>> >
>>> >   implicit val Logger: Logger = LoggerFactory.getLogger(JobTest.getClass)
>>> >
>>> >   var parameterTool: ParameterTool = null
>>> >   var env: StreamExecutionEnvironment = null
>>> >
>>> >   def main(args: Array[String]): Unit = {
>>> >     implicit val ev = TypeInformation.of(classOf[MyEvent])
>>> >     parameterTool = ParameterTool.fromArgs(args)
>>> >     val input_path = parameterTool.getRequired("in.path")
>>> >     val out_path = parameterTool.getRequired("out.path")
>>> >     env = StreamExecutionEnvironment.getExecutionEnvironment
>>> >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> >     val events_stream = env.readTextFile(s"file://$input_path").map(new ParseMyEventFunction).assignTimestampsAndWatermarks(new MyTimestamp)
>>> >     implicit val sti = TypeInformation.of(classOf[String])
>>> >     events_stream.keyBy(_.guid)
>>> >       .timeWindow(Time.seconds(3600), Time.seconds(10))
>>> >       .apply[String](new TFunction[MyEvent]).name("test")
>>> >       .addSink(new FileSinkFunction[String](out_path))
>>> >     env.execute("test")
>>> >   }
>>> > }
>>> >
>>> > class TFunction[T] extends WindowFunction[T, String, String, TimeWindow] {
>>> >   val logger: Logger = LoggerFactory.getLogger(this.getClass)
>>> >
>>> >   override def apply(
>>> >       cell: String,
>>> >       window: TimeWindow,
>>> >       events: Iterable[T],
>>> >       out: Collector[String]): Unit = {
>>> >
>>> >     val events_as_seq =
Unsubscribe
Flink-HBase connectivity through Kerberos keytab | Simple get/put sample implementation
Hi All,

I'm trying to link my Flink application with HBase for simple read/write operations. I need to implement the Flink-to-HBase connectivity through Kerberos using the keytab.

Can somebody share (or link me to some resource with) sample code or an implementation showing how to achieve Flink-to-HBase connectivity through Kerberos using a keytab for simple read/write (get/put) operations?

Thank you!

Regards,
Anchit