Flink HA on Kubernetes - RPC port
Hello,

We are migrating our HA setup from ZooKeeper to Kubernetes, and we have a question about the RPC port.

Previously, with ZooKeeper, the RPC connection was configured with high-availability.jobmanager.port. We expected the same option to apply with Kubernetes HA, since the documentation (Flink 1.14) says:

"The port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of 'jobmanager.rpc.port'."

The option is also listed in the #advanced-high-availability-options section. However, the cluster seems to use the regular jobmanager.rpc.port instead: the port set in high-availability.jobmanager.port is closed.

Does anyone know about this?

Best Regards,
Bastien
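One way to confirm which of the two ports the JobManager is actually listening on is a plain TCP connection test. The sketch below is only an illustration under assumptions: the class name PortProbe is made up, the host and ports are whatever you pass on the command line, and a successful TCP connect only shows that something is listening there, not that it is Flink's RPC endpoint.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks whether a TCP port accepts connections.
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortProbe <jobmanager-host> <port> [<port> ...]
        if (args.length < 2) {
            System.err.println("usage: PortProbe <host> <port> [<port> ...]");
            return;
        }
        String host = args[0];
        for (int i = 1; i < args.length; i++) {
            int port = Integer.parseInt(args[i]);
            System.out.println(port + " open=" + isOpen(host, port, 2000));
        }
    }
}
```

Running it against the JobManager service with the values configured for jobmanager.rpc.port and high-availability.jobmanager.port shows which of the two is reachable.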
DataSet API, chaining database access
Hello,

I am struggling with the DataSet API. I need to chain two database accesses, which I could easily do as source1 -> map2.
However, map should not (and in practice cannot) be used for database access: when a request takes too long, we hit a timeout on the Akka connection between the TM and the JM.
I know that sources cannot be chained (source1 -> source2), and writing a single source that performs both requests would be very complicated, since we actually need to make multiple requests (not just two).
Running two jobs with two .execute() calls is not an option either, because I need these requests to run in the same Flink job (source1 -> execute -> source2 -> execute).

Can someone help me?

Regards,
------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: New KafkaSource API : Change in default behavior regarding starting offset
Hello Martijn,

Thanks for the link to the release notes, especially this part:

"When resuming from the savepoint, please use setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new KafkaSourceBuilder to transfer the offsets to the new source."

So earliest is the new default. We do use .committedOffsets: it is the default in our custom KafkaSource builder, to make sure we do not read all the previous data again (earliest).

What bothers me is just the change in the default starting offset between FlinkKafkaConsumer and KafkaSource, which can lead to mistakes. In our case, we sometimes drop part of our Kafka source state in order to read again from the committed Kafka offsets, but maybe nobody else does that ^^

Anyway, thanks for pointing me to the release notes!

Best Regards,
------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Wed, 15 Jun 2022 at 10:58, Martijn Visser wrote:
> Hi Bastien,
>
> When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes included instructions on how to migrate from FlinkKafkaConsumer to KafkaSource [1]. Looking at the Kafka documentation [2], there is a section on how to upgrade to the latest connector version that I think is outdated. I'm leaning towards copying the migration instructions to the generic documentation. Do you think that would have sufficed?
>
> Best regards,
>
> Martijn
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
>
> On Wed, 15 Jun 2022 at 09:22, bastien dine wrote:
>> Hello Jing,
>>
>> This was the previous method in the old Kafka consumer API. It was removed in 1.15, so the source code is no longer in master.
>> Yes, I know about the new offsets initializer: committed offsets with earliest as a fallback can be used to get the same behavior as before.
>> I just wanted to know whether this is a change in behavior or whether I am missing something.
>>
>> Bastien DINE
>> Freelance
>> Data Architect / Software Engineer / Sysadmin
>> http://bastiendine.io
Re: New KafkaSource API : Change in default behavior regarding starting offset
Hello Jing,

This was the previous method in the old Kafka consumer API. It was removed in 1.15, so the source code is no longer in master.
Yes, I know about the new offsets initializer: committed offsets with earliest as a fallback can be used to get the same behavior as before.
I just wanted to know whether this is a change in behavior or whether I am missing something.

Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io

On Tue, 14 Jun 2022 at 23:08, Jing Ge wrote:
> Hi Bastien,
>
> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within Flink in the master branch. Could you please point out the code where the committed offset is used as the default?
>
> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() is used, an exception will be thrown at runtime in case there is no committed offset, which is useful if the user intends to read from the committed offset but something is wrong. It might feel weird if it were used as the default, because an exception would be thrown when users start new jobs with default settings.
>
> Best regards,
> Jing
>
> On Tue, Jun 14, 2022 at 4:15 PM bastien dine wrote:
>> Hello everyone,
>>
>> Does someone know why the starting offset behaviour has changed in the new Kafka Source?
>>
>> It now starts from earliest (code in KafkaSourceBuilder). The doc says:
>> "If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default."
>> from: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>
>> Previously, in the old FlinkKafkaConsumer, the default was to start from the committed offsets (i.e. the setStartFromGroupOffsets() method),
>> which matches this behaviour in the new KafkaSource:
>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>
>> This change can lead to big trouble if users pay no attention to this point when migrating from the old FlinkKafkaConsumer to the new KafkaSource.
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
New KafkaSource API : Change in default behavior regarding starting offset
Hello everyone,

Does someone know why the starting offset behaviour has changed in the new Kafka Source?

It now starts from earliest (code in KafkaSourceBuilder). The doc says:
"If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default."
from: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset

Previously, in the old FlinkKafkaConsumer, the default was to start from the committed offsets (i.e. the setStartFromGroupOffsets() method),
which matches this behaviour in the new KafkaSource:
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)

This change can lead to big trouble if users pay no attention to this point when migrating from the old FlinkKafkaConsumer to the new KafkaSource.

Regards,
Bastien
------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
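To make the difference between the three behaviours discussed in this thread concrete, here is a small plain-Java model of how a starting offset is picked. It is a sketch under assumptions, not Flink code: the class StartingOffsetModel, its Reset enum and its method names are invented for illustration, and they only mirror what the thread says about OffsetsInitializer.earliest(), OffsetsInitializer.committedOffsets() and OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST).

```java
import java.util.OptionalLong;

// Hypothetical model of the starting-offset choices; it does not use the Flink or Kafka APIs.
final class StartingOffsetModel {

    /** What to do when the consumer group has no committed offset. */
    enum Reset { NONE, EARLIEST, LATEST }

    /** Models earliest(): the committed offset is ignored entirely. */
    static long earliest(long earliestOffset) {
        return earliestOffset;
    }

    /**
     * Models committedOffsets(...): use the committed offset when there is one,
     * otherwise fall back according to the reset strategy. With Reset.NONE the
     * missing offset is an error, as described by Jing Ge in this thread.
     */
    static long committed(OptionalLong committedOffset, Reset reset,
                          long earliestOffset, long latestOffset) {
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        switch (reset) {
            case EARLIEST:
                return earliestOffset;
            case LATEST:
                return latestOffset;
            default:
                throw new IllegalStateException("no committed offset and no reset strategy");
        }
    }

    public static void main(String[] args) {
        long earliestOffset = 0L;
        long latestOffset = 1_000L;
        OptionalLong committedOffset = OptionalLong.of(750L);

        // New default: the whole topic is read again.
        System.out.println("earliest():  " + earliest(earliestOffset));
        // Old default: resume where the group left off.
        System.out.println("committed(): "
                + committed(committedOffset, Reset.EARLIEST, earliestOffset, latestOffset));
    }
}
```

With a committed offset of 750 in this model, the new default starts at 0 while the old default resumes at 750, which is the migration pitfall described above.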
Re: question on dataSource.collect() on reading states from a savepoint file
I haven't used S3 with Flink, but according to this doc:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
you can set up S3 pretty easily and use it with a path such as s3://path/to/your/file in a write sink.
The page talks about DataStream, but it should work with DataSet as well (https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks).

Maybe someone else has more information about an S3 DataSet sink.

Regards,

On Thu, 10 Feb 2022 at 20:52, Antonio Si wrote:
> Thanks Bastien. Can you point to an example of using a sink, as we are planning to write to S3?
>
> Thanks again for your help.
>
> Antonio.
>
> On Thu, Feb 10, 2022 at 11:49 AM bastien dine wrote:
>> Hello Antonio,
>>
>> The .collect() method should be used with caution, as it collects the DataSet (multiple partitions on multiple TMs) into a single List on the JM (so in memory).
>> Unless you have a lot of RAM, you cannot use it this way, and you probably should not.
>> I recommend using a sink to write the data into a formatted file instead (CSV, for example) or, if it is too big, into something splittable.
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>> On Thu, 10 Feb 2022 at 20:32, Antonio Si wrote:
>>> Hi,
>>>
>>> I am using the stateful processing API to read the states from a savepoint file.
>>> It works fine when the state size is small, but when the state size is larger, around 11GB, I am getting an OOM. I think it happens when it is doing a dataSource.collect() to obtain the states. The stack trace is copied at the end of the message.
>>>
>>> Any suggestions or hints would be very helpful.
>>>
>>> Thanks in advance.
>>>
>>> Antonio.
>>>
>>> java.lang.OutOfMemoryError: null
>>>     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_282]
>>>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_282]
>>>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[?:1.8.0_282]
>>>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[?:1.8.0_282]
>>>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[?:1.8.0_282]
>>>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) ~[?:1.8.0_282]
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) ~[?:1.8.0_282]
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_282]
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) ~[?:1.8.0_282]
>>>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_282]
>>>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699) ~[?:1.8.0_282]
>>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_282]
>>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_282]
>>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_282]
>>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_282]
Re: question on dataSource.collect() on reading states from a savepoint file
Hello Antonio,

The .collect() method should be used with caution, as it collects the DataSet (multiple partitions on multiple TMs) into a single List on the JM (so in memory).
Unless you have a lot of RAM, you cannot use it this way, and you probably should not.
I recommend using a sink to write the data into a formatted file instead (CSV, for example) or, if it is too big, into something splittable.

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Thu, 10 Feb 2022 at 20:32, Antonio Si wrote:
> Hi,
>
> I am using the stateful processing API to read the states from a savepoint file.
> It works fine when the state size is small, but when the state size is larger, around 11GB, I am getting an OOM. I think it happens when it is doing a dataSource.collect() to obtain the states. The stack trace is copied at the end of the message.
>
> Any suggestions or hints would be very helpful.
>
> Thanks in advance.
>
> Antonio.
>
> java.lang.OutOfMemoryError: null
>     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_282]
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_282]
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[?:1.8.0_282]
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[?:1.8.0_282]
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[?:1.8.0_282]
>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) ~[?:1.8.0_282]
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) ~[?:1.8.0_282]
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_282]
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) ~[?:1.8.0_282]
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_282]
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699) ~[?:1.8.0_282]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_282]
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_282]
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_282]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_282]
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_282]
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.rpc.akka.
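A note on the trace itself: the error is raised in ByteArrayOutputStream.hugeCapacity while the accumulators are being serialized. A ByteArrayOutputStream is backed by a single byte[], and Java arrays are indexed by int, so one buffer cannot hold more than roughly 2 GB however much heap the JM has. That suggests adding RAM alone would probably not make an 11 GB collect() succeed. The sketch below only does that arithmetic; the class name ByteArrayLimit is made up, and the state size is the approximate figure from the report.

```java
// Hypothetical helper: checks whether a payload could fit in one Java byte[].
final class ByteArrayLimit {

    /** Upper bound on a Java array length; real JVM limits are slightly lower. */
    static final long MAX_ARRAY_LENGTH = Integer.MAX_VALUE;

    /** Returns true if a payload of the given size could fit in a single byte[]. */
    static boolean fitsInSingleByteArray(long sizeInBytes) {
        return sizeInBytes >= 0 && sizeInBytes <= MAX_ARRAY_LENGTH;
    }

    public static void main(String[] args) {
        long stateSize = 11L * 1024 * 1024 * 1024; // about 11 GB, as reported
        System.out.println("state size (bytes): " + stateSize);
        System.out.println("max byte[] length:  " + MAX_ARRAY_LENGTH);
        System.out.println("fits in one byte[]: " + fitsInSingleByteArray(stateSize));
    }
}
```

This is consistent with the advice above: writing through a sink avoids building one giant in-memory buffer on the JM.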
Re: Flink High-Availability and Job-Manager recovery
Hello,

On k8s, the current recommendation is to run a single JobManager with HA enabled, so that the cluster does not lose its state upon a crash.

1. The storage dir can certainly be on a Kubernetes PV. The directory must be shared by all JMs, and you will need to map the volume to the same local directory (e.g. /data) so that the configuration is the same across JMs.
2. You can have only one JM, but you still need to enable HA, since HA writes the cluster state into ZK and the storage dir.
3. I don't know anything about Beam, so I cannot help you with that. But per-job mode is not available on k8s (neither native nor standalone Kubernetes):
   https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#per-job-mode
   https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#per-job-mode
   You will need YARN for that (I think Mesos is deprecated).
   Application mode can be a bit tricky to understand: it "moves" the job submission inside the JM.
   The right solution depends on your deployment needs; I can't tell without knowing more. But session mode plus streaming job deployment is pretty standard, and you can easily emulate "one cluster per job" with it (better for ops and tuning than one cluster running multiple jobs).

Hope this helps,
Regards,
Bastien

On Thu, 3 Feb 2022 at 15:12, Koffman, Noa (Nokia - IL/Kfar Sava) <noa.koff...@nokia.com> wrote:
> Hi all,
>
> We are currently deploying Flink on a 3-node k8s cluster, with 1 job-manager and 3 task managers.
>
> We are trying to understand the recommendation for deployment, more specifically for recovery from job-manager failure, and have some questions about that:
>
> 1. If we use a Flink HA solution (either Kubernetes HA or ZooKeeper), the documentation states we should define the ‘high-availability.storageDir’.
>    In the examples we found, there is mostly HDFS or S3 storage.
>    We were wondering if we could use Kubernetes PersistentVolumes and PersistentVolumeClaims. If we do use that, can each job-manager have its own volume, or must it be shared?
>
> 2. Is there a solution for job-manager recovery without HA? With the way our Flink is currently configured, when the job-manager pod is killed, all the jobs are lost.
>    Is there a way to configure the job-manager so that if it goes down and k8s restarts it, it will continue from the same state (restart all the tasks, etc.)?
>    For this, can a Persistent Volume be used, without HDFS or external solutions?
>
> 3. Regarding the deployment mode: we are working with Beam + Flink, and Flink is running in session mode. We have a few long-running streaming pipelines deployed (fewer than 10).
>    Is ‘session’ mode the right deployment mode for our type of deployment? Or should we consider switching to something different (per-job/application)?
>
> Thanks
Re: Pojo State Migration - NPE with field deletion
Thanks for the JIRA ticket,
This is certainly pretty critical.
The "workaround" is to not remove the field, but I am not sure that is acceptable :)

I could work on it, but someone needs to point me to where to start.
Should I work on the PojoSerializer, so that this case no longer throws an exception?
Or should I try to find the root cause, namely why the field serializer of the deleted field is still present?

Regards,
------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Wed, 2 Feb 2022 at 16:37, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
> Hello,
>
> Happened to me too, here's the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-21752
>
> Regards,
> Alexis.
>
> From: bastien dine
> Sent: Wednesday, 2 February 2022 16:01
> To: user
> Subject: Pojo State Migration - NPE with field deletion
>
> Hello,
>
> I have some trouble restoring a state (POJO) after deleting a field.
> According to the documentation, this should not be a problem with POJOs:
> "Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints."
>
> Here is a short stack trace (full trace is below):
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>
> After some debugging, it seems that the deleted POJO field still has a field serializer in the "fieldSerializers" array of the corresponding PojoSerializer object,
> but it is not present in "fields", where we have a gap of one index (for example 0-1-3-4).
> So when the serializer reaches index 2, we get this NPE.
>
> Why is the serializer of the deleted field still present? It should have been dropped when resolving schema compatibility, right?
> I cannot find anything on that matter; could someone help me with it?
> Reproduced in Flink 1.13 & 1.14; I cannot find any related JIRA either.
>
> Best Regards,
> Bastien
>
> Full stack trace:
>
> 2022-02-02 15:44:20
> java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at
Pojo State Migration - NPE with field deletion
Hello,

I have some trouble restoring a state (POJO) after deleting a field.
According to the documentation, this should not be a problem with POJOs:
"Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints."

Here is a short stack trace (full trace is below):

Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)

After some debugging, it seems that the deleted POJO field still has a field serializer in the "fieldSerializers" array of the corresponding PojoSerializer object,
but it is not present in "fields", where we have a gap of one index (for example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.

Why is the serializer of the deleted field still present? It should have been dropped when resolving schema compatibility, right?
I cannot find anything on that matter; could someone help me with it?
Reproduced in Flink 1.13 & 1.14; I cannot find any related JIRA either.

Best Regards,
Bastien

Full stack trace:

2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2737490 for operator OperatorXXX
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
Re: Problem when restoring from savepoint with missing state & POJO modification
Hello Yun,

Thank you very much for your response, that's what I thought.
However, it does not seem possible to remove a single state using the State Processor API.
We use it a lot, and we can only remove all of an operator's states, not one specifically.
Am I missing something?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Tue, 8 Dec 2020 at 08:54, Yun Tang wrote:
> Hi Bastien,
>
> Flink supports registering state via a state descriptor when calling runtimeContext.getState(). However, once the state is registered, it cannot be removed anymore. And when you restore from a savepoint, the previous state is registered again [1]. Flink does not drop state directly, and you could use the State Processor API [2] to remove the related state.
>
> [1] https://github.com/apache/flink/blob/d94c7a451d22f861bd3f79435f777b427020eba0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java#L171
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> ------
> From: bastien dine
> Sent: Tuesday, December 8, 2020 0:28
> To: user
> Subject: Problem when restoring from savepoint with missing state & POJO modification
>
> Hello,
> We have experienced some weird issues with a POJO MapState in a streaming job: the job fails on checkpointing after we removed a state, then modified the state POJO and restored the job.
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>
> Reproduced in Flink 1.10 & 1.11
> (full stack below)
>
> Context:
> We have a streaming job with a state named "buffer" holding a POJO Buffer inside a CoFlatMap function.
>
> MyCoFlat:
>     public class MyCoFlat extends RichCoFlatMapFunction {
>         transient MapState<String, Buffer> buffer;
>
>         @Override
>         public void open(Configuration parameters) {
>             buffer = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("buffer", String.class, Buffer.class));
>         }
>
> Buffer:
>     public class Buffer {
>         private String field1;
>         private String field2;
>         private String field3;
>         ... + empty constructor + getters / setters so that it qualifies as a POJO
>
> We had some trouble with our job, so we reworked two things:
> - we removed field2 from the Buffer class,
> - we stopped using the "buffer" state.
>
> When restoring from a savepoint (--allowNonRestoredState) we get the exception below.
> The job is submitted to the cluster but fails on checkpointing; the job is totally stuck.
>
> Debug:
> Debugging showed us that the exception is raised here (as expected):
>
>     public PojoSerializer(
>             Class<T> clazz,
>             TypeSerializer<?>[] fieldSerializers,
>             Field[] fields,
>             ExecutionConfig executionConfig) {
>         this.clazz = checkNotNull(clazz);
>         this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
>         this.fields = checkNotNull(fields);
>         this.numFields = fieldSerializers.length;
>         this.executionConfig = checkNotNull(executionConfig);
>         for (int i = 0; i < numFields; i++) {
>             this.fields[i].setAccessible(true);   // <-- HERE
>         }
>
> In our fields, we have field[0] and field[2], but field[1] is totally missing from the array. That is why we get the NPE here, when i = 1.
>
> So what we did was put this state back in our streaming job (with the missing field and the POJO) and redeploy with the old savepoint, and this went totally fine.
> Then we redeployed a job without this state.
> So this was a two-step deployment for our job (1 -> modify the POJO, 2 -> remove the state using this POJO).
> But the state that is no longer used is still in the savepoints (at least its serializer), so we could face this problem again when we modify the Buffer POJO later.
> Finally, we just modified a savepoint with the API to remove this state once and for all, and restarted from it.
>
> I have a couple of questions here:
> Why does Flink keep an unused state in a savepoint (even if it cannot map it to the new topology and allowNonRestoredState is set)?
> Why does Flink not handle this case? The behaviour seems to differ between an existing POJO state and this unused POJO state.
> How can I clean my savepoints? I don't want them to contain unused state.
>
> If anybody has experienced an issue like this before or knows how to handle it, I would be glad to discuss!
> Best regards,
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
Problem when restoring from savepoint with missing state & POJO modification
Hello,

We have experienced some weird issues with a POJO MapState in a streaming job: the job fails upon checkpointing after we removed a state, then modified the state POJO and restored the job.

    Caused by: java.lang.NullPointerException
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11 (full stack below)

Context: We have a streaming job with a state named "buffer" holding a POJO Buffer, inside a CoFlatMap function.

MyCoFlat:

    public class MyCoFlat extends RichCoFlatMapFunction {
        transient MapState<String, Buffer> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", String.class, Buffer.class));
        }

Buffer:

    public class Buffer {
        private String field1;
        private String field2;
        private String field3;
        ... + empty constructor + getter / setter for POJO consideration

We had some trouble with our job, so we reworked 2 things:
- we removed field2 from the Buffer class,
- we stopped using the "buffer" state.

When restoring from the savepoint (--allowNonRestoredState) we get the exception above. The job is submitted to the cluster but fails on checkpointing, and is then totally stuck.

Debug: Debugging showed us that the exception is raised here (as expected):

    public PojoSerializer(
            Class clazz,
            TypeSerializer[] fieldSerializers,
            Field[] fields,
            ExecutionConfig executionConfig) {
        this.clazz = checkNotNull(clazz);
        this.fieldSerializers = (TypeSerializer[]) checkNotNull(fieldSerializers);
        this.fields = checkNotNull(fields);
        this.numFields = fieldSerializers.length;
        this.executionConfig = checkNotNull(executionConfig);
        for (int i = 0; i < numFields; i++) {
            this.fields[i].setAccessible(true); // < HERE
        }

In our fields array, we have fields[0] & fields[2] but fields[1] is totally missing, which is why we get the NPE here when i=1.

So what we did was to put this state back in our streaming job (with the missing field and POJO) and redeploy from the old savepoint, and this went totally fine. Then we redeployed a job without this state. This has been a two-step deployment for our job (1 -> modify the POJO, 2 -> remove the state using this POJO).

But the no-longer-used state (or at least its serializer) is still in the savepoints, so we could face this problem again when we modify the Buffer POJO later. Finally, we just modified a savepoint with the State Processor API to remove this state once and for all, and restarted from it.

I have a couple of questions here:
- Why does Flink keep an unused state in a savepoint (even if it cannot map it into the new topology and allowNonRestoredState is set)?
- Why does Flink not handle this case? Behaviour seems to be different between an existing POJO state and this unused POJO state.
- How can I clean my savepoints? I don't want them to contain unused state.

If anybody has experienced an issue like this before or knows how to handle it, I would be glad to discuss!

Best regards,

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
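The failure mode described in this message can be reproduced without Flink. The sketch below is only an illustration using plain Java reflection: the two-field Buffer class and the lookup helper are hypothetical stand-ins, and it assumes (as the debugging above suggests) that a field which no longer exists in the class ends up as a null entry in the restored array.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for the Buffer POJO after field2 was removed. */
class Buffer {
    private String field1;
    private String field3;
}

final class MissingFieldSketch {

    /**
     * Mimics the loop from the quoted constructor: every entry is dereferenced,
     * so a null entry throws at that index.
     * Returns the first index that fails, or -1 if every entry is usable.
     */
    static int firstFailingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i].setAccessible(true);
            } catch (NullPointerException e) {
                return i;
            }
        }
        return -1;
    }

    /** Looks a field up by name; returns null when the class no longer declares it. */
    static Field lookup(Class<?> clazz, String name) {
        try {
            return clazz.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // The old snapshot still knows three field names; the class has two.
        Field[] fields = {
            lookup(Buffer.class, "field1"),
            lookup(Buffer.class, "field2"), // removed from the class, so null
            lookup(Buffer.class, "field3"),
        };
        System.out.println("first failing index: " + firstFailingIndex(fields));
    }
}
```

This only shows why a hole in the array breaks the loop at i=1; it says nothing about where Flink itself builds that array.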
Re: Keyed raw state - example
Hello Congxian, Thanks for your response, Don't you have an example with an Operator extending the AbstractUdfStreamOperator? Using the context.getRawKeyedStateInputs() (& output to snapshots) TimeService is reimplementing the whole stuff :/ -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le lun. 18 nov. 2019 à 03:19, Congxian Qiu a écrit : > Hi >Currently, I think you can ref the implementation of timerservice[1] > which used the raw keyed state, the snapshot happens in > AbstractStreamOperator#snapshotState(), for using Raw State you need to > implement a new operator[2]. There is an issue wants to give some example > for raw state[2] > > [1] > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java > [2] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#raw-and-managed-state > [3] https://issues.apache.org/jira/browse/FLINK-14379 > > Best, > Congxian > > > bastien dine 于2019年11月16日周六 上午5:57写道: > >> Hello everyone, >> >> I would like to know if anybody has a working example on how to declare a >> keyed raw state ( in my case a keyedprocessoperator) and how to use it in >> my UDF (keyedprocessfunction)? >> >> Basicaly we have a huge problem with a ValueState w Rocksdb, getting >> serialized for every element ( need to access it and update) so it's taking >> a crazy amount of time and we would like to have it serialized only on >> snapshot, so using Raw state is a possible good solution, >> But i cannot find anyexample of it :/ >> >> Thanks and best regards, >> >> Bastien DINE >> Freelance >> Data Architect / Software Engineer / Sysadmin >> http://bastiendine.io >> >> >> >
Keyed raw state - example
Hello everyone,

I would like to know if anybody has a working example of how to declare a keyed raw state (in my case in a KeyedProcessOperator) and how to use it in my UDF (KeyedProcessFunction)?

Basically we have a huge problem with a ValueState on RocksDB being serialized for every element (we need to access and update it), so it's taking a crazy amount of time. We would like to have it serialized only on snapshot, so using raw state looks like a possible solution, but I cannot find any example of it :/

Thanks and best regards,

Bastien DINE
Freelance Data Architect / Software Engineer / Sysadmin
http://bastiendine.io
Async operator with a KeyedStream
Hello,

I would like to know if you can use a KeyedStream with the Async operator. I want to use the async operator to insert some data into my database, but I want to limit it to 1 in-flight request per element (with key=id) at a time.

With a regular keyBy / map it works, but it's too slow (I don't have enough resources to increase my parallelism).

As far as I have seen, this is not possible. When I write something like Async.orderedWait(myStream.keyBy(myKeyselector)), the keyBy is totally ignored.

Do you have a solution for this?

Best Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
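The requirement in this message (many requests in flight overall, but at most one per key at a time) can be sketched outside Flink with plain java.util.concurrent. This is not a Flink API and not a confirmed answer from the thread: PerKeySequencer is a hypothetical helper that chains each new request behind the previous one for the same key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: runs requests concurrently across keys, sequentially per key. */
final class PerKeySequencer {
    // Last submitted request for each key; new requests are chained behind it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    PerKeySequencer(ExecutorService pool) {
        this.pool = pool;
    }

    /** Schedules the request to start only after the previous request for the same key has finished. */
    CompletableFuture<Void> submit(String key, Runnable request) {
        return tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // handle(...) swallows a failure of the previous request so the chain keeps going.
            return previous.handle((result, error) -> null).thenRunAsync(request, pool);
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PerKeySequencer sequencer = new PerKeySequencer(pool);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int n = i;
            pending.add(sequencer.submit("id-1", () -> System.out.println("id-1 request " + n)));
            pending.add(sequencer.submit("id-2", () -> System.out.println("id-2 request " + n)));
        }
        pending.forEach(CompletableFuture::join);
        pool.shutdown();
    }
}
```

Note that this sketch never removes entries from the map, so a real implementation with many distinct ids would need some form of cleanup.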
NullPointerException on StreamTask
Hello everyone,

I have been experiencing random NullPointerExceptions since I upgraded to 1.8. I cannot find anything related in JIRA. Does someone have an explanation, or can maybe point me in the right direction to check some things?

My source is a KafkaProducer

Stacktrace:

    java.lang.NullPointerException
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
        at sun.reflect.GeneratedConstructorAccessor29.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
        at java.lang.Thread.run(Thread.java:748)

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: Job cluster and HA
Hello Boris, I think you are confused by the name of the shell script "standalone-job.sh" Which basically means that we start a "standalone job manager" as stated in the first comment of https://github.com/apache/flink/blob/release-1.8/flink-dist/src/main/flink-bin/bin/standalone-job.sh This is another version of : flink-dist/src/main/flink-bin/bin/jobmanager.sh It's not related to a job When you configure H-A on a flink cluster, and you submit a job, Flink (i.e the jobmanager) store the state of the job in Zookeeper / HDFS So when it crashes and comes back (with this entrypoint) it will read in ZK / HDFS and restore previous execution Regards, Bastien ------ Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le ven. 24 mai 2019 à 23:22, Boris Lublinsky a écrit : > Hi, > I was experimenting with HA lately and see that it recovers successfully > job, in the case of jobmanager restarts. > Now my question is whether it will work for the job cluster. > Based on the instructions > https://github.com/apache/flink/blob/release-1.8/flink-container/docker/README.md > I can see > https://github.com/apache/flink/blob/release-1.8/flink-container/docker/docker-entrypoint.sh > that > In this case the following command is invoked: > exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@“ > > Which means that if a jobManager restarts, the following is going to > happen: > > 1. It will use HA to restore job that was running > 2. A new job will be submitted, overwriting restored job and bypassing > checkpoint restore. > > Am I missing something here? > > > Boris Lublinsky > FDP Architect > boris.lublin...@lightbend.com > https://www.lightbend.com/ > >
State migration into multiple operators
Hello,

I would like some advice about splitting an operator with state into multiple operators. The new operators would each hold a piece of the initial state, i.e. we would "split" the state.

For example, I have an operator (process) with uid A, with a state containing field1 and field2. I would like to split it into two operators B & C holding, respectively, a state with field1 and a state with field2.

How can I split my state across multiple operators?

Best Regards,
Bastien

------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
H-A Deployment : Job / task manager configuration
Hello everyone,

I would like to know what exactly I need to configure on my job / task managers for an H-A deployment. The documentation ( https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html ) is not very clear about this.

Does conf/masters need to be present on both job managers and task managers, or only on task managers so that they can find the job manager(s)? Likewise, does the ZooKeeper HA configuration in conf/flink-conf.yaml need to be set only on the job managers, or on the task managers too?

Knowing exactly where things need to be configured would also help me understand the interaction between job manager / task manager / ZK a little better.

Best Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Submit a job to a remote cluster : RemoteEnvironnement or ClientLevel ?
Hello all,

I would like to know what the best way is to submit a job to a remote cluster (from a Java app), between:
- Using the RemoteEnvironment & calling env.execute() ( https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/cluster_execution.html#remote-environment )
- Using the client level to submit a program (with a regular execution environment) ( https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#client-level )

What are the caveats of each method?

Best Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: One TaskManager per node or multiple TaskManager per node
Hello Jamie, Does #1 apply to batch jobs too ? Regards, -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le lun. 14 janv. 2019 à 20:39, Jamie Grier a écrit : > There are a lot of different ways to deploy Flink. It would be easier to > answer your question with a little more context about your use case but in > general I would advocate the following: > > 1) Don't run a "permanent" Flink cluster and then submit jobs to it. > Instead what you should do is run an "ephemeral" cluster per job if > possible. This keeps jobs completely isolated from each other which helps > a lot with understanding performance, debugging, looking at logs, etc. > 2) Given that you can do #1 and you are running on bare metal (as opposed > to in containers) then run one TM per physical machine. > > There are many ways to accomplish the above depending on your deployment > infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give > detailed input but general you'll have the best luck if you don't run > multiple jobs in the same TM/JVM. > > In terms of the TM memory usage you can set that up by configuring it in > the flink-conf.yaml file. The config key you are looking or is > taskmanager.heap.size: > https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size > > > On Mon, Jan 14, 2019 at 8:05 AM Ethan Li > wrote: > >> Hello, >> >> I am setting up a standalone flink cluster and I am wondering what’s the >> best way to distribute TaskManagers. Do we usually launch one TaskManager >> (with many slots) per node or multiple TaskManagers per node (with smaller >> number of slots per tm) ? Also with one TaskManager per node, I am seeing >> that TM launches with only 30GB JVM heap by default while the node has 180 >> GB. Why is it not launching with more memory since there is a lot >> available? >> >> Thank you very much! >> >> - Ethan > >
Re: Flink on Kubernetes - Hostname resolution between job/tasks-managers
Nevermind.. Problem already discussed in thread : Flink 1.7 jobmanager tries to lookup taskmanager by its hostname in k8s environment" ------ Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le mar. 15 janv. 2019 à 15:16, bastien dine a écrit : > Hello, > I am trying to install Flink on Kube, it's almost working.. > I am using the kube files on flink 1.7.1 doc > > My cluster is starting well, my 2 tasksmanagers are registering > successfully to job manager > On webUI, i see them : > akka.tcp://flink@dev-flink-taskmanager-3717639837-gvwh4 > :37057/user/taskmanager_0 > > I can submit a job too.. > But when I am going in job detail, or try to load the logs.. I have > nothing.. and log on jobmanager give me plenty of error like : > > 2019-01-15 14:12:40.111 [flink-metrics-96] WARN > akka.remote.ReliableDeliverySupervisor > flink-metrics-akka.remote.default-remote-dispatcher-113 - Association with > remote system > [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508] > has failed, address is now gated for [50] ms. Reason: [Association failed > with [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]] > Caused by: [dev-flink-taskmanager-3717639837-gvwh4: Name does not resolve] > > -> Name does not resolve.. > So trying to ping on the pod hostname and it's not working > Thus, ping on the pod's IP is working > > So, my question is : > - Can we force usage of IPv4 over hostname resolution ? (will be better > for perf also) > - If no, do I need to had a service or something to make it work ? > > Best Regards, > Bastien > > -- > > Bastien DINE > Data Architect / Software Engineer / Sysadmin > bastiendine.io >
Flink on Kubernetes - Hostname resolution between job/tasks-managers
Hello,

I am trying to install Flink on Kube, and it's almost working. I am using the Kubernetes files from the Flink 1.7.1 docs.

My cluster starts fine, and my 2 task managers register successfully with the job manager. On the web UI, I see them:

    akka.tcp://flink@dev-flink-taskmanager-3717639837-gvwh4:37057/user/taskmanager_0

I can submit a job too. But when I go into the job detail, or try to load the logs, I get nothing, and the job manager log gives me plenty of errors like:

    2019-01-15 14:12:40.111 [flink-metrics-96] WARN akka.remote.ReliableDeliverySupervisor flink-metrics-akka.remote.default-remote-dispatcher-113 - Association with remote system [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]] Caused by: [dev-flink-taskmanager-3717639837-gvwh4: Name does not resolve]

-> Name does not resolve..

So I tried to ping the pod hostname, and it does not work. However, pinging the pod's IP works.

So, my questions are:
- Can we force the use of IPv4 addresses over hostname resolution? (It would be better for performance as well.)
- If not, do I need to add a service or something to make it work?

Best Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: Question about key group / key state & parallelism
Hi Hequn, thanks for your response ! Ok, that's what I was thinking about the key & operator instance If the affectation of key group to an instance is deterministic (and the hash of the key to belong to a key group) I have the following problem Let's say I have 4 key (A,B,C,D) & 2 parallel instance for my operator (1, 2). Flink determines that A/B belong 1 and C/D belong to 2. If I have a message keyed by A it will be processed by 1. But the following message is a B-key, it will wait for message A to be processed by 1 and then go to 1, even if 2 is not busy and can technically do the processing, right ? How can I deal with that ? Best Regard and many thanks ! Bastien -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le mer. 12 déc. 2018 à 13:39, Hequn Cheng a écrit : > Hi Bastien, > > Each key “belongs” to exactly one parallel instance of a keyed operator, > and each parallel instance contains one or more Key Groups. > Keys will be hashed into the corresponding key group deterministically. It > is hashed by the value instead of the number of the total records. > Different keys do not affect each other even a parallel instance contains > one or more Key Groups. > > Best, Hequn > > > On Wed, Dec 12, 2018 at 6:21 PM bastien dine > wrote: > >> Hello everyone, >> >> I have a question regarding the key state & parallelism of a process >> operation >> >> Doc says : "You can think of Keyed State as Operator State that has been >> partitioned, or sharded, with exactly one state-partition per key. Each >> keyed-state is logically bound to a unique composite of >> , and since each key “belongs” to exactly >> one parallel instance of a keyed operator, we can think of this simply as >> ." >> >> If I have less parallel operator instance (say 5) than my number of >> possible key (10), it means than every instance will "manage" 2 key state ? >> (is this spread evenly ?) >> Is the logical bound fixed ? 
I mean, are the state always managed by the >> same instance, or does this depends on the available instance at the moment >> ? >> >> "During execution each parallel instance of a keyed operator works with >> the keys for one or more Key Groups." >> -> this is related, does "works with the keys" means always the same keys >> ? >> >> Best Regards, >> Bastien >> >> -- >> >> Bastien DINE >> Data Architect / Software Engineer / Sysadmin >> bastiendine.io >> >
Question about key group / key state & parallelism
Hello everyone,

I have a question regarding keyed state & the parallelism of a process operation.

The doc says: "You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>."

If I have fewer parallel operator instances (say 5) than possible keys (10), does it mean that every instance will "manage" 2 keyed states? (Is this spread evenly?)

Is the logical binding fixed? I mean, is the state always managed by the same instance, or does this depend on which instances are available at the moment?

"During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups."
-> This is related: does "works with the keys" mean always the same keys?

Best Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
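The deterministic mapping discussed in this thread (key -> key group -> parallel instance) can be sketched in a few lines. This is a simplified illustration, not Flink's actual code: keyGroupFor uses a plain hashCode modulo as a stand-in for the hash Flink applies, and the class and method names are made up for the example. The point is only that the result depends on the key and the configured parallelism, never on which instance happens to be idle.

```java
/** Simplified, hypothetical model of key -> key group -> operator instance assignment. */
final class KeyGroupSketch {

    /** Stand-in hash: maps a key to a key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to a parallel instance in [0, parallelism) using contiguous ranges. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups (assumed value for the example)
        int parallelism = 2;      // two parallel instances, as in the A/B/C/D example
        for (String key : new String[] {"A", "B", "C", "D"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int instance = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + keyGroup + " -> instance " + instance);
        }
    }
}
```

Because the mapping is a pure function of the key, the same key always lands on the same instance, and nothing guarantees that a small set of keys is spread evenly across instances.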
Re: Flink - Metric are not reported
Yea, that was I was thinking.. Batch can be quick Can I report metric on "added" action ? (i should override the notifyOnAddedMetric to report ?) ------ Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le mer. 28 nov. 2018 à 15:54, Chesnay Schepler a écrit : > How quick does job batch terminate? Metrics are unregistered once the job > ends; if the job duration is shorter than the report interval they may > never be exposed. > > On 28.11.2018 15:18, bastien dine wrote: > > Hello Chesnay, > > Thanks for your response ! > I have logs enable (info), slf4jReporter is working, I can see : > > 15:16:00.112 [Flink-MetricRegistry-thread-1] INFO > org.apache.flink.metrics.slf4j.Slf4jReporter - > === Starting metrics report > === > > -- Counters > --- > > -- Gauges > - > > > On both jobmanager & taskmanager, > *BUT* i see only system metrics, not my custom one.. > Am i missing something in declaration in my topology ? > > *Note* : I am using DataSet API (so my program is batch, and not > continuous) > > Regards, > Bastien > > -- > > Bastien DINE > Data Architect / Software Engineer / Sysadmin > bastiendine.io > > > Le mar. 27 nov. 2018 à 17:07, Chesnay Schepler a > écrit : > >> Please enable WARN logging and check for warnings by the SLF4JReporter >> and/or MetricQueryService. >> >> On 27.11.2018 17:00, bastien dine wrote: >> >> Hello everyone, >> >> Once again I require your help ! >> I am trying to report custom metric (see my code below) >> Yet, I do not see them anywhere.. nor in the metric tab from my tasks, >> nor in the rest API, nor in the declared slf4j reporter.. >> Can someone help me to debug this .. 
>> >> Here is my RichMap function : >> >> public class MetricGaugeRichMap> extends >> RichMapFunction { >> >> private transient T valueToExpose; >> private final String metricGroup; >> private final String metricName; >> >> >> public MetricGaugeRichMap(String metricGroup, String metricName) { >> this.metricGroup = metricGroup; >> this.metricName = metricName; >> } >> >> @Overridepublic void open(Configuration config) { >> getRuntimeContext() >> .getMetricGroup() >> .addGroup(metricGroup) >> .gauge(metricName, (Gauge) () -> valueToExpose); >> } >> >> @Overridepublic E map(E metricTuple) throws Exception { >> valueToExpose = metricTuple.getMetricValue(); >> return metricTuple; >> } >> } >> >> calling from topology : >> >> env.fromElements(new MetricTuple<>(metricGroup, metricName, metricValue)) >> .map(new MetricGaugeRichMap<>(metricGroup, metricName)) >> .output(new MetricGaugeOutputFormat<>()); // dummy output >> >> -- >> >> >> Bastien DINE >> Data Architect / Software Engineer / Sysadmin >> bastiendine.io >> >> >> >
Re: Flink - Metric are not reported
Hello Chesnay, Thanks for your response ! I have logs enable (info), slf4jReporter is working, I can see : 15:16:00.112 [Flink-MetricRegistry-thread-1] INFO org.apache.flink.metrics.slf4j.Slf4jReporter - === Starting metrics report === -- Counters --- -- Gauges - On both jobmanager & taskmanager, *BUT* i see only system metrics, not my custom one.. Am i missing something in declaration in my topology ? *Note* : I am using DataSet API (so my program is batch, and not continuous) Regards, Bastien -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le mar. 27 nov. 2018 à 17:07, Chesnay Schepler a écrit : > Please enable WARN logging and check for warnings by the SLF4JReporter > and/or MetricQueryService. > > On 27.11.2018 17:00, bastien dine wrote: > > Hello everyone, > > Once again I require your help ! > I am trying to report custom metric (see my code below) > Yet, I do not see them anywhere.. nor in the metric tab from my tasks, nor > in the rest API, nor in the declared slf4j reporter.. > Can someone help me to debug this .. 
> > Here is my RichMap function : > > public class MetricGaugeRichMap> extends > RichMapFunction { > > private transient T valueToExpose; > private final String metricGroup; > private final String metricName; > > > public MetricGaugeRichMap(String metricGroup, String metricName) { > this.metricGroup = metricGroup; > this.metricName = metricName; > } > > @Overridepublic void open(Configuration config) { > getRuntimeContext() > .getMetricGroup() > .addGroup(metricGroup) > .gauge(metricName, (Gauge) () -> valueToExpose); > } > > @Overridepublic E map(E metricTuple) throws Exception { > valueToExpose = metricTuple.getMetricValue(); > return metricTuple; > } > } > > calling from topology : > > env.fromElements(new MetricTuple<>(metricGroup, metricName, metricValue)) > .map(new MetricGaugeRichMap<>(metricGroup, metricName)) > .output(new MetricGaugeOutputFormat<>()); // dummy output > > -- > > > Bastien DINE > Data Architect / Software Engineer / Sysadmin > bastiendine.io > > >
Flink - Metric are not reported
Hello everyone,

Once again I require your help! I am trying to report a custom metric (see my code below), yet I do not see it anywhere: not in the metrics tab of my tasks, not in the REST API, and not in the declared slf4j reporter. Can someone help me debug this?

Here is my RichMap function:

    public class MetricGaugeRichMap<T, E extends MetricTuple<T>> extends RichMapFunction<E, E> {

        private transient T valueToExpose;
        private final String metricGroup;
        private final String metricName;

        public MetricGaugeRichMap(String metricGroup, String metricName) {
            this.metricGroup = metricGroup;
            this.metricName = metricName;
        }

        @Override
        public void open(Configuration config) {
            getRuntimeContext()
                .getMetricGroup()
                .addGroup(metricGroup)
                .gauge(metricName, (Gauge<T>) () -> valueToExpose);
        }

        @Override
        public E map(E metricTuple) throws Exception {
            valueToExpose = metricTuple.getMetricValue();
            return metricTuple;
        }
    }

Calling it from the topology:

    env.fromElements(new MetricTuple<>(metricGroup, metricName, metricValue))
        .map(new MetricGaugeRichMap<>(metricGroup, metricName))
        .output(new MetricGaugeOutputFormat<>()); // dummy output

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: Multiple env.execute() into one Flink batch job
Oh god, if we have some code with Accumulator after the env.execute(), this will not be executed on the JobManager too ? Thanks, I would be interested indeed ! -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le ven. 23 nov. 2018 à 16:37, Flavio Pompermaier a écrit : > The problem is that the REST API block on env.execute. > If you want to run your Flink job you have to submit it using the CLI > client. > As a workaround we wrote a Spring REST API that to run a job open an SSH > connection to the job manager and execute the bin/flink run command.. > > If you're interested in I can share some code.. > > > > On Fri, Nov 23, 2018 at 4:32 PM bastien dine > wrote: > >> Hello, >> >> I need to chain processing in DataSet API, so I am launching severals >> jobs, with multiple env.execute() : >> >> topology1.define(); >> env.execute; >> >> topogy2.define(); >> env.execute; >> >> This is working fine when I am running it within IntellIiJ >> But when I am deploying it into my cluster, it only launch the first >> topology.. >> >> Could you please shed some light on this issue? >> >> Regards, >> Bastien >> > > >
Multiple env.execute() into one Flink batch job
Hello,

I need to chain processing in the DataSet API, so I am launching several jobs, with multiple env.execute() calls:

    topology1.define();
    env.execute();

    topology2.define();
    env.execute();

This works fine when I run it within IntelliJ, but when I deploy it to my cluster, it only launches the first topology.

Could you please shed some light on this issue?

Regards,
Bastien
Re: Call batch job in streaming context?
Hi Eric, You can run a job from another one, using the REST API This is the only way we have found to launch a batch job from a streaming job -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io Le ven. 23 nov. 2018 à 11:52, Piotr Nowojski a écrit : > Hi, > > I’m not sure if I understand your problem and your context, but spawning a > batch job every 45 seconds doesn’t sound as a that bad idea (as long as the > job is short). > > Another idea would be to incorporate this batch job inside your streaming > job, for example by reading from Cassandra using an AsyncIO operator: > > https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html > > Quick google search revealed for example this: > > > https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink > > Piotrek > > > On 23 Nov 2018, at 10:33, eric hoffmann > wrote: > > > > Hi > > Is it possible to call batch job on a streaming context? > > what i want to do is: > > for a given input event, fetch cassandra elements based on event data, > apply transformation on them and apply a ranking when all elements fetched > by cassandra are processed. > > If i do this in batch mode i would have to submit a job on each events > and i can have an event every 45 seconds. > > Is there any alternative? can i start a batch job that will receive some > external request, process it and wait for another request? > > thx > > Eric > >
DataSet - Broadcast set in output format
Hello,

I would like to use a broadcast variable in my OutputFormat (to pass some information and control the execution flow). How would I do it? .output does not have a .withBroadcastSet function, as it does not extend SingleInputUdfOperator.

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: Metric on JobManager
Hi Jamie,

Thanks for your response. Hmm, this will not be easy. Any idea on how to deal with the end time? I can have runtime exceptions in my topology, so I would like to do something like:

    try {
        // Start time here
        env.execute();
    } catch (Exception e) {
    } finally {
        // End time here
    }

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Wed, Nov 21, 2018 at 22:44, Jamie Grier wrote:
> What you're describing is not possible. There is no runtime context or metrics you can use at that point.
>
> The best you can probably do (at least for start time) is just keep a flag in your function and log a metric once and only once when it first starts executing.
>
> On Wed, Nov 21, 2018 at 5:18 AM bastien dine wrote:
>> Hello all,
>>
>> I am using metrics to count some stuff in my topology; this is pretty easy with the metric API from getRuntimeContext in a Rich function. However, I would like to use this metric API to log the start date & end date of my processing, but in the code executed on the job manager (i.e. not in an operator), before & after the env.execute().
>> How can I retrieve the runtime context, from the execution env maybe?
>>
>> Regards,
>> Bastien
>>
>> --
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
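The try / finally idea from this message can be sketched in plain Java. This is only an illustration under assumptions: TimedRun is a hypothetical helper, the Callable stands in for the env.execute() call, and it records wall-clock times in the client program rather than through Flink's metric system (which, per the reply above, is not available at that point).

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: records start and end time around a job, even when it fails. */
final class TimedRun {
    final long startMillis;
    final long endMillis;
    final Exception failure; // null when the job completed normally

    private TimedRun(long startMillis, long endMillis, Exception failure) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
        this.failure = failure;
    }

    static TimedRun run(Callable<?> job) {
        long start = System.currentTimeMillis();
        long end;
        Exception failure = null;
        try {
            job.call(); // in the real program this would be env.execute()
        } catch (Exception e) {
            failure = e; // keep the exception so the caller can still react to it
        } finally {
            end = System.currentTimeMillis(); // reached on success and on failure
        }
        return new TimedRun(start, end, failure);
    }

    public static void main(String[] args) {
        TimedRun result = TimedRun.run(() -> {
            throw new IllegalStateException("simulated topology failure");
        });
        System.out.println("duration ms: " + (result.endMillis - result.startMillis));
        System.out.println("failed: " + (result.failure != null));
    }
}
```

Keeping the exception in the result, instead of swallowing it in an empty catch block, lets the caller log or rethrow it after the end time has been recorded.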
Metric on JobManager
Hello all,

I am using metrics to count some stuff in my topology; this is pretty easy with the metric API from getRuntimeContext in a Rich function. However, I would like to use this metric API to log the start date & end date of my processing, but in the code executed on the job manager (i.e. not in an operator), before & after the env.execute().

How can I retrieve the runtime context, from the execution env maybe?

Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
Re: Counting DataSet in DataFlow
Hi Fabian,

Thanks for the response, I am going to use the second solution!

Regards,
Bastien

--
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Wed, Nov 7, 2018 at 14:16, Fabian Hueske wrote:

> Another option for certain tasks is to work with broadcast variables [1].
> The value could be used to configure two filters.
>
> DataSet input =
> DataSet count = input.map(-> 1L).sum()
> DataSet input.filter(if cnt == 0).withBroadcastSet("cnt", count).doSomething
> DataSet input.filter(if cnt != 0).withBroadcastSet("cnt", count).doSomethingElse
>
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables
>
> On Wed, Nov 7, 2018 at 14:10, Fabian Hueske wrote:
>
>> Hi,
>>
>> Counting always requires a job to be executed.
>> Not sure if this is what you want to do, but if you want to avoid
>> getting an empty result due to an empty cross input, you can use a
>> mapPartition() with parallelism 1 to emit a special record in case the
>> MapPartitionFunction didn't see any data.
>>
>> Best, Fabian
>>
>> On Wed, Nov 7, 2018 at 14:02, bastien dine <bastien.d...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I would like a way to count a dataset to check whether it is empty or
>>> not. But .count() triggers an execution, and I do not want a separate
>>> job execution plan, as this will trigger multiple reads.
>>> I would like to have something like:
>>>
>>> Source -> map -> count -> if 0 -> do something
>>>                           if not -> do something
>>>
>>> More concretely, I would like to check whether one of my datasets is
>>> empty before doing a cross operation.
>>>
>>> Thanks,
>>> Bastien
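The mapPartition() suggestion quoted above (emit a special record when no data was seen) boils down to a small piece of logic that can be shown on its own. The following is only a sketch of that logic, not Flink code: `EmptyInputGuard`, `forwardOrSentinel` and the "EMPTY" marker are hypothetical names, and the `Iterable` stands in for the values handed to a MapPartitionFunction running with parallelism 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Flink-free sketch of the logic that would sit inside a MapPartitionFunction
// running with parallelism 1 (so that it sees the whole input).
final class EmptyInputGuard {

    /** Forwards every record; emits `sentinel` only if no record was seen. */
    static <T> List<T> forwardOrSentinel(Iterable<T> records, T sentinel) {
        List<T> out = new ArrayList<>();
        boolean sawData = false;
        for (T record : records) {
            sawData = true;
            out.add(record);
        }
        if (!sawData) {
            out.add(sentinel);  // special record marking an empty input
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(forwardOrSentinel(Arrays.asList("a", "b"), "EMPTY"));
        System.out.println(forwardOrSentinel(Collections.<String>emptyList(), "EMPTY"));
    }
}
```

Downstream operators then have to recognise the sentinel, so it must be a value that cannot occur in the real data.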
Counting DataSet in DataFlow
Hello,

I would like a way to count a dataset to check whether it is empty or not. But .count() triggers an execution, and I do not want a separate job execution plan, as this will trigger multiple reads. I would like to have something like:

Source -> map -> count -> if 0 -> do something
                          if not -> do something

More concretely, I would like to check whether one of my datasets is empty before doing a cross operation.

Thanks,
Bastien
Re: Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch
Hi Hequn,

Thanks for your response. Yes, I know about the Table API, but I am looking for a way to have a bounded context with a stream: somehow create a dataset from a buffer stored in a window of a datastream.

Regards,
Bastien

On Tue, Sep 25, 2018 at 14:50, Hequn Cheng wrote:

> Hi bastien,
>
> Flink features two relational APIs, the Table API and SQL. Both APIs are
> unified APIs for batch and stream processing, i.e., queries are executed
> with the same semantics on unbounded, real-time streams or bounded[1].
> There are also documents about Join[2].
>
> Best, Hequn
> [1] https://flink.apache.org/flink-applications.html#layered-apis
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
> On Tue, Sep 25, 2018 at 4:14 PM bastien dine wrote:
>
>> Hello everyone,
>>
>> I need to join some files to perform some processing. The DataSet API is
>> a perfect way to achieve this; I am able to do it when I read the files in
>> batch (csv).
>>
>> However, in the prod environment I will receive those files as Kafka
>> messages (one message = one line of a file).
>> So I am considering using a global window + a custom trigger on an
>> end-of-file message, and a process window function.
>> But I cannot go very far with that, as process is only one function and
>> chaining functions will be a pain. I don't think that emitting a datastream
>> & windows / trigger on EOF before every process function is a good idea.
>>
>> However, I would like to work in a bounded way once I have received all of
>> my elements (after the trigger on the global window), like the DataSet API,
>> as I will join on my whole dataset.
>>
>> I thought maybe it would be a good idea to go for the Table API and a group
>> window? But you cannot have a custom trigger and a global group window on a
>> table (like the global window on a datastream)?
>> The best alternative would be to create a dataset as a result of my process
>> window function, but I don't think this is possible, is it?
>>
>> Best Regards,
>> Bastien
Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch
Hello everyone,

I need to join some files to perform some processing. The DataSet API is a perfect way to achieve this; I am able to do it when I read the files in batch (csv).

However, in the prod environment I will receive those files as Kafka messages (one message = one line of a file). So I am considering using a global window + a custom trigger on an end-of-file message, and a process window function. But I cannot go very far with that, as process is only one function and chaining functions will be a pain. I don't think that emitting a datastream & windows / trigger on EOF before every process function is a good idea.

However, I would like to work in a bounded way once I have received all of my elements (after the trigger on the global window), like the DataSet API, as I will join on my whole dataset.

I thought maybe it would be a good idea to go for the Table API and a group window? But you cannot have a custom trigger and a global group window on a table (like the global window on a datastream)? The best alternative would be to create a dataset as a result of my process window function, but I don't think this is possible, is it?

Best Regards,
Bastien
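The "global window + custom trigger on an end-of-file message" idea described above can be modelled in a few lines of plain Java. This is a sketch of the buffering behaviour only, not Flink code: `EofBuffer`, `onMessage` and the "<EOF>" marker are hypothetical, and a single instance stands in for the state of one global window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Flink-free model of "global window + trigger on an end-of-file message".
final class EofBuffer {

    /** Hypothetical marker message that closes a file. */
    static final String EOF = "<EOF>";

    private final List<String> lines = new ArrayList<>();

    /** Buffers one line; returns the complete file once the EOF marker arrives. */
    Optional<List<String>> onMessage(String message) {
        if (EOF.equals(message)) {
            List<String> file = new ArrayList<>(lines);
            lines.clear();              // purge the buffer, ready for the next file
            return Optional.of(file);   // "fire": the whole file is now bounded data
        }
        lines.add(message);
        return Optional.empty();        // keep buffering
    }

    public static void main(String[] args) {
        EofBuffer buffer = new EofBuffer();
        for (String message : Arrays.asList("id;name", "1;foo", "2;bar", EOF)) {
            buffer.onMessage(message)
                  .ifPresent(file -> System.out.println("complete file: " + file));
        }
    }
}
```

In Flink terms this would roughly correspond to a GlobalWindows assigner, a custom Trigger returning FIRE_AND_PURGE on the marker, and a process window function receiving the buffered lines; the mapping is an assumption, and whatever runs on the completed file still has to fit inside that single function, which is the limitation raised in the message above.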