Re: Preparing keyed state before snapshot

2024-02-21 Thread Lorenzo Nicora
…you to store operator state as BLOB directly, if that would be a doable option for you. Sincere greetings, Thias. *From:* Zakelly Lan *Sent:* Wednesday, February 21, 2024 8:04 AM *To:* Lorenzo Nicora *Cc:* F…

Re: Preparing keyed state before snapshot

2024-02-20 Thread Lorenzo Nicora
…flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165 Best, Zakelly. On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora wrote: Hi Thias, I considered CheckpointedFunction. …

Re: Preparing keyed state before snapshot

2024-02-16 Thread Lorenzo Nicora
…tate(…) you also get access to state of a different operator key. SnapshotState(…) is called as part of each checkpoint in order to store data. Sincere greetings, Thias. *From:* Lorenzo Nicora *Sent:* Thursday, Fe…

Preparing keyed state before snapshot

2024-02-15 Thread Lorenzo Nicora
Hello everyone, I have a convoluted problem. I am implementing a KeyedProcessFunction that keeps some non-serializable "state" in memory, in a transient Map (key = stream key, value = the non-serializable "state"). I can extract a serializable representation to put in Flink state, and I can…
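
A minimal sketch of the pattern under discussion, assuming hypothetical Handle / SerializableSummary types for the non-serializable object and its serializable form. Since a KeyedProcessFunction has no per-key hook that fires at snapshot time, the sketch simply writes the serializable form through to keyed state on every element, so whatever a checkpoint captures is always current:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    public class CachingKeyedFunction extends KeyedProcessFunction<String, String, String> {

        /** Hypothetical serializable representation kept in Flink state. */
        public static class SerializableSummary implements Serializable {}

        /** Hypothetical non-serializable in-memory object. */
        public static class Handle {
            static Handle fromSummary(SerializableSummary s) { return new Handle(); }
            SerializableSummary toSummary() { return new SerializableSummary(); }
            void update(String value) { /* mutate the in-memory structure */ }
        }

        private transient Map<String, Handle> cache;            // key = stream key
        private transient ValueState<SerializableSummary> summaryState;

        @Override
        public void open(Configuration parameters) {
            cache = new HashMap<>();
            summaryState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("summary", SerializableSummary.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            Handle handle = cache.get(ctx.getCurrentKey());
            if (handle == null) {
                // Rebuild the in-memory object from its last serializable snapshot
                SerializableSummary summary = summaryState.value();
                handle = (summary == null) ? new Handle() : Handle.fromSummary(summary);
                cache.put(ctx.getCurrentKey(), handle);
            }
            handle.update(value);
            // Write-through: keep keyed state current so any checkpoint sees it
            summaryState.update(handle.toSummary());
            out.collect(value);
        }
    }

The write-through costs a state update per element; the alternative discussed later in this thread (flushing a cache from CheckpointedFunction#snapshotState, as FastTop1Function appears to do) avoids that cost but relies on key-context access that is not part of the plain user-facing API.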

Kafka Sink and Kafka transaction timeout

2023-10-02 Thread Lorenzo Nicora
Hi team, In the Kafka Sink docs [1], with EXACTLY_ONCE it is recommended to set: transaction_timeout > maximum_checkpoint_duration + maximum_restart_duration. I understand transaction_timeout > maximum_checkpoint_duration, but why add maximum_restart_duration? If the application recovers from a…
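
The short answer, stated as my understanding rather than quoted from the thread: the transaction that was open when the job failed is only committed after the job has restarted and restored from the checkpoint; until then it sits open on the broker, so if the broker's timeout expires during the restart gap, the transaction is aborted and its data is lost. A sketch of where that property is set on the modern KafkaSink; the broker address, topic, and 15-minute value are illustrative only:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("output-topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("my-app")
            // Must outlive checkpoint duration *plus* any restart time: the
            // pending transaction is only committed after recovery completes.
            .setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000))
            .build();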

FileSource for unbounded data

2023-04-25 Thread Lorenzo Nicora
Hi, I understand the FileSystem DataStream FileSource remembers in state all the processed files, forever. This causes the state to grow unbounded, making FileSource impractical to use in a stateful application. Is there any known workaround? Thanks, Lorenzo

Re: KeyedProcessFunction within an iteration

2023-02-18 Thread Lorenzo Nicora
…fluence/pages/viewpage.action?pageId=184615300 [2] https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java Lorenzo Nicora wrote on Sat, 18 Feb 2023 at 17:00: Hi all, I am trying to…

KeyedProcessFunction within an iteration

2023-02-18 Thread Lorenzo Nicora
Hi all, I am trying to implement an iterative streaming job that processes the loop with a KeyedProcessFunction. I need a KeyedProcessFunction to use keyed state and to emit a side output (which, after further transformations, becomes the feedback). The problem is that IterativeStream.process() only accepts…
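
One possible workaround, offered as a sketch rather than something from the thread: IterativeStream is itself a DataStream, so keying it inside the loop yields a KeyedStream whose process() does accept a KeyedProcessFunction, and the side output can then feed closeWith(). Here input, Event, Result, transform(), and MyKeyedProcessFunction are all placeholders:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.util.OutputTag;

    final OutputTag<Event> feedbackTag = new OutputTag<Event>("feedback") {};

    IterativeStream<Event> iteration = input.iterate();

    // Keying inside the loop body gives access to keyed state and timers
    SingleOutputStreamOperator<Result> body = iteration
            .keyBy(Event::getKey)
            .process(new MyKeyedProcessFunction(feedbackTag));

    // The side output, after further transformations, becomes the feedback edge
    DataStream<Event> feedback = body.getSideOutput(feedbackTag)
            .map(e -> transform(e));
    iteration.closeWith(feedback);

    DataStream<Result> output = body;

Caveat: closeWith() requires the feedback stream to match the iteration's type and parallelism, so whether this fits depends on the transformations on the feedback path.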

Changing watermark in the middle of a flow

2020-07-22 Thread Lorenzo Nicora
Hi, I have a linear streaming flow with a single source and multiple sinks to publish intermediate results. The time characteristic is Event Time and I am adding one AssignerWithPeriodicWatermarks immediately after the source. I need to add a different assigner, in the middle of the flow, to…
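
Mechanically, this is just a second assignTimestampsAndWatermarks() call on the intermediate stream; as far as I know, the periodic-watermark operator emits fresh watermarks from the re-extracted timestamps on that path, so the real question is whether event time may appear to jump for downstream operators. A sketch against the pre-1.11 assigner API this thread uses, with MyEvent and EnrichFunction as hypothetical placeholders:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // First assigner, immediately after the source
    DataStream<MyEvent> timestamped = source.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(MyEvent e) { return e.getEventTime(); }
            });

    // ... intermediate transformations and sinks publishing partial results ...

    // Second, different assigner in the middle of the flow: it re-extracts
    // timestamps and emits new watermarks for the rest of the pipeline
    DataStream<MyEvent> rewatermarked = timestamped
            .map(new EnrichFunction())
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.minutes(1)) {
                        @Override
                        public long extractTimestamp(MyEvent e) { return e.getEventTime(); }
                    });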

DynamoDB sink

2020-07-17 Thread Lorenzo Nicora
Hi, I was wondering whether there is any reasonably optimised DynamoDB sink. I am surprised I only found some old, partial discussions about implementing your own. Am I the only one with the requirement of sending output to DynamoDB? Am I missing something obvious? I am obviously looking for…
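
At the time of this thread there was no official connector (the flink-connector-dynamodb sink only arrived years later), so the usual answer was a hand-rolled SinkFunction. A minimal, unbatched sketch against the AWS SDK v1; MyRecord is a hypothetical type, semantics are at-least-once, and a real implementation would buffer items into BatchWriteItem calls for throughput:

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.dynamodbv2.model.AttributeValue;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.util.HashMap;
    import java.util.Map;

    public class DynamoDbSink extends RichSinkFunction<DynamoDbSink.MyRecord> {

        /** Hypothetical record type. */
        public static class MyRecord {
            private final String id, payload;
            public MyRecord(String id, String payload) { this.id = id; this.payload = payload; }
            public String getId() { return id; }
            public String getPayload() { return payload; }
        }

        private final String tableName;
        private transient AmazonDynamoDB client;

        public DynamoDbSink(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public void open(Configuration parameters) {
            // The client is not serializable: create it per parallel task
            client = AmazonDynamoDBClientBuilder.defaultClient();
        }

        @Override
        public void invoke(MyRecord value, Context context) {
            Map<String, AttributeValue> item = new HashMap<>();
            item.put("id", new AttributeValue(value.getId()));
            item.put("payload", new AttributeValue(value.getPayload()));
            client.putItem(tableName, item); // one PutItem per record: simple but slow
        }

        @Override
        public void close() {
            if (client != null) {
                client.shutdown();
            }
        }
    }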

Re: Hadoop FS when running standalone

2020-07-16 Thread Lorenzo Nicora
…binary libraries (specifically, I think for me the issue was related to snappy), because my HADOOP_HOME was not (properly) set. I have never used S3, so I don't know if what I mentioned could be the problem here too, but it is worth checking. Best regards, Ales…

Hadoop FS when running standalone

2020-07-16 Thread Lorenzo Nicora
Hi, I need to run my streaming job as a *standalone* Java application, for testing. The job uses the Hadoop S3 FS and I need to test it (not as a unit test). The job works fine when deployed (I am using AWS Kinesis Data Analytics, so Flink 1.8.2). I have *org.apache.flink:flink-s3-fs-hadoop* as a…
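
One guess worth checking in this situation (an assumption, not a confirmed diagnosis): in a plain Java run there is no flink-conf.yaml, so the S3 filesystem factory that flink-s3-fs-hadoop registers never receives its configuration unless FileSystem.initialize() is called explicitly before any s3:// path is used. A sketch for the 1.8-era API:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StandaloneRunner {
        public static void main(String[] args) throws Exception {
            // With flink-s3-fs-hadoop on the classpath its factory is discovered
            // automatically; pass the credentials/config explicitly, since there
            // is no flink-conf.yaml in a standalone Java run.
            Configuration conf = new Configuration();
            conf.setString("s3.access-key", "...");
            conf.setString("s3.secret-key", "...");
            FileSystem.initialize(conf);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // ... build the job against s3:// paths as usual ...
            env.execute("standalone-test");
        }
    }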

Re: Reading and updating rule-sets from a file

2020-07-03 Thread Lorenzo Nicora
…e record. Cheers, Till. On Mon, Jun 29, 2020 at 3:52 PM Lorenzo Nicora wrote: Hi, My streaming job uses a set of rules to process records from a stream. The rule set is defined in simple flat files, one rule per line. …

Dockerised Flink 1.8 with Hadoop S3 FS support

2020-07-02 Thread Lorenzo Nicora
Hi, I need to set up a dockerized *session cluster* using Flink *1.8.2* for development and troubleshooting. We are bound to 1.8.2 as we are deploying to AWS Kinesis Data Analytics for Flink. I am using an image based on the semi-official flink:1.8-scala_2.11. I need to add to my dockerized…

Reading and updating rule-sets from a file

2020-06-29 Thread Lorenzo Nicora
Hi, My streaming job uses a set of rules to process records from a stream. The rule set is defined in simple flat files, one rule per line. The rule set can change from time to time. A user will upload a new file that must replace the old rule set completely. My problem is with reading and…
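
A sketch of one common shape for this (broadcast state), under the assumption that the uploaded file is read and parsed into a complete List<String> of rules upstream, which is what makes an atomic full replacement simple; the event and output types here are placeholders:

    import java.util.List;

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    final MapStateDescriptor<String, List<String>> ruleDescriptor =
            new MapStateDescriptor<>("rules", Types.STRING, Types.LIST(Types.STRING));

    // ruleSets: DataStream<List<String>>, one element per uploaded file
    BroadcastStream<List<String>> ruleBroadcast = ruleSets.broadcast(ruleDescriptor);

    DataStream<String> results = events
            .connect(ruleBroadcast)
            .process(new BroadcastProcessFunction<String, List<String>, String>() {

                @Override
                public void processBroadcastElement(List<String> newRules, Context ctx,
                                                    Collector<String> out) throws Exception {
                    // Overwriting the single entry replaces the old rule set completely
                    ctx.getBroadcastState(ruleDescriptor).put("current", newRules);
                }

                @Override
                public void processElement(String event, ReadOnlyContext ctx,
                                           Collector<String> out) throws Exception {
                    List<String> rules = ctx.getBroadcastState(ruleDescriptor).get("current");
                    // apply the current rules to the event here
                    out.collect(event);
                }
            });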

Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
…= 01.01.1970 to identify missing values. Deep copies are used whenever the same record has to be used multiple times (state, broadcast). That's why I thought your idea of switching to POJOs asap should help. Where do you see issues? [1] https://issues.apache.org/jira/browse/…

Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
…d they belong by using the IndexedRecord methods. [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/avro/user.avsc [2] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/pom.xml [3] https://gist.github.com/AHeise/…

Reading files from multiple subdirectories

2020-06-11 Thread Lorenzo Nicora
Hi, this is related to the same case I am discussing in another thread, but not related to AVRO this time :) I need to ingest files that an S3 Sink Kafka Connector periodically adds to an S3 bucket. Files are bucketed by date/time, as often happens. Is there any way, using Flink only, to monitor a…
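
The knob that usually matters for this layout: FileInputFormat can recurse into the date/time subdirectories when nested file enumeration is enabled, combined with readFile() in PROCESS_CONTINUOUSLY mode so the bucket keeps being monitored. A sketch with an illustrative path and scan interval (TextInputFormat stands in for whatever format the files actually need):

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String path = "s3://my-bucket/topics/"; // hypothetical bucket layout
    TextInputFormat format = new TextInputFormat(new Path(path));
    format.setNestedFileEnumeration(true);  // descend into date/time subdirectories

    DataStream<String> lines = env.readFile(
            format,
            path,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            60_000L); // re-scan the bucket every 60s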

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
…datum = readWithConversion(oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in); } else { datum = readWithoutConversion(oldDatum, f.schema(), in); } getData().setField(r, f.name(), f.p…

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
…en one of the most reactive and helpful I ever interacted with. On Thu, 11 Jun 2020 at 10:25, Guowei Ma wrote: Hi, for the 1.8.2 (case 1) you could try format.setReuseAvroValue(false); Best, Guowei. Lorenzo Nicora wrote on Thu, 11 Jun 2020 at 5:02 PM: Hi…

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
class ShiftElements extends MapFunction {
    Object lastElement;

    Object map(Object newElement, Collector out) {
        out.collect(lastElement);
        lastElement = newElement; // <- never cache with enableObjectReuse
    }
}

(excuse my ugly code)
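
For reference, a compilable variant of the quoted sketch (my rewrite, not the original author's): MapFunction does not receive a Collector, so FlatMapFunction is the closer fit, and the hazard it illustrates is unchanged:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Emits the previous element for each incoming one. Holding on to a
    // reference like lastElement is exactly what breaks under
    // env.getConfig().enableObjectReuse(), because the runtime may then
    // mutate the cached instance in place.
    public class ShiftElements<T> implements FlatMapFunction<T, T> {

        private T lastElement;

        @Override
        public void flatMap(T newElement, Collector<T> out) {
            if (lastElement != null) {
                out.collect(lastElement);
            }
            lastElement = newElement; // unsafe once object reuse is enabled
        }
    }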

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
…forth type transformation). On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora wrote: Thanks Timo, the stacktrace with the 1.9.2-generated specific file is the following: org.apache.flink.streaming.runtime.task…

Re: Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
…ion and the ticket, it looks like the issue is not the AvroInputFormat, but the serializer. So it would probably work with a different serializer (but that would cause back and forth type transformation). On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora wrote: Tha…

Re: Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
…ally fixed the biggest shortcomings such that Flink can properly support them as well. Regards, Timo. [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java

Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
Hi, I need to continuously ingest AVRO files as they arrive. Files are written by an S3 Sink Kafka Connector, but S3 is not the point here. I started by trying to ingest a static bunch of files from the local fs first, and I am having weird issues with AVRO deserialization. I have to say, the records…
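
For orientation, the basic shape of the ingestion being attempted in this thread; MyRecord stands for the Avro-generated SpecificRecord class and the path is illustrative. The setReuseAvroValue(false) workaround suggested later in the thread is shown commented out:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.AvroInputFormat;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // MyRecord: hypothetical Avro-generated SpecificRecord class
    AvroInputFormat<MyRecord> format =
            new AvroInputFormat<>(new Path("file:///data/avro/"), MyRecord.class);
    // format.setReuseAvroValue(false); // workaround mentioned later in this thread

    DataStream<MyRecord> records = env.createInput(format);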