Flink HA on Kubernetes - RPC port

2023-01-20 Thread bastien dine
Hello,

We are migrating our HA setup from ZK to K8S, and we have a question
regarding the RPC port.

Previously with ZK, the RPC connection was configured via:
high-availability.jobmanager.port

We were expecting the config to be the same with the K8s HA services, as the doc
says: "The port (range) used by the Flink Master for its RPC connections
in highly-available setups. In highly-available setups, this value is used
instead of 'jobmanager.rpc.port'." (Flink 1.14)
Plus, this option is listed in the #advanced-high-availability-options section.

But the cluster seems to use the common "jobmanager.rpc.port" instead (the port
configured via high-availability.jobmanager.port stays closed).
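
For reference, here is roughly what we have in flink-conf.yaml (a trimmed sketch; the cluster-id, storageDir and port values are examples, not our real ones):

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/ha
high-availability.jobmanager.port: 50100-50110
jobmanager.rpc.port: 6123
# observed: the JM still binds its RPC on 6123, and 50100-50110 stays closed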

Does anyone know about this ?

Best Regards,
Bastien


DataSet API, chaining database access

2022-07-12 Thread bastien dine
Hello,
I am struggling with the DataSet API.
I need to chain 2 database accesses, so ideally I would just do source1 -> map2.
The problem is that a map should not (and cannot) be used for the DB access: when
the request takes too long, we get timeouts on the Akka connection between TM &
JM.
I know we cannot chain sources: source1 -> source2.
Doing just one source for the 2 requests is very complicated, as we actually need
to run multiple requests (not just 2).
Running 2 jobs with 2 .execute() calls is not possible either, as I need those requests
to be done in the same Flink job (source1 -> execute -> source2 -> execute).
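
To make it concrete, here is a minimal sketch of the pattern (MyFirstQueryInputFormat, MyRecord and MyDao are placeholders for our own classes, not Flink APIs; the second DB call inside the map is the part that times out):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// source1: first database access, done in an InputFormat
DataSet<MyRecord> first = env.createInput(new MyFirstQueryInputFormat());

// "map2": second database access, one request per element coming from source1
DataSet<MyRecord> second = first.map(record -> {
    // long-running request here -> Akka timeout between TM & JM
    return MyDao.runSecondQuery(record);
});

second.output(new DiscardingOutputFormat<>());
env.execute("chained-db-accesses");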
Can someone help me ?

Regards,
------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread bastien dine
Hello Martijn,

Thanks for the link to the release notes, especially:
"When resuming from the savepoint, please use
setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new
KafkaSourceBuilder to transfer the offsets to the new source."
So earliest is indeed the new default.
We do use .committedOffsets - it is the default in our custom
KafkaSource builder, to make sure we do not re-read all the previous data
(earliest)
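
For reference, this is roughly what our builder sets (a minimal sketch; bootstrap servers, topic and group id are example values):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("my-topic")
        .setGroupId("my-group")
        // committed offsets, falling back to earliest when none exist
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");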

What bothers me is just this change in the default starting-offset behavior from
FlinkKafkaConsumer to KafkaSource (it can lead to mistakes).
In fact, it sometimes happens that we drop some of our Kafka source state to read
again from the Kafka committed offsets, but maybe nobody does that ^^

Anyway, thanks for the pointer to the release notes!

Best Regards,

------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, Jun 15, 2022 at 10:58, Martijn Visser wrote:

> Hi Bastien,
>
> When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes
> included the instruction how to migrate from FlinkKafkaConsumer to
> KafkaConsumer [1]. Looking at the Kafka documentation [2], there is a
> section on how to upgrade to the latest connector version that I think is
> outdated. I'm leaning towards copying the migration instructions to the
> generic documentation. Do you think that would have sufficed?
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
>
> On Wed, Jun 15, 2022 at 09:22, bastien dine wrote:
>
>> Hello jing,
>>
>> This was the previous method in the old Kafka consumer API; it has been
>> removed in 1.15, so the source code is not in master anymore.
>> Yes, I know that with the new offsets initializer, committed offsets + earliest as
>> fallback can be used to get the same behavior as before.
>> I just wanted to know whether this is a changed behavior or I am missing
>> something
>>
>>
>>
>> Bastien DINE
>> Freelance
>> Data Architect / Software Engineer / Sysadmin
>> http://bastiendine.io
>>
>>
>>
>> On Tue, Jun 14, 2022 at 23:08, Jing Ge wrote:
>>
>>> Hi Bastien,
>>>
>>> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() 
>>> within
>>> Flink in the master branch. Could you please point out the code that
>>> committed offset is used as default?
>>>
>>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
>>> is used, an exception will be thrown at runtime in case there is no
>>> committed offset, which is useful if the user is intended to read from the
>>> committed offset but something is wrong. It might feel weird if it is used
>>> as default, because an exception will be thrown when users start new jobs
>>> with default settings.
>>>
>>> Best regards,
>>> Jing
>>>
>>> On Tue, Jun 14, 2022 at 4:15 PM bastien dine 
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> Does someone know why the starting offset behaviour has changed in the
>>>> new Kafka Source ?
>>>>
>>>> This is now from earliest (code in KafkaSourceBuilder), doc says :
>>>> "If offsets initializer is not specified, OffsetsInitializer.earliest() 
>>>> will
>>>> be used by default." from :
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>
>>>> Before, in the old FlinkKafkaConsumer, it started from the committed offsets (i.e. the
>>>> setStartFromGroupOffsets() method),
>>>>
>>>> which matches this behaviour in the new KafkaSource:
>>>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>>>
>>>> This change can lead to big trouble if users pay no attention to this
>>>> point when migrating from the old FlinkKafkaConsumer to the new KafkaSource,
>>>>
>>>> Regards,
>>>> Bastien
>>>>
>>>> --
>>>>
>>>> Bastien DINE
>>>> Data Architect / Software Engineer / Sysadmin
>>>> bastiendine.io
>>>>
>>>


Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread bastien dine
Hello jing,

This was the previous method in the old Kafka consumer API; it has been removed
in 1.15, so the source code is not in master anymore.
Yes, I know that with the new offsets initializer, committed offsets + earliest as
fallback can be used to get the same behavior as before.
I just wanted to know whether this is a changed behavior or I am missing
something



Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io



On Tue, Jun 14, 2022 at 23:08, Jing Ge wrote:

> Hi Bastien,
>
> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within
> Flink in the master branch. Could you please point out the code that
> committed offset is used as default?
>
> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
> is used, an exception will be thrown at runtime in case there is no
> committed offset, which is useful if the user is intended to read from the
> committed offset but something is wrong. It might feel weird if it is used
> as default, because an exception will be thrown when users start new jobs
> with default settings.
>
> Best regards,
> Jing
>
> On Tue, Jun 14, 2022 at 4:15 PM bastien dine 
> wrote:
>
>> Hello everyone,
>>
>> Does someone know why the starting offset behaviour has changed in the
>> new Kafka Source ?
>>
>> This is now from earliest (code in KafkaSourceBuilder), doc says :
>> "If offsets initializer is not specified, OffsetsInitializer.earliest() will
>> be used by default." from :
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>
>> Before, in the old FlinkKafkaConsumer, it started from the committed offsets (i.e. the
>> setStartFromGroupOffsets() method),
>>
>> which matches this behaviour in the new KafkaSource:
>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>
>> This change can lead to big trouble if users pay no attention to this
>> point when migrating from the old FlinkKafkaConsumer to the new KafkaSource,
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>


New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread bastien dine
Hello everyone,

Does someone know why the starting offset behaviour has changed in the new
Kafka Source ?

This is now from earliest (code in KafkaSourceBuilder), doc says :
"If offsets initializer is not specified, OffsetsInitializer.earliest() will
be used by default." from :
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset

Before, in the old FlinkKafkaConsumer, it started from the committed offsets (i.e. the
setStartFromGroupOffsets() method),

which matches this behaviour in the new KafkaSource:
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)

This change can lead to big trouble if users pay no attention to this point
when migrating from the old FlinkKafkaConsumer to the new KafkaSource,

Regards,
Bastien

------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread bastien dine
I haven't used S3 with Flink, but according to this doc:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
you can set up S3 pretty easily and use it with s3://path/to/your/file in a
write sink.
The page talks about DataStream, but it should work with DataSet too (
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks
)
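
Something like this should do it, roughly (an untested sketch; the bucket/path and the dummy dataset are placeholders for the real states):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// placeholder dataset standing in for the states read from the savepoint
DataSet<String> states = env.fromElements("state-1", "state-2");

states.writeAsText("s3://my-bucket/state-dump", FileSystem.WriteMode.OVERWRITE);
env.execute("dump states to s3");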

Maybe someone else will have more information about s3 dataset sink

Regards,


On Thu, Feb 10, 2022 at 20:52, Antonio Si wrote:

> Thanks Bastien. Can you point to an example of using a sink as we are
> planning to write to S3?
>
> Thanks again for your help.
>
> Antonio.
>
> On Thu, Feb 10, 2022 at 11:49 AM bastien dine 
> wrote:
>
>> Hello Antonio,
>>
>> The .collect() method should be used with caution, as it collects the whole
>> DataSet (multiple partitions on multiple TMs) into a single List on the JM
>> (so in memory).
>> Unless you have a lot of RAM, you cannot use it this way, and you probably
>> should not.
>> I recommend using a sink instead, to write the data into a formatted file
>> (e.g. a CSV) or, if it is too big, into something splittable.
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>>
>> On Thu, Feb 10, 2022 at 20:32, Antonio Si wrote:
>>
>>> Hi,
>>>
>>> I am using the stateful processing api to read the states from a
>>> savepoint file.
>>> It works fine when the state size is small, but when the state size is
>>> larger, around 11GB, I am getting an OOM. I think it happens when it is
>>> doing a dataSource.collect() to obtain the states. The stackTrace is copied
>>> at the end of the message.
>>>
>>> Any suggestions or hints would be very helpful.
>>>
>>> Thanks in advance.
>>>
>>> Antonio.
>>>
>>> java.lang.OutOfMemoryError: null
>>> at
>>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> ~[?:1.8.0_282]
>>> at
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>> ~[?:1.8.0_282]
>>> 

Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread bastien dine
Hello Antonio,

The .collect() method should be used with caution, as it collects the whole DataSet
(multiple partitions on multiple TMs) into a single List on the JM (so in
memory).
Unless you have a lot of RAM, you cannot use it this way, and you probably
should not.
I recommend using a sink instead, to write the data into a formatted file
(e.g. a CSV) or, if it is too big, into something splittable.

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Thu, Feb 10, 2022 at 20:32, Antonio Si wrote:

> Hi,
>
> I am using the stateful processing api to read the states from a savepoint
> file.
> It works fine when the state size is small, but when the state size is
> larger, around 11GB, I am getting an OOM. I think it happens when it is
> doing a dataSource.collect() to obtain the states. The stackTrace is copied
> at the end of the message.
>
> Any suggestions or hints would be very helpful.
>
> Thanks in advance.
>
> Antonio.
>
> java.lang.OutOfMemoryError: null
> at
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> ~[?:1.8.0_282]
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> ~[?:1.8.0_282]
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[?:1.8.0_282]
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_282]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> ~[?:1.8.0_282]
> at
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> ~[?:1.8.0_282]
> at
> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
> ~[?:1.8.0_282]
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> ~[?:1.8.0_282]
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> ~[?:1.8.0_282]
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> ~[?:1.8.0_282]
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> ~[?:1.8.0_282]
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> ~[?:1.8.0_282]
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.

Re: Flink High-Availability and Job-Manager recovery

2022-02-04 Thread bastien dine
Hello,

On k8s, the current recommendation is to set up 1 job manager with HA
enabled, so that the cluster does not lose state upon a crash.

1. The storage dir can for sure be on a kube PV; the directory should be
shared between all JMs, and you will need to mount the volume at the same local
directory (e.g. /data) so that the configuration is the same amongst JMs (see the
config sketch after this list)
2. You can have only 1 JM, but you still need to enable HA, since HA
writes the cluster state into ZK & the storage dir
3. I don't know anything about Beam, so I cannot help you with that.
But per-job mode is not available on k8s (neither native nor
standalone kube):
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#per-job-mode
&
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#per-job-mode,
you would need YARN to do so (I think Mesos is deprecated).
Application mode can be a bit tricky to understand: it "moves" the
submission of the job inside the JM.
The chosen solution will depend on your deployment needs, I can't tell you
without knowing more,
but going with session mode + streaming job deployment is pretty standard,
and you can easily emulate "one cluster per job" with it (better for ops &
tuning than one cluster running multiple jobs).
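
For point 1, a minimal sketch of the HA part of flink-conf.yaml, identical on every JM/TM (the quorum address, cluster-id and the /data mount point are example values):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
high-availability.cluster-id: /my-flink-cluster
high-availability.storageDir: file:///data/flink/ha   # /data = the shared PV mount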

Hope this can help,
Regards,
Bastien


On Thu, Feb 3, 2022 at 15:12, Koffman, Noa (Nokia - IL/Kfar Sava) <
noa.koff...@nokia.com> wrote:

> Hi all,
>
> We are currently deploying flink on k8s 3 nodes cluster - with 1
> job-manager and 3 task managers
>
> We are trying to understand the recommendation for deployment, more
> specifically for recovery from job-manager failure, and have some questions
> about that:
>
>
>
>1. If we use flink HA solution (either Kubernetes-HA or zookeeper),
>the documentation states we should define the ‘high-availability.storageDir
>
> In the examples we found, there is mostly hdfs or s3 storage.
>
> We were wondering if we could use Kubernetes PersistentVolumes and
> PersistentVolumeClaims, if we do use that, can each job-manager have its
> own volume? Or it must be shared?
>
>    2. Is there a solution for jobmanager recovery without HA? With the
>way our flink is currenly configured, killing the job-manager pod, all the
>jobs are lost.
>
> Is there a way to configure the job-manager so that if it goes down and
> k8s restarts it, it will continue from the same state (restart all the
> tasks, etc…)?
>
> For this, can a Persistent Volume be used, without HDFS or external
> solutions?
>
>    3. Regarding the deployment mode: we are working with beam + flink,
>and flink is running in session mode, we have a few long running streaming
>pipelines deployed (less then 10).
>
> Is ‘session’ mode the right deployment mode for our type of deployment? Or
> should we consider switching to something different? (Per-job/application)
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>


Re: Pojo State Migration - NPE with field deletion

2022-02-03 Thread bastien dine
Thanks for the JIRA ticket,

This is for sure pretty critical.
The "workaround" is to not remove the field, but I am not sure that is
acceptable :)

I could work on it, but someone needs to point out to me where to start:

Do I work on the PojoSerializer, to make this case not throw an
exception?

Or do I try to find the root cause, namely why the field serializer of the
deleted field is still present?

Regards,
------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, Feb 2, 2022 at 16:37, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
>
>
> Happened to me too, here’s the JIRA ticket:
> https://issues.apache.org/jira/browse/FLINK-21752
>
>
>
> Regards,
>
> Alexis.
>
>
>
> From: bastien dine
> Sent: Wednesday, February 2, 2022 16:01
> To: user
> Subject: Pojo State Migration - NPE with field deletion
>
>
>
> Hello,
>
>
>
> I have some trouble restoring a state (pojo) after deleting a field
>
> According to documentation, it should not be a problem with POJO :
>
> "Fields can be removed. Once removed, the previous value for the
> removed field will be dropped in future checkpoints and savepoints."
>
>
>
> Here is a short stack trace (full trace is below) :
>
>
>
> Caused by: java.lang.NullPointerException
>
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(
> PojoSerializer.java:119)
>
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .duplicate(PojoSerializer.java:184)
>
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .duplicate(PojoSerializer.java:56)
>
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>
>
>
>
>
> After some debug, it seems that the deleted POJO field still has a field
> serializer in the corresponding object PojoSerializer "fieldSerializers"
> array
>
> But it is not present in the "fields", where we have a gap of 1 index (for
> example 0-1-3-4)
>
> So when serializer reach index 2 we got this NPE,
>
>
>
> Why is the deleted field serializer still present ? this should have been
> dropped when resolving schema compatibility right ?
>
> I can not find anything on that matter, could someone help me with it ?
>
> Reproduced in flink 1.13 & 1.14, can not find any related JIRA too
>
>
>
> Best Regards,
>
> Bastien
>
>
> Full stack trace :
>
> 2022-02-02 15:44:20
>
> java.io.IOException: Could not perform checkpoint 2737490 for operator
> OperatorXXX
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpointOnBarrier(StreamTask.java:1274)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:
> 147)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.triggerCheckpoint(
> SingleCheckpointBarrierHandler.java:287)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler
> .java:64)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(
> SingleCheckpointBarrierHandler.java:493)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(
> AbstractAlignedBarrierHandlerState.java:74)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> AbstractAlignedBarrierHandlerState.barrierReceived(
> AbstractAlignedBarrierHandlerState.java:66)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.lambda$processBarrier$2(
> SingleCheckpointBarrierHandler.java:234)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(
> SingleCheckpointBarrierHandler.java:262)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.processBarrier(
> SingleCheckpointBarrierHandler.java:231)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>
> at

Pojo State Migration - NPE with field deletion

2022-02-02 Thread bastien dine
Hello,

I have some trouble restoring a state (POJO) after deleting a field.
According to the documentation, it should not be a problem with POJOs:
"Fields can be removed. Once removed, the previous value for the removed
field will be dropped in future checkpoints and savepoints."

Here is a short stack trace (full trace is below) :

Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(
PojoSerializer.java:119)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(
PojoSerializer.java:184)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(
PojoSerializer.java:56)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field
serializer in the corresponding PojoSerializer "fieldSerializers" array,
but it is not present in "fields", where we have a gap of 1 index (for
example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.

Why is the deleted field's serializer still present? It should have been
dropped when resolving schema compatibility, right?
I cannot find anything on that matter, could someone help me with it?
Reproduced in Flink 1.13 & 1.14; I cannot find any related JIRA either.

Best Regards,
Bastien

Full stack trace :

2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for operator
OperatorXXX
at org.apache.flink.streaming.runtime.tasks.StreamTask
.triggerCheckpointOnBarrier(StreamTask.java:1274)
at org.apache.flink.streaming.runtime.io.checkpointing.
CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.triggerCheckpoint(
SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler
.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(
SingleCheckpointBarrierHandler.java:493)
at org.apache.flink.streaming.runtime.io.checkpointing.
AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(
AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.
AbstractAlignedBarrierHandlerState.barrierReceived(
AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.lambda$processBarrier$2(
SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(
SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler
.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.
CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.
CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput
.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:761)
at org.apache.flink.runtime.taskmanager.Task
.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:
937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
not complete snapshot 2737490 for operator OperatorXXX
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
.snapshotState(StreamOperatorStateHandler.java:265)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
.snapshotState(StreamOperatorStateHandler.java:170)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain

Re: Problem when restoring from savepoint with missing state & POJO modification

2020-12-09 Thread bastien dine
Hello Yun,
Thank you very much for your response, that's what I thought.
However, it does not seem possible to remove only one state using the state
processor API:
we use it a lot, and we can only remove all of an operator's state, not one
state specifically.
Am I missing something?
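
For context, this is roughly how we strip state from a savepoint today (a sketch against the DataSet-based state processor API; the paths and uid are placeholders). It drops the whole operator's state, which is exactly the limitation I mean:

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

Savepoint
    .load(bEnv, "s3://bucket/savepoints/savepoint-xxx", new MemoryStateBackend())
    // removes *all* states registered under this operator uid;
    // I did not find a removeState("buffer")-like call for a single state
    .removeOperator("my-operator-uid")
    .write("s3://bucket/savepoints/savepoint-xxx-cleaned");

bEnv.execute("remove operator state");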

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Tue, Dec 8, 2020 at 08:54, Yun Tang wrote:

> Hi Bastien,
>
> Flink supports to register state via state descriptor when
> calling runtimeContext.getState(). However, once the state is registered,
> it cannot be removed anymore. And when you restore from savepoint, the
> previous state is registered again [1]. Flink does not to drop state
> directly and you could use state processor API [2] to remove related state.
>
>
> [1]
> https://github.com/apache/flink/blob/d94c7a451d22f861bd3f79435f777b427020eba0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java#L171
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> ------
> From: bastien dine
> Sent: Tuesday, December 8, 2020 0:28
> To: user
> Subject: Problem when restoring from savepoint with missing state &
> POJO modification
>
> Hello,
> We have experienced some weird issues with POJO mapState in a streaming
> job upon checkpointing when removing state, then modifying the state POJO
> and restoring job
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>
> Reproduced in Flink 1.10 & 1.11
> (full stack below)
>
>
> *Context : *We have a streaming job with a state name "buffer" and POJO
> Buffer inside a CoFlatMap function
>
> MyCoFlat:
> *public class MyCoFlat extends RichCoFlatMapFunction {*
> *transient MapState buffer;*
> *@Override*
> *public void open(Configuration parameters) {*
> *buffer = getRuntimeContext().getMapState(new
> MapStateDescriptor<>("buffer", String.class, Buffer.class));*
>
> *} *
>
> Buffer :
>
>
>
>
> *public class Buffer { private String field1; private String field2;
> private String field3; ... + empty constructor  + getter / setter for POJO
> consideration*
>
> We had some troubles with our job, so we rework 2 things :
>  - we removed field2 in Buffer class,
>  - we stopped using "buffer" state anymore
>
> When restoring with savepoint (--allowNonRestoredState) we have the
> exception below
> The job is submitted to the cluster but fails on checkpointing, job is
> totally stuck.
>
>
> *Debug: *Debugging showed us some stuff, the exception is raised here (as
> expected):
>
>
>
>
>
>
>
>
>
>
>
>
>
> *public PojoSerializer( Class clazz, TypeSerializer[]
> fieldSerializers, Field[] fields, ExecutionConfig executionConfig) {
> this.clazz = checkNotNull(clazz); this.fieldSerializers =
> (TypeSerializer[]) checkNotNull(fieldSerializers); this.fields =
> checkNotNull(fields); this.numFields = fieldSerializers.length;
> this.executionConfig = checkNotNull(executionConfig); for (int i = 0; i <
> numFields; i++) { this.fields[i].setAccessible(true); < HERE }*
>
> In our fields, we have field[0] & field[2] but field[1] is totally missing
> from the array, that's why we have the NPE over here, when i=1
>
> So what we have done is to put this state back in our streaming job (with
> the missing field and POJO), redeploy with old savepoint and this went
> totally fine
> Then we have redeploy a job without this state
> This has been a 2 times deployment for our job (1 -> modify the POJO, 2 ->
> remove the state using this POJO)
> But the non-used-anymore state is still (at least the serializer) in the
> savepoints, we could be facing this problem again when we will
> modify Buffer POJO later.
> Finally we just modify a savepoint with API and remove this state once for
> all, and restart from it.
>
> I have a couple of questions here:
> Why does flink keep a non-used state in a savepoint (even if it can not
> map it into a new topology and allowNonRestoredState is checked ?)
> Why does flink not handle this case ? Behaviour seems to be different
> between an existing POJO state and this non used POJO state
> How can I clean my savepoint ? I don't want them to contain non-used state
>
> If anybody has experienced an issue like that before or knows how to
> handle this, I would be glad to discuss !
> Best regards,
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
>


Problem when restoring from savepoint with missing state & POJO modification

2020-12-07 Thread bastien dine
Hello,
We have experienced some weird issues with POJO mapState in a streaming job
upon checkpointing when removing state, then modifying the state POJO and
restoring job

Caused by: java.lang.NullPointerException
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11
(full stack below)


Context: We have a streaming job with a state named "buffer" and a POJO
Buffer inside a CoFlatMap function

MyCoFlat:

public class MyCoFlat extends RichCoFlatMapFunction {
    transient MapState<String, Buffer> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new
            MapStateDescriptor<>("buffer", String.class, Buffer.class));
    }

Buffer:

public class Buffer {
    private String field1;
    private String field2;
    private String field3;
    ...
    + empty constructor
    + getter / setter for POJO consideration

We had some troubles with our job, so we rework 2 things :
 - we removed field2 in Buffer class,
 - we stopped using "buffer" state anymore

When restoring with savepoint (--allowNonRestoredState) we have the
exception below
The job is submitted to the cluster but fails on checkpointing, job is
totally stuck.


Debug: Debugging showed us some stuff, the exception is raised here (as
expected):

public PojoSerializer(
        Class clazz,
        TypeSerializer[] fieldSerializers,
        Field[] fields,
        ExecutionConfig executionConfig) {
    this.clazz = checkNotNull(clazz);
    this.fieldSerializers = (TypeSerializer[]) checkNotNull(fieldSerializers);
    this.fields = checkNotNull(fields);
    this.numFields = fieldSerializers.length;
    this.executionConfig = checkNotNull(executionConfig);
    for (int i = 0; i < numFields; i++) {
        this.fields[i].setAccessible(true);   < HERE
    }
}

In our fields, we have field[0] & field[2] but field[1] is totally missing
from the array, that's why we have the NPE over here, when i=1

So what we did was put this state back in our streaming job (with
the missing field and POJO) and redeploy with the old savepoint, and this went
totally fine.
Then we redeployed a job without this state.
This was a 2-step deployment for our job (1 -> modify the POJO, 2 ->
remove the state using this POJO).
But the no-longer-used state (at least its serializer) is still in the
savepoints, so we could be facing this problem again when we
modify the Buffer POJO later.
Finally, we just modified a savepoint with the API, removed this state once and
for all, and restarted from it.

I have a couple of questions here:
Why does Flink keep an unused state in a savepoint (even if it cannot map
it onto the new topology and allowNonRestoredState is set)?
Why does Flink not handle this case? The behaviour seems to be different
between an existing POJO state and this unused POJO state.
How can I clean my savepoints? I don't want them to contain unused state.

If anybody has experienced an issue like that before or knows how to handle
this, I would be glad to discuss!
Best regards,

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin


Re: Keyed raw state - example

2019-11-18 Thread bastien dine
Hello Congxian,

Thanks for your response.
Don't you have an example with an operator extending
AbstractUdfStreamOperator,
using context.getRawKeyedStateInputs() (& the output counterpart for snapshots)?

The timer service is re-implementing the whole thing :/

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Mon, Nov 18, 2019 at 03:19, Congxian Qiu wrote:

> Hi
>Currently, I think you can ref the implementation of timerservice[1]
> which used the raw keyed state, the snapshot happens in
> AbstractStreamOperator#snapshotState(), for using Raw State you need to
> implement a new operator[2]. There is an issue wants to give some example
> for raw state[2]
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#raw-and-managed-state
> [3] https://issues.apache.org/jira/browse/FLINK-14379
>
> Best,
> Congxian
>
>
> On Sat, Nov 16, 2019 at 5:57 AM, bastien dine wrote:
>
>> Hello everyone,
>>
>> I would like to know if anybody has a working example on how to declare a
>> keyed raw state ( in my case a keyedprocessoperator) and how to use  it in
>> my UDF (keyedprocessfunction)?
>>
>> Basicaly we have a huge problem with a ValueState w Rocksdb, getting
>> serialized for every element ( need to access it and update) so it's taking
>> a crazy amount of time and we would like to have it serialized only on
>> snapshot, so using Raw state is a possible good solution,
>> But i cannot find anyexample of it :/
>>
>> Thanks and best regards,
>>
>> Bastien DINE
>> Freelance
>> Data Architect / Software Engineer / Sysadmin
>> http://bastiendine.io
>>
>>
>>
>


Keyed raw state - example

2019-11-15 Thread bastien dine
Hello everyone,

I would like to know if anybody has a working example of how to declare a
keyed raw state (in my case a KeyedProcessOperator) and how to use it in
my UDF (KeyedProcessFunction)?

Basically, we have a huge problem with a ValueState on RocksDB: it gets
serialized for every element (we need to access and update it), so it takes
a crazy amount of time, and we would like to have it serialized only on
snapshot, so using raw state looks like a good solution.
But I cannot find any example of it :/

Thanks and best regards,

Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io


Async operator with a KeyedStream

2019-10-31 Thread bastien dine
Hello,

I would like to know if you can use a KeyedStream with the async operator:
I want to use the async operator to insert some stuff into my database, but I
want to limit it to 1 request at a time per element key (key=id).
With a regular keyBy / map it works, but it's too slow (I don't have
enough resources to increase my parallelism).

As far as I have seen, this is not possible:
when I write something like
Async.orderedWait(myStream.keyBy(myKeySelector)), the keyBy is totally
ignored.
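
To illustrate, this is roughly what I tried (MyAsyncDbWriter, Event and Result are placeholders for my own classes):

DataStream<Event> keyed = myStream.keyBy(event -> event.getId());

// the keyBy above has no visible effect on how the async requests are scheduled
DataStream<Result> written = AsyncDataStream.orderedWait(
        keyed,
        new MyAsyncDbWriter(),   // implements AsyncFunction<Event, Result>
        30, TimeUnit.SECONDS,    // timeout per request
        100);                    // max in-flight requests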

Have you a solution for this?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


NullPointerException on StreamTask

2019-05-27 Thread bastien dine
Hello eveyrone,

I am experiencing random NullPointerExceptions since I upgraded to 1.8.
I cannot find anything related in JIRA; does someone have an explanation, or
can someone maybe point me in the right direction to check some things?
My source is a KafkaProducer.

Stacktrace :

java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
at sun.reflect.GeneratedConstructorAccessor29.newInstance(Unknown
Source)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
at java.lang.Thread.run(Thread.java:748)

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Job cluster and HA

2019-05-25 Thread bastien dine
Hello Boris,

I think you are confused by the name of the shell script "standalone-job.sh",
which basically means that we start a "standalone job manager", as stated in
the first comment of
https://github.com/apache/flink/blob/release-1.8/flink-dist/src/main/flink-bin/bin/standalone-job.sh

This is another version of flink-dist/src/main/flink-bin/bin/jobmanager.sh.

It's not related to a job.

When you configure HA on a Flink cluster and you submit a job, Flink (i.e.
the jobmanager) stores the state of the job in ZooKeeper / HDFS.
So when it crashes and comes back (with this entrypoint), it will read from ZK
/ HDFS and restore the previous execution.

Regards,
Bastien

------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Fri, May 24, 2019 at 23:22, Boris Lublinsky wrote:

> Hi,
> I was experimenting with HA lately and see that it recovers successfully
> job, in the case of jobmanager restarts.
> Now my question is whether it will work for the job cluster.
> Based on the instructions
> https://github.com/apache/flink/blob/release-1.8/flink-container/docker/README.md
> I can see
> https://github.com/apache/flink/blob/release-1.8/flink-container/docker/docker-entrypoint.sh
>  that
> In this case the following command is invoked:
> exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@“
>
> Which means that if a jobManager restarts, the following is going to
> happen:
>
> 1. It will use HA to restore job that was running
> 2. A new job will be submitted, overwriting restored job and bypassing
> checkpoint restore.
>
> Am I missing something here?
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>


State migration into multiple operators

2019-05-14 Thread bastien dine
Hello,

I would like to have some advice about splitting an operator with state
into multiple operators.
The new operators would each have a state containing pieces of information from the
initial state:
we would "split" the state.

For example, I have an operator (process) with uid A, with a state containing
field1 and field2.
I would like to split it into two operators, B & C, with, respectively, state
field1 and state field2.

How can I split my state across multiple operators?

Best Regards,
Bastien

------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


H-A Deployment : Job / task manager configuration

2019-02-05 Thread bastien dine
Hello everyone,

I would like to know what exactly I need to configure on my job / task
managers for an HA deployment.
The documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html)
is not really clear about this.
Does conf/masters need to be on both job and task managers, or only on the
taskmanagers so they can find the job manager(s)?
Similarly, does conf/flink-conf.yaml need to be set to ZooKeeper HA only on the
job managers, or on the taskmanagers too?
Just knowing exactly where things need to be configured would help us understand a
little more about the interaction between job manager / task manager / ZK.

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Submit a job to a remote cluster : RemoteEnvironnement or ClientLevel ?

2019-02-01 Thread bastien dine
Hello all,

I would like to know what the best way is to submit a job to a remote
cluster (from a Java app).
Between:
- Using the RemoteEnvironment & calling env.execute() (
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/cluster_execution.html#remote-environment
)
- Using the client level to submit a program (with a regular execution
environment):
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#client-level
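
To be concrete, the first option would look roughly like this on our side (a sketch; host, port and jar path are example values):

// connect to the remote JobManager and ship our job jar with the program
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        "flink-jobmanager.example.com",   // example host
        8081,                             // example REST port
        "/path/to/our-job.jar");          // example jar path

env.readTextFile("hdfs:///input/data.txt")
   .map(String::toUpperCase)
   .writeAsText("hdfs:///output/result.txt");

env.execute("remote submission test");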

What are the caveats of each method?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: One TaskManager per node or multiple TaskManager per node

2019-01-15 Thread bastien dine
Hello Jamie,

Does #1 apply to batch jobs too ?

Regards,

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Mon, Jan 14, 2019 at 20:39, Jamie Grier wrote:

> There are a lot of different ways to deploy Flink.  It would be easier to
> answer your question with a little more context about your use case but in
> general I would advocate the following:
>
> 1) Don't run a "permanent" Flink cluster and then submit jobs to it.
> Instead what you should do is run an "ephemeral" cluster per job if
> possible.  This keeps jobs completely isolated from each other which helps
> a lot with understanding performance, debugging, looking at logs, etc.
> 2) Given that you can do #1 and you are running on bare metal (as opposed
> to in containers) then run one TM per physical machine.
>
> There are many ways to accomplish the above depending on your deployment
> infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give
> detailed input but general you'll have the best luck if you don't run
> multiple jobs in the same TM/JVM.
>
> In terms of the TM memory usage you can set that up by configuring it in
> the flink-conf.yaml file.  The config key you are looking or is
> taskmanager.heap.size:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
>
>
> On Mon, Jan 14, 2019 at 8:05 AM Ethan Li 
> wrote:
>
>> Hello,
>>
>> I am setting up a standalone flink cluster and I am wondering what’s the
>> best way to distribute TaskManagers.  Do we usually launch one TaskManager
>> (with many slots) per node or multiple TaskManagers per node (with smaller
>> number of slots per tm) ?  Also with one TaskManager per node, I am seeing
>> that TM launches with only 30GB JVM heap by default while the node has 180
>> GB. Why is it not launching with more memory since there is a lot
>> available?
>>
>> Thank you very much!
>>
>> - Ethan
>
>


Re: Flink on Kubernetes - Hostname resolution between job/tasks-managers

2019-01-15 Thread bastien dine
Never mind...
The problem was already discussed in the thread:
"Flink 1.7 jobmanager tries to lookup taskmanager by its hostname in k8s
environment"


------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Tue, Jan 15, 2019 at 15:16, bastien dine wrote:

> Hello,
> I am trying to install Flink on Kube, it's almost working..
> I am using the kube files on flink 1.7.1 doc
>
> My cluster is starting well, my 2 tasksmanagers are registering
> successfully to job manager
> On webUI, i see them :
> akka.tcp://flink@dev-flink-taskmanager-3717639837-gvwh4
> :37057/user/taskmanager_0
>
> I can submit a job too..
> But when I am going in job detail, or try to load the logs.. I have
> nothing.. and log on jobmanager give me plenty of error like :
>
> 2019-01-15 14:12:40.111 [flink-metrics-96] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-metrics-akka.remote.default-remote-dispatcher-113 - Association with
> remote system
> [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]]
> Caused by: [dev-flink-taskmanager-3717639837-gvwh4: Name does not resolve]
>
> -> Name does not resolve..
> So trying to ping on the pod hostname and it's not working
> Thus, ping on the pod's IP is working
>
> So, my question is :
> - Can we force usage of IPv4 over hostname resolution ? (will be better
> for perf also)
> - If no, do I need to had a service or something to make it work ?
>
> Best Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Flink on Kubernetes - Hostname resolution between job/tasks-managers

2019-01-15 Thread bastien dine
Hello,
I am trying to install Flink on Kube, and it's almost working.
I am using the kube files from the Flink 1.7.1 doc.

My cluster is starting well, my 2 taskmanagers are registering
successfully with the jobmanager.
On the web UI, I see them:
akka.tcp://flink@dev-flink-taskmanager-3717639837-gvwh4
:37057/user/taskmanager_0

I can submit a job too.
But when I go into the job detail, or try to load the logs, I have
nothing, and the log on the jobmanager gives me plenty of errors like:

2019-01-15 14:12:40.111 [flink-metrics-96] WARN
akka.remote.ReliableDeliverySupervisor
flink-metrics-akka.remote.default-remote-dispatcher-113 - Association with
remote system
[akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]]
Caused by: [dev-flink-taskmanager-3717639837-gvwh4: Name does not resolve]

-> Name does not resolve...
So I tried to ping the pod hostname, and it's not working,
whereas pinging the pod's IP is working.

So, my questions are:
- Can we force the usage of IPv4 over hostname resolution? (It would also be better
for performance.)
- If not, do I need to add a service or something to make it work?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Question about key group / key state & parallelism

2018-12-12 Thread bastien dine
Hi Hequn, thanks for your response !

OK, that's what I was thinking about keys & operator instances.
If the assignment of a key group to an instance is deterministic (as is the
hash of the key that decides which key group it belongs to), I have the following problem.

Let's say I have 4 keys (A, B, C, D) & 2 parallel instances of my operator (1,
2).
Flink determines that A/B belong to 1 and C/D belong to 2.
If I have a message keyed by A, it will be processed by 1.
But if the following message is B-keyed, it will wait for the A message to be
processed by 1 and then also go to 1, even if 2 is not busy and could technically
do the processing, right?
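
(To make "deterministic" concrete, my understanding of the assignment is roughly the logic below, based on KeyGroupRangeAssignment - a sketch, so correct me if I am wrong:)

int maxParallelism = 128;  // default max parallelism
int parallelism = 2;       // my 2 parallel instances

for (String key : new String[] {"A", "B", "C", "D"}) {
    // key -> key group: murmur hash of key.hashCode(), modulo maxParallelism
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    // key group -> parallel subtask index (always the same for a given key)
    int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, keyGroup);
    System.out.println(key + " -> key group " + keyGroup + " -> subtask " + subtask);
}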

How can I deal with that ?

Best Regard and many thanks !
Bastien
--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, Dec 12, 2018 at 13:39, Hequn Cheng wrote:

> Hi Bastien,
>
> Each key “belongs” to exactly one parallel instance of a keyed operator,
> and each parallel instance contains one or more Key Groups.
> Keys will be hashed into the corresponding key group deterministically. It
> is hashed by the value instead of the number of the total records.
> Different keys do not affect each other even a parallel instance contains
> one or more Key Groups.
>
> Best, Hequn
>
>
> On Wed, Dec 12, 2018 at 6:21 PM bastien dine 
> wrote:
>
>> Hello everyone,
>>
>> I have a question regarding the key state & parallelism of a process
>> operation
>>
>> Doc says : "You can think of Keyed State as Operator State that has been
>> partitioned, or sharded, with exactly one state-partition per key. Each
>> keyed-state is logically bound to a unique composite of
>> , and since each key “belongs” to exactly
>> one parallel instance of a keyed operator, we can think of this simply as
>> ."
>>
>> If I have less parallel operator instance (say 5) than my number of
>> possible key (10), it means than every instance will "manage" 2 key state ?
>> (is this spread evenly ?)
>> Is the logical bound fixed ? I mean, are the state always managed by the
>> same instance, or does this depends on the available instance at the moment
>> ?
>>
>> "During execution each parallel instance of a keyed operator works with
>> the keys for one or more Key Groups."
>> -> this is related, does "works with the keys" means always the same keys
>> ?
>>
>> Best Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>


Question about key group / key state & parallelism

2018-12-12 Thread bastien dine
Hello everyone,

I have a question regarding the key state & parallelism of a process
operation

Doc says : "You can think of Keyed State as Operator State that has been
partitioned, or sharded, with exactly one state-partition per key. Each
keyed-state is logically bound to a unique composite of
, and since each key “belongs” to exactly
one parallel instance of a keyed operator, we can think of this simply as
."

If I have fewer parallel operator instances (say 5) than possible keys
(say 10), does it mean that every instance will "manage" 2 key states?
(Is this spread evenly?)
Is the logical binding fixed? I mean, is a state always managed by the
same instance, or does that depend on the instances available at the moment?

"During execution each parallel instance of a keyed operator works with the
keys for one or more Key Groups."
-> This is related: does "works with the keys" mean always the same keys?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Flink - Metric are not reported

2018-11-28 Thread bastien dine
Yeah, that's what I was thinking...
A batch job can finish quickly.
Can I report the metric on the "added" action? (Should I override
notifyOnAddedMetric to report?)
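
For reference, the reporter is declared like this in our flink-conf.yaml (the interval value is just an example):

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS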

------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, Nov 28, 2018 at 15:54, Chesnay Schepler wrote:

> How quick does job batch terminate? Metrics are unregistered once the job
> ends; if the job duration is shorter than the report interval they may
> never be exposed.
>
> On 28.11.2018 15:18, bastien dine wrote:
>
> Hello Chesnay,
>
> Thanks for your response !
> I have logs enable (info), slf4jReporter is working, I can see :
>
> 15:16:00.112 [Flink-MetricRegistry-thread-1] INFO
> org.apache.flink.metrics.slf4j.Slf4jReporter  -
> === Starting metrics report
> ===
>
> -- Counters
> ---
>
> -- Gauges
> -
> 
>
> On both jobmanager & taskmanager,
> *BUT* i see only system metrics, not my custom one..
> Am i missing something in declaration in my topology ?
>
> *Note* : I am using DataSet API (so my program is batch, and not
> continuous)
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> On Tue, Nov 27, 2018 at 17:07, Chesnay Schepler wrote:
>
>> Please enable WARN logging and check for warnings by the SLF4JReporter
>> and/or MetricQueryService.
>>
>> On 27.11.2018 17:00, bastien dine wrote:
>>
>> Hello everyone,
>>
>> Once again I require your help !
>> I am trying to report custom metric (see my code below)
>> Yet, I do not see them anywhere.. nor in the metric tab from my tasks,
>> nor in the rest API, nor in the declared slf4j reporter..
>> Can someone help me to debug this ..
>>
>> Here is my RichMap function :
>>
>> public class MetricGaugeRichMap<T, E extends MetricTuple<T>> extends
>> RichMapFunction<E, E> {
>>
>> private transient T valueToExpose;
>> private final String metricGroup;
>> private final String metricName;
>>
>>
>> public MetricGaugeRichMap(String metricGroup, String metricName) {
>> this.metricGroup = metricGroup;
>> this.metricName = metricName;
>> }
>>
>> @Override
>> public void open(Configuration config) {
>> getRuntimeContext()
>> .getMetricGroup()
>> .addGroup(metricGroup)
>> .gauge(metricName, (Gauge<T>) () -> valueToExpose);
>> }
>>
>> @Override
>> public E map(E metricTuple) throws Exception {
>> valueToExpose = metricTuple.getMetricValue();
>> return metricTuple;
>> }
>> }
>>
>> calling from topology :
>>
>> env.fromElements(new MetricTuple<>(metricGroup, metricName, metricValue))
>> .map(new MetricGaugeRichMap<>(metricGroup, metricName))
>> .output(new MetricGaugeOutputFormat<>()); // dummy output
>>
>> --
>>
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>>
>>
>


Re: Flink - Metric are not reported

2018-11-28 Thread bastien dine
Hello Chesnay,

Thanks for your response !
I have logs enabled (info), the Slf4jReporter is working, I can see :

15:16:00.112 [Flink-MetricRegistry-thread-1] INFO
org.apache.flink.metrics.slf4j.Slf4jReporter  -
=== Starting metrics report
===

-- Counters
---

-- Gauges
-


On both jobmanager & taskmanager,
*BUT* I see only system metrics, not my custom ones..
Am I missing something in the declaration in my topology ?

*Note* : I am using DataSet API (so my program is batch, and not continuous)

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le mar. 27 nov. 2018 à 17:07, Chesnay Schepler  a
écrit :

> Please enable WARN logging and check for warnings by the SLF4JReporter
> and/or MetricQueryService.
>
> On 27.11.2018 17:00, bastien dine wrote:
>
> Hello everyone,
>
> Once again I require your help !
> I am trying to report custom metric (see my code below)
> Yet, I do not see them anywhere.. nor in the metric tab from my tasks, nor
> in the rest API, nor in the declared slf4j reporter..
> Can someone help me to debug this ..
>
> Here is my RichMap function :
>
> public class MetricGaugeRichMap<T, E extends MetricTuple<T>> extends
> RichMapFunction<E, E> {
>
> private transient T valueToExpose;
> private final String metricGroup;
> private final String metricName;
>
>
> public MetricGaugeRichMap(String metricGroup, String metricName) {
> this.metricGroup = metricGroup;
> this.metricName = metricName;
> }
>
> @Override
> public void open(Configuration config) {
> getRuntimeContext()
> .getMetricGroup()
> .addGroup(metricGroup)
> .gauge(metricName, (Gauge<T>) () -> valueToExpose);
> }
>
> @Override
> public E map(E metricTuple) throws Exception {
> valueToExpose = metricTuple.getMetricValue();
> return metricTuple;
> }
> }
>
>  calling from topology :
>
> env.fromElements(new MetricTuple<>(metricGroup, metricName, metricValue))
>     .map(new MetricGaugeRichMap<>(metricGroup, metricName))
> .output(new MetricGaugeOutputFormat<>()); // dummy output
>
> --
>
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
>


Flink - Metric are not reported

2018-11-27 Thread bastien dine
Hello everyone,

Once again I require your help !
I am trying to report custom metrics (see my code below).
Yet, I do not see them anywhere.. neither in the metric tab of my tasks, nor
in the REST API, nor in the declared slf4j reporter..
Can someone help me debug this ?

Here is my RichMap function :

public class MetricGaugeRichMap<T, E extends MetricTuple<T>> extends
        RichMapFunction<E, E> {

    // Value exposed by the gauge; updated on every element that flows through map().
    private transient T valueToExpose;
    private final String metricGroup;
    private final String metricName;


    public MetricGaugeRichMap(String metricGroup, String metricName) {
        this.metricGroup = metricGroup;
        this.metricName = metricName;
    }

    @Override
    public void open(Configuration config) {
        // Register the gauge when the task starts; it reads valueToExpose lazily.
        getRuntimeContext()
            .getMetricGroup()
            .addGroup(metricGroup)
            .gauge(metricName, (Gauge<T>) () -> valueToExpose);
    }

    @Override
    public E map(E metricTuple) throws Exception {
        valueToExpose = metricTuple.getMetricValue();
        return metricTuple;
    }
}


calling from topology :

env.fromElements(new MetricTuple<>(metricGroup, metricName, metricValue))
.map(new MetricGaugeRichMap<>(metricGroup, metricName))
.output(new MetricGaugeOutputFormat<>()); // dummy output


--


Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Multiple env.execute() into one Flink batch job

2018-11-23 Thread bastien dine
Oh god, if we have some code using Accumulators after the env.execute(), will
it not be executed on the JobManager either ?
Thanks, I would be interested indeed !

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le ven. 23 nov. 2018 à 16:37, Flavio Pompermaier  a
écrit :

> The problem is that the REST API blocks on env.execute.
> If you want to run your Flink job you have to submit it using the CLI
> client.
> As a workaround we wrote a Spring REST API that, to run a job, opens an SSH
> connection to the job manager and executes the bin/flink run command..
>
> If you're interested in I can share some code..
>
>
>
> On Fri, Nov 23, 2018 at 4:32 PM bastien dine 
> wrote:
>
>> Hello,
>>
>> I need to chain processing in the DataSet API, so I am launching several
>> jobs, with multiple env.execute() :
>>
>> topology1.define();
>> env.execute();
>>
>> topology2.define();
>> env.execute();
>>
>> This is working fine when I am running it within IntelliJ
>> But when I am deploying it into my cluster, it only launches the first
>> topology..
>>
>> Could you please shed some light on this issue?
>>
>> Regards,
>> Bastien
>>
>
>
>


Multiple env.execute() into one Flink batch job

2018-11-23 Thread bastien dine
Hello,

I need to chain processing in the DataSet API, so I am launching several jobs,
with multiple env.execute() :

topology1.define();
env.execute();

topology2.define();
env.execute();

This is working fine when I am running it within IntelliJ
But when I am deploying it into my cluster, it only launches the first
topology..
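
(For reference, a fleshed-out version of the pattern above; the file paths
and job names are placeholders. Both plans run in order when the program is
started in attached mode with bin/flink run, whereas a jar submitted through
the web UI / REST API typically only gets the plan of the first execute()
extracted, which is consistent with the behaviour described here:)

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class ChainedBatchJobs {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First topology: define its sink, then execute that plan.
        DataSet<Integer> first = env.fromElements(1, 2, 3);
        first.writeAsText("/tmp/first-out", FileSystem.WriteMode.OVERWRITE);
        env.execute("first topology");

        // Second topology: this line is only reached after the first job has finished.
        DataSet<String> second = env.fromElements("a", "b", "c");
        second.writeAsText("/tmp/second-out", FileSystem.WriteMode.OVERWRITE);
        env.execute("second topology");
    }
}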

Could you please shed some light on this issue?

Regards,
Bastien


Re: Call batch job in streaming context?

2018-11-23 Thread bastien dine
Hi Eric,

You can run a job from another one, using the REST API.
This is the only way we have found to launch a batch job from a streaming
job.
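
(A minimal sketch of triggering a batch job that way, assuming the batch jar
has already been uploaded once via POST /jars/upload; the host, jar id and
entry class below are placeholders:)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestJobTrigger {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://jobmanager-host:8081";   // placeholder address
        String jarId = "abcd1234_my-batch-job.jar";           // placeholder id from /jars/upload
        String entryClass = "com.example.MyBatchJob";         // placeholder main class

        // POST /jars/:jarid/run starts the job and returns its job id.
        URI runUri = URI.create(jobManager + "/jars/" + jarId + "/run?entry-class=" + entryClass);

        HttpRequest request = HttpRequest.newBuilder(runUri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}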

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le ven. 23 nov. 2018 à 11:52, Piotr Nowojski  a
écrit :

> Hi,
>
> I’m not sure if I understand your problem and your context, but spawning a
> batch job every 45 seconds doesn't sound like that bad an idea (as long as the
> job is short).
>
> Another idea would be to incorporate this batch job inside your streaming
> job, for example by reading from Cassandra using an AsyncIO operator:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
>
> Quick google search revealed for example this:
>
>
> https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
>
> Piotrek
>
> > On 23 Nov 2018, at 10:33, eric hoffmann 
> wrote:
> >
> > Hi
> > Is it possible to call a batch job in a streaming context?
> > What I want to do is:
> > for a given input event, fetch cassandra elements based on event data,
> apply a transformation on them and apply a ranking when all elements fetched
> by cassandra are processed.
> > If I do this in batch mode I would have to submit a job for each event,
> and I can have an event every 45 seconds.
> > Is there any alternative? Can I start a batch job that will receive some
> external request, process it and wait for another request?
> > thx
> > Eric
>
>


DataSet - Broadcast set in output format

2018-11-22 Thread bastien dine
Hello,

I would like to use a broadcast variable in my outputformat (to pass some
information, and control execution flow)
How would I do it ?
.output does not have a .withBroadcastSet function as it does not extend
SingleInputUdfOperator


--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Metric on JobManager

2018-11-21 Thread bastien dine
Hi Jamie, thanks for your response..
Erm, this will not be easy.. any idea on how to deal with the end time ?
I can have some runtime exceptions in my topology, so I would like to do it
like :

try {
  // Start time here
  env.execute();
} catch (Exception e) {
  // ...
} finally {
  // End time here
}
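
(A minimal runnable sketch of that pattern on the client side, i.e. in the
code that runs around env.execute() and not inside an operator; note that
env.execute() already returns a JobExecutionResult carrying the net runtime
of the job, so an explicit gauge is not strictly needed for the duration.
The job name and the dummy topology are placeholders:)

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class TimedBatchJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder topology so that execute() has something to run.
        env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<>());

        long startMillis = System.currentTimeMillis();
        try {
            JobExecutionResult result = env.execute("timed batch job");
            System.out.println("Net job runtime (ms): " + result.getNetRuntime());
        } catch (Exception e) {
            System.err.println("Job failed: " + e.getMessage());
            throw e;
        } finally {
            // End time here, reached on success and on failure.
            System.out.println("Wall clock start=" + startMillis
                    + " end=" + System.currentTimeMillis());
        }
    }
}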


--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le mer. 21 nov. 2018 à 22:44, Jamie Grier  a écrit :

> What you're describing is not possible.  There is no runtime context or
> metrics you can use at that point.
>
> The best you can probably do (at least for start time) is just keep a flag
> in your function and log a metric once and only once when it first starts
> executing.
>
> On Wed, Nov 21, 2018 at 5:18 AM bastien dine 
> wrote:
>
>> Hello all,
>>
>> I am using metrics to count some stuff in my topology; this is pretty easy
>> with the metric API in getRuntimeContext in a Rich function
>> However I would like to use this metric API to log the start date & end date
>> of my processing, but in the source code executed on the job manager (i.e.
>> not in an operator) before & after the env.execute..
>> How can i retrieve the runtime context, from the execution env maybe ?
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>


Metric on JobManager

2018-11-21 Thread bastien dine
Hello all,

I am using metrics to count some stuff in my topology; this is pretty easy
with the metric API in getRuntimeContext in a Rich function.
However I would like to use this metric API to log the start date & end date of
my processing, but in the source code executed on the job manager (i.e. not
in an operator) before & after the env.execute..
How can I retrieve the runtime context, from the execution env maybe ?

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Counting DataSet in DataFlow

2018-11-07 Thread bastien dine
Hi Fabian,
Thanks for the response, I am going to use the second solution !
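
(A minimal runnable sketch of that second solution, broadcasting the count
into two filters that gate the downstream branches; the class names and the
"cnt" broadcast name are just illustrative:)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class EmptyCheckWithBroadcast {

    // Maps every element to 1L so that the reduced sum equals the element count.
    static class ToOne implements MapFunction<String, Long> {
        @Override
        public Long map(String value) {
            return 1L;
        }
    }

    // Lets elements through only when the broadcast count matches the expectation.
    static class CountGate<T> extends RichFilterFunction<T> {
        private final boolean passWhenEmpty;
        private boolean inputWasEmpty;

        CountGate(boolean passWhenEmpty) {
            this.passWhenEmpty = passWhenEmpty;
        }

        @Override
        public void open(Configuration parameters) {
            // The broadcast dataset holds at most one element: the summed count.
            List<Long> counts = getRuntimeContext().getBroadcastVariable("cnt");
            long count = counts.isEmpty() ? 0L : counts.get(0);
            inputWasEmpty = (count == 0L);
        }

        @Override
        public boolean filter(T value) {
            return passWhenEmpty == inputWasEmpty;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> input = env.fromElements("a", "b", "c");
        DataSet<Long> count = input.map(new ToOne()).reduce((a, b) -> a + b);

        // Branch that should run when the input is NOT empty.
        input.filter(new CountGate<String>(false)).withBroadcastSet(count, "cnt").print();
        // Branch that should run when the input IS empty (prints nothing here).
        input.filter(new CountGate<String>(true)).withBroadcastSet(count, "cnt").print();
    }
}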
Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le mer. 7 nov. 2018 à 14:16, Fabian Hueske  a écrit :

> Another option for certain tasks is to work with broadcast variables [1].
> The value could be use to configure two filters.
>
> DataSet<X> input = ...
> DataSet<Long> count = input.map(x -> 1L).sum()
> input.filter(if cnt == 0).withBroadcastSet(count, "cnt").doSomething
> input.filter(if cnt != 0).withBroadcastSet(count, "cnt").doSomethingElse
>
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables
>
>
> Am Mi., 7. Nov. 2018 um 14:10 Uhr schrieb Fabian Hueske  >:
>
>> Hi,
>>
>> Counting always requires a job to be executed.
>> Not sure if this is what you want to do, but if you want to prevent
>> getting an empty result due to an empty cross input, you can use a
>> mapPartition() with parallelism 1 to emit a special record, in case the
>> MapPartitionFunction didn't see any data.
>>
>> Best, Fabian
>>
>> Am Mi., 7. Nov. 2018 um 14:02 Uhr schrieb bastien dine <
>> bastien.d...@gmail.com>:
>>
>>> Hello,
>>>
>>> I would like a way to count a dataset to check if it is empty or
>>> not.. But .count() triggers an execution, and I do not want to do a separate
>>> job execution plan, as this will trigger multiple readings..
>>> I would like to have something like..
>>>
>>> Source -> map -> count -> if 0   -> do something
>>>                           if not -> do something else
>>>
>>>
>>> More concretely, I would like to check if one of my datasets is empty before
>>> doing a cross operation..
>>>
>>> Thanks,
>>> Bastien
>>>
>>>
>>>


Counting DataSet in DataFlow

2018-11-07 Thread bastien dine
Hello,

I would like a way to count a dataset to check if it is empty or not..
But .count() triggers an execution, and I do not want to do a separate job
execution plan, as this will trigger multiple readings..
I would like to have something like..

Source -> map -> count -> if 0   -> do something
                          if not -> do something else


More concretely, I would like to check if one of my datasets is empty before
doing a cross operation..

Thanks,
Bastien


Re: Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch

2018-09-25 Thread bastien dine
Hi Hequn,

Thanks for your response
Yeah, I know about the Table API, but I am searching for a way to have a bounded
context with a stream, somehow creating a dataset from a buffer stored in a
window of a datastream.

Regards, Bastien

Le mar. 25 sept. 2018 à 14:50, Hequn Cheng  a écrit :

> Hi bastien,
>
> Flink features two relational APIs, the Table API and SQL. Both APIs are
> unified APIs for batch and stream processing, i.e., queries are executed
> with the same semantics on unbounded, real-time streams or bounded[1].
> There are also documents about Join[2].
>
> Best, Hequn
> [1] https://flink.apache.org/flink-applications.html#layered-apis
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
> On Tue, Sep 25, 2018 at 4:14 PM bastien dine 
> wrote:
>
>> Hello everyone,
>>
>> I need to join some files to perform some processing.. The dataset API is
>> a perfect way to achieve this, I am able to do it when I read file in batch
>> (csv)
>>
>> However in the prod environment, I will receive those files in kafka
>> messages (one message = one line of a file)
>> So I am considering using a global window + a custom trigger on an
>> end-of-file message and a process window function.
>> But I can not go too far with that, as process is only one function and
>> chaining functions will be a pain. I don't think that emitting a datastream
>> & windows / trigger on EOF before every process function is a good idea
>>
>> However I would like to work in a bounded way once I received all of my
>> elements (after the trigger on global window), like the dataset API, as I
>> will join on my whole dataset..
>>
>> I thought maybe it would be a good idea to go for the Table API and a group
>> window ? But you can not have a custom trigger and a global group window on a
>> table ? (like the global window on a datastream ?)
>> Best alternative would be to create a dataset as a result of my process
>> window function.. but I don't think this is possible, is it ?
>>
>> Best Regards,
>> Bastien
>>
>


Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch

2018-09-25 Thread bastien dine
Hello everyone,

I need to join some files to perform some processing.. The DataSet API is a
perfect way to achieve this; I am able to do it when I read the files in batch
(csv)

However in the prod environment, I will receive those files in kafka
messages (one message = one line of a file)
So I am considering using a global window + a custom trigger on an
end-of-file message and a process window function.
But I can not go too far with that, as process is only one function and
chaining functions will be a pain. I don't think that emitting a datastream
& windows / trigger on EOF before every process function is a good idea
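
(A minimal sketch of such an end-of-file trigger on a global window; the
FileLine type and its end-of-file flag are made up for illustration, while
Trigger, TriggerResult and GlobalWindow come from the DataStream windowing
API:)

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Hypothetical record type: one kafka message = one line of a file.
class FileLine {
    public String fileName;
    public String content;
    public boolean endOfFile;
    public boolean isEndOfFile() { return endOfFile; }
}

public class EndOfFileTrigger extends Trigger<FileLine, GlobalWindow> {

    @Override
    public TriggerResult onElement(FileLine element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        // Buffer every line; fire (and purge) only when the EOF marker arrives.
        return element.isEndOfFile() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // no custom trigger state to clean up in this sketch
    }
}

// Usage (keying by file name so each file gets its own buffered window):
//
// stream.keyBy(line -> line.fileName)
//       .window(GlobalWindows.create())
//       .trigger(new EndOfFileTrigger())
//       .process(new MyBoundedProcessWindowFunction());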

However I would like to work in a bounded way once I have received all of my
elements (after the trigger on the global window), like the DataSet API, as I
will join on my whole dataset..

I thought maybe it would be a good idea to go for the Table API and a group
window ? But you can not have a custom trigger and a global group window on a
table ? (like the global window on a datastream ?)
Best alternative would be to create a dataset as a result of my process
window function.. but I don't think this is possible, is it ?

Best Regards,
Bastien