Re: Preparing keyed state before snapshot

2024-02-21 Thread Lorenzo Nicora
Thanks Thias and Zakelly,

I probably muddied the waters by saying that my use case was similar to
kvCache.
What I was calling "non-serializable state" is actually a Random Cut Forest
ML model that cannot be serialized by itself, but from which you can extract a
serializable state. That state is serializable, but definitely not a
primitive type.
To be specific, I am trying to implement a keyed version of this RCF
operator [1]. I need one RCF model (a separate "forest") per key. Key
cardinality is not very high, and the size of the state should not be
a problem.

I guess the only feasible way is what Zakelly is suggesting, using
reflection to extract and set the keyContext from within processElement().
I will explore this option.
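
For the archives, this is roughly the shape of the hack I have in mind,
combining your reflection idea with CheckpointedFunction. It is only a sketch:
the String key/input/output types and extractSerializableState() are
placeholders for my actual RCF classes, and the way the operator is located
inside the Context relies on Flink internals that I still need to verify in a
debugger (as Thias suggested), not on a supported API.

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.util.Collector;

public class KeyedRcfFunction extends KeyedProcessFunction<String, String, String>
        implements CheckpointedFunction {

    private transient Map<String, Object> forestsByKey;   // non-serializable models, one per key
    private transient ValueState<byte[]> modelState;      // extracted, serializable model state
    private transient AbstractStreamOperator<?> operator; // captured via reflection

    @Override
    public void initializeState(FunctionInitializationContext context) {
        forestsByKey = new HashMap<>();
        modelState = context.getKeyedStateStore()
                .getState(new ValueStateDescriptor<>("rcf-model", byte[].class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (operator == null) {
            // The Context instance is an inner class of the KeyedProcessOperator:
            // look for the field holding the enclosing operator (internal detail,
            // to be double-checked per Flink version).
            for (Field field : ctx.getClass().getDeclaredFields()) {
                if (AbstractStreamOperator.class.isAssignableFrom(field.getType())) {
                    field.setAccessible(true);
                    operator = (AbstractStreamOperator<?>) field.get(ctx);
                }
            }
        }
        // ... lazily create/restore the forest for ctx.getCurrentKey(), update it, emit ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Object originalKey = operator.getCurrentKey();
        for (Map.Entry<String, Object> entry : forestsByKey.entrySet()) {
            operator.setCurrentKey(entry.getKey());           // point keyed state at this key
            modelState.update(extractSerializableState(entry.getValue()));
        }
        operator.setCurrentKey(originalKey);                  // restore the key, as Thias advised
    }

    private byte[] extractSerializableState(Object forest) {
        return new byte[0]; // placeholder for the real RCF state extraction
    }
}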

Thanks again
Lorenzo

[1]
https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/AnomalyDetection/RandomCutForest/src/main/java/software/amazon/flink/example/operator/RandomCutForestOperator.java


On Wed, 21 Feb 2024 at 08:13, Schwalbe Matthias 
wrote:

> Good morning all,
>
>
>
> Let me loop myself in …
>
>
>
>1. Another even more convenient way to enable caching is to actually
>configure/assign RocksDB to use more off-heap memory for its cache; you
>might also consider enabling bloom filters (it all depends on how large your
>key-space is: thousands/millions/billions/…).
>
> Within the technological limits, RocksDB is hard to top; if keeping all
> data in memory is not an option, this is the path I usually follow.
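
(For my own notes, I read this as something like the configuration below;
the exact option keys, especially the bloom filter one, are assumptions I
still need to check against the docs for my Flink version.)

// org.apache.flink.configuration.Configuration / StreamExecutionEnvironment
Configuration conf = new Configuration();
conf.setString("state.backend", "rocksdb");
// give RocksDB more managed (off-heap) memory for its block cache and write buffers
conf.setString("taskmanager.memory.managed.size", "2g");
// bloom filters for faster point lookups (key to double-check)
conf.setString("state.backend.rocksdb.use-bloom-filter", "true");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);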
>
>1. The other question, on how to control the current key from within
>snapshotState(): you can acquire a pointer to the underlying state backend,
>e.g. from within open(), then get hold of a pointer to the specific state
>primitive, and set the current key directly.
>
> In order to find out how to do that, put a breakpoint in the debugger and
> walk up a couple of call stack frames, and/or step into the value setters
> and model your code after how it is done there.
>
> Mind, though, to restore the current key if you happen to change it to
> another key.
>
> Doing this e.g. in initializeState() is time-insensitive, because this
> happens outside the ‘hot’ code paths.
>
>1. If the number of elements to store is small, you can store it in
>operator state and initialize your local structure from it in
>initializeState(); you would probably want to keep the data in serialized
>form in operator state since, as you mentioned, serialization would be
>expensive.
>2. There is another API (which I don’t remember the name of) that
>allows you to store operator state as a BLOB directly, if that would be a
>doable option for you.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Zakelly Lan 
> *Sent:* Wednesday, February 21, 2024 8:04 AM
> *To:* Lorenzo Nicora 
> *Cc:* Flink User Group 
> *Subject:* Re: Preparing keyed state before snapshot
>
>
>
>
>
>
> Hi Lorenzo,
>
>
>
> I think the most convenient way is to modify the code of the state
> backend, adding a k-v cache as you want.
>
>
>
> Otherwise IIUC, there's no public interface to get keyContext. But well,
> you may try something hacky. You may use the passed-in `Context` instance
> in processElement, and leverage java reflection to get
> the KeyedProcessOperator instance, where you can perform setCurrentKey().
>
>
>
>
>
> Best,
>
> Zakelly
>
>
>
> On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
> wrote:
>
> Thanks Zakelly,
>
>
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Function.
>
>
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
>
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFunction in StreamExecRank.
>
> How can I do something similar without modifying the Flink runtime?
>
>
>
> Lorenzo
>
>
>
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:
>
> Hi Lorenzo,
>
>
>
> It is not recommended to do this with the keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys
> during snapshotState().
>
>
>
> Hope this helps.
>
>
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
>
>
> Best,
>
> Zakelly
>

Re: Preparing keyed state before snapshot

2024-02-20 Thread Lorenzo Nicora
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable
"state", similar to the kvCache in FastTop1Fucntion.

But I am not sure I understand how I can set the keyed state for a specific
key, in snapshotState().
FastTop1Function seems to rely on keyContext set via setKeyContext(). This
method is not part of the API. I see it's set specifically for
AbstractTopNFunction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:

> Hi Lorenzo,
>
> It is not recommended to do this with the keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys
> during snapshotState().
>
> Hope this helps.
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
> Best,
> Zakelly
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
> wrote:
>
>> Hi Thias
>>
>> I considered CheckpointedFunction.
>> In snapshotState() I would have to update the state of each key,
>> extracting the in-memory "state" of each key and putting it into Flink
>> state with state.update(...).
>> This must happen per key.
>> But snapshotState() has no visibility of the keys, and I have no way of
>> selectively accessing the state of a specific key to update it.
>> Unless I am missing something.
>>
>> Thanks
>> Lorenzo
>>
>>
>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Good morning Lorenzo,
>>>
>>>
>>>
>>> You may want to implement
>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>> your KeyedProcessFunction.
>>>
>>> Btw. by the time initializeState(…) is called, the state backend is
>>> fully initialized and can be read and written to (which is not yet the case
>>> when the open(…) function is called).
>>>
>>> In initializeState(…) you also get access to the state of different
>>> operator keys.
>>>
>>> snapshotState(…) is called as part of each checkpoint in order to
>>> store data.
>>>
>>>
>>>
>>> Sincere greetings
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>> *From:* Lorenzo Nicora 
>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>> *To:* Flink User Group 
>>> *Subject:* Preparing keyed state before snapshot
>>>
>>>
>>>
>>> Hello everyone,
>>>
>>>
>>>
>>> I have a convoluted problem.
>>>
>>>
>>>
>>> I am implementing a KeyedProcessFunction that keeps some
>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>> value = the non-serializable "state").
>>>
>>>
>>>
>>> I can extract a serializable representation to put in Flink state, and I
>>> can load my in-memory "state" from the Flink state. But these operations
>>> are expensive.
>>>
>>>
>>>
>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>> in processElement(), on the first record for the key.
>>>
>>>
>>>
>>> The problem is saving the in-memory "state" to Flink state.
>>>
>>> I need to do it only before the state snapshot. But KeyedProcessFunction
>>> has no entrypoint called before the state snapshot.
>>>
>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>> work for keyed state.
>>>
>>>
>>>
>>> Any suggestions?
>>>
>>>
>>>
>>> Note that I cannot use operator state nor a broadcast state.
>>>
>>> Processing is keyed. Every processed record modifies the in-memory
>>> "state" of that key. If the job rescale, the state of the key must follow
>>> the partition.
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Lorenzo
>>>
>>


Re: Preparing keyed state before snapshot

2024-02-16 Thread Lorenzo Nicora
Hi Thias

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting
the in-memory "state" of each key and putting it into Flink state with
state.update(...).
This must happen per key.
But snapshotState() has no visibility of the keys, and I have no way of
selectively accessing the state of a specific key to update it.
Unless I am missing something.

Thanks
Lorenzo


On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias 
wrote:

> Good morning Lorenzo,
>
>
>
> You may want to implement
> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
> your KeyedProcessFunction.
>
> Btw. by the time initializeState(…) is called, the state backend is fully
> initialized and can be read and written to (which is not yet the case when
> the open(…) function is called).
>
> In initializeState(…) you also get access to the state of different
> operator keys.
>
> snapshotState(…) is called as part of each checkpoint in order to
> store data.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
> *From:* Lorenzo Nicora 
> *Sent:* Thursday, February 15, 2024 7:50 PM
> *To:* Flink User Group 
> *Subject:* Preparing keyed state before snapshot
>
>
>
> Hello everyone,
>
>
>
> I have a convoluted problem.
>
>
>
> I am implementing a KeyedProcessFunction that keeps some non-serializable
> "state" in memory, in a transient Map (key = stream key, value = the
> non-serializable "state").
>
>
>
> I can extract a serializable representation to put in Flink state, and I
> can load my in-memory "state" from the Flink state. But these operations
> are expensive.
>
>
>
> Initializing the in-memory "state" is relatively easy. I do it lazily, in
> processElement(), on the first record for the key.
>
>
>
> The problem is saving the in-memory "state" to Flink state.
>
> I need to do it only before the state snapshot. But KeyedProcessFunction
> has no entrypoint called before the state snapshot.
>
> I cannot use CheckpointedFunction.snapshotState(), because it does not
> work for keyed state.
>
>
>
> Any suggestions?
>
>
>
> Note that I cannot use operator state nor a broadcast state.
>
> Processing is keyed. Every processed record modifies the in-memory "state"
> of that key. If the job rescales, the state of the key must follow the
> partition.
>
>
>
>
>
> Regards
>
> Lorenzo
>


Preparing keyed state before snapshot

2024-02-15 Thread Lorenzo Nicora
Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable
"state" in memory, in a transient Map (key = stream key, value = the
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I
can load my in-memory "state" from the Flink state. But these operations
are expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in
processElement(), on the first record for the key.
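
Roughly like this (Event, Result and Model are placeholders for my actual
classes; the key is a String here):

// transient, non-serializable per-key "state": key = stream key, value = the model
private transient Map<String, Model> models;
// Flink keyed state holding the expensive, serializable representation
private transient ValueState<byte[]> serializedModelState;

@Override
public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
    Model model = models.get(ctx.getCurrentKey());
    if (model == null) {
        byte[] snapshot = serializedModelState.value();   // keyed state for the current key
        model = (snapshot != null) ? Model.restoreFrom(snapshot) : Model.createNew();
        models.put(ctx.getCurrentKey(), model);
    }
    // ... update the model with the event and emit results ...
}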

The problem is saving the in-memory "state" to Flink state.
I need to do it only before the state snapshot. But KeyedProcessFunction
has no entrypoint called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it does not work
for keyed state.

Any suggestions?

Note that I cannot use operator state nor a broadcast state.
Processing is keyed. Every processed record modifies the in-memory "state"
of that key. If the job rescales, the state of the key must follow the
partition.


Regards
Lorenzo


Kafka Sink and Kafka transaction timeout

2023-10-02 Thread Lorenzo Nicora
Hi team

In the Kafka Sink docs [1], with EXACTLY_ONCE it is recommended to set:
transaction_timeout > maximum_checkpoint_duration + maximum_restart_duration.

I understand transaction_timeout > maximum_checkpoint_duration.
But why add maximum_restart_duration?

If the application recovers from a checkpoint, any uncommitted message
that was written after the last successful checkpoint will be
re-written regardless.
If a transaction times out during the recovery, it doesn't matter.

I would rather say:
transaction_timeout > maximum_checkpoint_duration + checkpoint_interval
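
For context, the timeout in question is the one passed to the producer config
of the KafkaSink, roughly like this (illustrative values, topic and serializer;
it must also stay within the broker-side transaction.max.timeout.ms):

Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000)); // e.g. 15 min

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                   // placeholder
        .setKafkaProducerConfig(producerProps)
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-app")                   // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                     // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();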

Any thoughts?

Regards
Lorenzo

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#fault-tolerance


FileSource for unbounded data

2023-04-25 Thread Lorenzo Nicora
Hi
I understand the FileSystem DataStream FileSource remembers in state all
the processed files, forever.
This causes the state to grow unbounded, making FileSource impractical to
use in a stateful application.

Is there any known workaround?

Thanks
Lorenzo


Re: KeyedProcessFunction within an iteration

2023-02-18 Thread Lorenzo Nicora
Hi Zhipeng

IterativeStreams does have keyBy() methods, but they all throw
UnsupportedOperationException [1]

For some context: the whole thing is to do message enrichment with asyncIO,
caching the enrichment info in state (with TTL).
I am using an iteration as RichAsyncFunction does not support state.
I didn't find a simpler way to do both async IO and caching in state.

Here is a shortened version of the flow

-8<-8<-8<-8<-8<-8<
// Measure, SensorData and EnrichedMeasure are POJOs. The key (sensorId) is
// an Integer.

/// flow
DataStream<Measure> measures = env.addSource(new MeasuresSource());
IterativeStream.ConnectedIterativeStreams<Measure, SensorData> iteration =
        measures
                .iterate().withFeedbackType(TypeInformation.of(SensorData.class));
ConnectedStreams<Measure, SensorData> measureAndSensorDataBySensorId =
        iteration
                // THIS THROWS UnsupportedOperationException
                // "Cannot change the input partitioning of an iteration head
                // directly. Apply the partitioning on the input and feedback streams instead."
                .keyBy(Measure::getSensorId, SensorData::getSensorId);

// CachedEnrichment extends KeyedCoProcessFunction
// It emits cache hits on the main output and cache misses on a side-output
CachedEnrichment cachedEnrichment = new CachedEnrichment();
// Try enrichment from cache
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures =
        measureAndSensorDataBySensorId
                .process(cachedEnrichment);
DataStream<Measure> cacheMissMeasures = cacheHitEnrichedMeasures
        .getSideOutput(cachedEnrichment.cacheMissOutputTag);

// On cache miss, fetch SensorData with async IO
SingleOutputStreamOperator<Tuple2<EnrichedMeasure, SensorData>>
        enrichedMeasuresAndFetchedSensorData =
                AsyncDataStream.unorderedWait(
                        cacheMissMeasures,
                        // AsyncEnrich extends RichAsyncFunction<Measure, Tuple2<EnrichedMeasure, SensorData>>
                        new AsyncEnrich(),
                        ASYNC_CALL_TIMEOUT, TimeUnit.MILLISECONDS,
                        ASYNC_OPERATOR_CAPACITY);

// Close the loop with newly fetched SensorData
iteration.closeWith(enrichedMeasuresAndFetchedSensorData.map(t -> t.f1));

// Merge outputs
DataStream<EnrichedMeasure> allEnrichedMeasures = cacheHitEnrichedMeasures
        .union(enrichedMeasuresAndFetchedSensorData.map(t -> t.f0));
-8<-8<-8<-8<-8<-8<

Also, the message of the UnsupportedOperationException thrown by
IterativeStreams.keyBy()
("...Apply the partitioning on the input and feedback streams instead")
does not look right.

I tried that (I made a loop with a single stream of
Either) but it seems there is no way
of processing an IterativeStream with a KeyedProcessFunction, nor to feed
back a KeyedStream into the loop.

-8<-8<-8<-8<-8<-8<

KeyedStream<Either<Measure, SensorData>, Integer>
        measuresOrSensorDataBySensorId = measures
                .map(m -> Either.Left(m)).returns(new
                        TypeHint<Either<Measure, SensorData>>() {})
                .keyBy(msd -> msd.isLeft() ? msd.left().getSensorId() :
                        msd.right().getSensorId());
IterativeStream<Either<Measure, SensorData>> iteration =
        measuresOrSensorDataBySensorId.iterate();

CachedEnrichment cachedEnrichment = new CachedEnrichment();
// The following line DOES NOT COMPILE: IterativeStream.process() expects a
// ProcessFunction, not a KeyedProcessFunction
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures =
        iteration.process(cachedEnrichment,
                TypeInformation.of(EnrichedMeasure.class));

KeyedStream<SensorData, Integer> fetchedSensorDataBySensorId =
        enrichedMeasureAndFetchedSensorData
                .map(t -> t.f1).keyBy(SensorData::getSensorId);
// The following line DOES NOT COMPILE: closeWith() does not expect a
// KeyedStream
iteration.closeWith(fetchedSensorDataBySensorId);
-8<-8<-8<-8<-8<-8<

I will have a look at the iteration module you mentioned.
I wasn't aware.

Thanks
Lorenzo

[1]
https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java#L196


On Sat, 18 Feb 2023 at 12:38, Zhipeng Zhang  wrote:

> Hi Lorenzo,
>
> Could you provide some code example to reproduce your question? As I
> understand, IterativeStream#keyBy is supported since it is a subclass
> of DataStream.
>
> Moreover, we have implemented an unified iteration module for Flink
> [1] in Flink ML [2], which relies on Flink 1.15.2. Probably you can
> also have a try.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
>
> On Sat, 18 Feb 2023 at 17:00, Lorenzo Nicora wrote:
> >
> > Hi all,
> >
> > I am trying to implement an iterative streaming job that processes the
> loop with a KeyedProcessFunction.
> >
> > I need a KeyedProcessFunction to use keyed state and to emit a
> side-output (that aft

KeyedProcessFunction within an iteration

2023-02-18 Thread Lorenzo Nicora
Hi all,

I am trying to implement an iterative streaming job that processes the loop
with a KeyedProcessFunction.

I need a KeyedProcessFunction to use keyed state and to emit a side-output
(that after further transformations becomes the feedback)

The problem is that IterativeStream.process() only accepts a ProcessFunction,
not a KeyedProcessFunction.

The main and feedback streams have the same payload type, and I am keying
both before starting and closing the iteration.
I understand I cannot re-key after starting the iteration, as
IterativeStream does not support keyBy() and throws an
UnsupportedOperationException "Cannot change the input partitioning of an
iteration head directly. Apply the partitioning on the input and feedback
streams instead."

Is there any way of using keyed state within an iteration?
BTW,
I am using Flink 1.15.2 and I am bound to that version

Regards
Lorenzo


Changing watermark in the middle of a flow

2020-07-22 Thread Lorenzo Nicora
Hi

I have a linear streaming flow with a single source and multiple sinks to
publish intermediate results.
The time characteristic is Event Time and I am adding
one AssignerWithPeriodicWatermarks immediately after the source.
I need to add a different assigner, in the middle of the flow, to change
the event time - i.e. extracting a different field from the record as event
time.
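
Concretely, I mean something like this (MyRecord and getSecondEventTime are
placeholders for my actual types, and the 10-second bound is arbitrary):

// uses org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
DataStream<MyRecord> reTimestamped = intermediateStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyRecord>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(MyRecord record) {
                        // from here on, use the other event-time field
                        return record.getSecondEventTime();
                    }
                });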

I am not sure I completely understand the implications of changing
event time and watermark in the middle of a flow.

Can anybody give me a hint or direct me to any relevant documentation?

Lorenzo


DynamoDB sink

2020-07-17 Thread Lorenzo Nicora
Hi

I was wondering whether there is any reasonably optimised DynamoDB sink.
I am surprised I only found some old, partial discussions about
implementing your own.

Am I the only one with the requirement of sending output to DynamoDB?
Am I missing something obvious?
I am obviously looking for an idempotent, at-least-once sink.

Cheers
Lorenzo


Re: Hadoop FS when running standalone

2020-07-16 Thread Lorenzo Nicora
Thanks Alessandro,

I think I solved it.
I cannot set any HADOOP_HOME as I have no Hadoop installed on the machine
running my tests.
But adding *org.apache.flink:flink-shaded-hadoop-2:2.8.3-10.0* as a compile
dependency to the Maven profile building the standalone version fixed the
issue.

Lorenzo


On Thu, 16 Jul 2020 at 15:35, Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hi Lorenzo,
> IIRC I had the same error message when trying to write snappified parquet
> on HDFS with a standalone fat jar.
>
> Flink could not "find" the hadoop native/binary libraries (specifically I
> think for me the issue was related to snappy), because my HADOOP_HOME was
> not (properly) set.
>
> I have never used S3 so I don't know if what I mentioned could be the
> problem here too, but worth checking.
>
> Best regards,
> Alessandro
>
> On Thu, 16 Jul 2020 at 12:59, Lorenzo Nicora 
> wrote:
>
>> Hi
>>
>> I need to run my streaming job as a *standalone* Java application, for
>> testing
>> The job uses the Hadoop S3 FS and I need to test it (not a unit test).
>>
>> The job works fine when deployed (I am using AWS Kinesis Data Analytics,
>> so Flink 1.8.2)
>>
>> I have *org.apache.flink:flink-s3-fs-hadoop* as a "compile" dependency.
>>
>> For running standalone, I have a Maven profile adding dependencies that
>> are normally provided (
>> *org.apache.flink:flink-java*,
>> *org.apache.flink:flink-streaming-java_2.11*,
>> *org.apache.flink:flink-statebackend-rocksdb_2.11*,
>> *org.apache.flink:flink-connector-filesystem_2.11*) but I keep getting
>> the error "Hadoop is not in the classpath/dependencies" and it does not
>> work.
>> I tried adding *org.apache.flink:flink-hadoop-fs* with no luck
>>
>> What dependencies am I missing?
>>
>> Cheers
>> Lorenzo
>>
>


Hadoop FS when running standalone

2020-07-16 Thread Lorenzo Nicora
Hi

I need to run my streaming job as a *standalone* Java application, for
testing
The job uses the Hadoop S3 FS and I need to test it (not a unit test).

The job works fine when deployed (I am using AWS Kinesis Data Analytics, so
Flink 1.8.2)

I have *org.apache.flink:flink-s3-fs-hadoop* as a "compile" dependency.

For running standalone, I have a Maven profile adding dependencies that are
normally provided (
*org.apache.flink:flink-java*, *org.apache.flink:flink-streaming-java_2.11*,
*org.apache.flink:flink-statebackend-rocksdb_2.11*,
*org.apache.flink:flink-connector-filesystem_2.11*) but I keep getting the
error "Hadoop is not in the classpath/dependencies" and it does not work.
I tried adding *org.apache.flink:flink-hadoop-fs* with no luck

What dependencies am I missing?

Cheers
Lorenzo


Re: Reading and updating rule-sets from a file

2020-07-03 Thread Lorenzo Nicora
Thanks Till,

I understand making my FileInputFormat "unsplittable" guarantees a file is
always read by a single task. But how can I produce a single record for the
entire file?

As my file is a CSV with some idiosyncrasies, I am extending CsvInputFormat
to avoid reinventing the wheel of CSV parsing and type conversions. This
generates one record per line, and I cannot see any hook for the end of the
file.

I've been thinking of using a GlobalWindow to process all the rules at once
when I reach the end of the file, but what can I use as a trigger?
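
Or, if I drop CsvInputFormat, is an unsplittable format that returns the whole
file as a single record what you had in mind? Something along these lines (a
rough sketch; the class name and the List-of-lines record type are just
placeholders, and I would still have to redo the CSV parsing myself):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class WholeFileRuleSetInputFormat extends FileInputFormat<List<String>> {

    private boolean end;

    public WholeFileRuleSetInputFormat(Path path) {
        super(path);
        this.unsplittable = true; // one InputSplit == one whole file
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split); // opens the protected 'stream' field
        end = false;
    }

    @Override
    public boolean reachedEnd() {
        return end;
    }

    @Override
    public List<String> nextRecord(List<String> reuse) throws IOException {
        // read the whole file and return it as a single record (one rule per line)
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        List<String> lines = reader.lines().collect(Collectors.toList());
        end = true;
        return lines;
    }
}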

Regards
Lorenzo


On Wed, 1 Jul 2020 at 08:21, Till Rohrmann  wrote:

> Hi Lorenzo,
>
> what you could try to do is to derive your own InputFormat (extending
> FileInputFormat) where you set the field `unsplittable` to true. That way,
> an InputSplit is the whole file and you can handle the set of new rules as
> a single record.
>
> Cheers,
> Till
>
> On Mon, Jun 29, 2020 at 3:52 PM Lorenzo Nicora 
> wrote:
>
>> Hi
>>
>> My streaming job uses a set of rules to process records from a stream.
>> The rule set is defined in simple flat files, one rule per line.
>> The rule set can change from time to time. A user will upload a new file
>> that must replace the old rule set completely.
>>
>> My problem is with reading and updating the rule set when I have a new
>> one.
>> I cannot update single rules. I need the whole rule set to validate it
>> and build the internal representation to broadcast.
>>
>> I am reading the file with a *ContinuousFileReaderOperator* and
>> *InputFormat* (via env.readFile(...)) and creating the internal
>> representation of the rule set I then broadcast. I get new files with
>> processingMode = PROCESS_CONTINUOUSLY.
>>
>> How do I know when I have read ALL the records from a physical file, to
>> trigger validating and building the new Rule Set?
>>
>> I've been thinking about a processing-time trigger, waiting a reasonable
>> time after I read the first rule of a new file, but it does not look safe
>> if the user, for example, uploads two new files by mistake.
>>
>> Cheers
>> Lorenzo
>>
>


Dockerised Flink 1.8 with Hadoop S3 FS support

2020-07-02 Thread Lorenzo Nicora
Hi

I need to set up a dockerized *session cluster* using Flink *1.8.2* for
development and troubleshooting. We are bound to 1.8.2 as we are deploying
to AWS Kinesis Data Analytics for Flink.

I am using an image based on the semi-official flink:1.8-scala_2.11
I need to add to my dockerized cluster support for the S3 Hadoop File System
(s3a://), which we have on KDA out of the box.

Note I do not want to add dependencies to the job directly, as I want to
deploy locally exactly the same JAR I deploy to KDA.

Flink 1.8 docs [1] say this is supported out of the box, but that does not
look to be the case for the dockerised version.
I am getting "Could not find a file system implementation for scheme 's3a'"
and "Hadoop is not in the classpath/dependencies".
I assume I need to create a customised docker image,
extending flink:1.8-scala_2.11, but I do not understand how to add support
for S3 Hadoop FS.

Can someone please point me in the right direction? Docs or examples?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/filesystems.html


Lorenzo


Reading and updating rule-sets from a file

2020-06-29 Thread Lorenzo Nicora
Hi

My streaming job uses a set of rules to process records from a stream.
The rule set is defined in simple flat files, one rule per line.
The rule set can change from time to time. A user will upload a new file
that must replace the old rule set completely.

My problem is with reading and updating the rule set when I have a new one.
I cannot update single rules. I need the whole rule set to validate it and
build the internal representation to broadcast.

I am reading the file with a *ContinuousFileReaderOperator* and
*InputFormat* (via env.readFile(...)) and creating the internal
representation of the rule set I then broadcast. I get new files with
processingMode = PROCESS_CONTINUOUSLY.

How do I know when I have read ALL the records from a physical file, to
trigger validating and building the new Rule Set?

I've been thinking about a processing-time trigger, waiting a reasonable
time after I read the first rule of a new file, but it does not look safe
if the user, for example, uploads two new files by mistake.

Cheers
Lorenzo


Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
Thanks Arvid,

now it makes sense.
Unfortunately, the problematic schema comes from a 3rd party we cannot
control; we have to ingest it and do some work with it before being able to
map out of it.
But at least now the boundary of the problem is clear

Thanks to the whole community
Lorenzo


On Tue, 16 Jun 2020 at 22:54, Arvid Heise  wrote:

> Hi Lorenzo,
>
> I didn't mean to dismiss the issue, but it's not a matter of
> incompatibility, it's a matter of unsound generated code. It will break
> independently of Flink, since it apparently is a bug in the Avro compiler
> 1.8.2, so our options to fix it are limited.
> What we should do is to bump the Avro version to fix the underlying issue.
> You can vote for it on the respective ticket, which also explains why it's
> not that easy [1] (also you can vote on that respective hive ticket).
>
> I remember that I also encountered an issue with nullable logical types
> back in my user days, but didn't dare to fix it, since the Avro project was
> really inactive at that time (e.g., it looked dead). Possible workarounds:
> * Stick with non-logical types (what I ended up with). You need to convert
> manually in your first step, which sounds like a PITA, but that's what you
> would do on non-JVM projects anyways (logical types are not really
> established after 5+ years).
> * Use default values instead of union with null. So instead of using null
> to tag missing values, use 0 = 01.01.1970 to identify missing values.
>
> Deep copies are used whenever the same record has to be used multiple
> times (state, broadcast). That's why I thought your idea of switching to
> POJOs asap should help. Where do you see issues?
>
> [1] https://issues.apache.org/jira/browse/FLINK-12532
>
>
>
> On Tue, Jun 16, 2020 at 9:59 PM Lorenzo Nicora 
> wrote:
>
>> Hi Arvid,
>>
>> Sorry, but saying the AVRO compiler setup is "broken" sounds like an easy
>> way of dismissing a problem ;)
>> I am using the official AVRO 1.8.2 Maven plugin with no customisation to
>> generate the code.
>> There might be some legit AVRO configurations that are incompatible with
>> Flink or something in the schema not fully supported.
>>
>> In particular, I noticed the user.avsc schema in Flink testing has *no
>> optional logical type fields* while my schema has multiple optional
>> timestamps.
>> Can AVRO-1891 <https://issues.apache.org/jira/browse/AVRO-1891> (fixed
>> in AVRO 1.9.1) be related?
>>
>> I tried changing user.avsc making one of the timestamp fields a union
>> with null, and flink-avro tests start failing with a lot of "Unknown datum
>> type org.joda.time.DateTime"
>>
>> This would explain why using records generated with AVRO 1.9.2 and
>> dateTimeLogicalType=Joda and enableObjectReuse() behaves better.
>> The workaround only partially solves my problem.
>> It looks like deepCopies happen in many places not controlled by
>> enableObjectReuse, like when adding to some Collectors. Am I right?
>>
>> Cheers
>> Lorenzo
>>
>>
>> On Mon, 15 Jun 2020 at 19:30, Arvid Heise  wrote:
>>
>>> Hi Lorenzo,
>>>
>>> Thank you for confirming my suspicion. It really means something is
>>> broken in your Avro compiler setup and there is not much that we can do on
>>> our end.
>>>
>>> Just for reference, we are having a user.avsc [1] being compiled [2]
>>> with 1.8.2 into this snippet [3] for our
>>> tests. Look especially at how the conversions look; they have a
>>> different template style than yours.
>>>
>>> The expectation is that you have 1 conversion for each logical type that
>>> is compiled to joda type. If you have conversions on other places, you can
>>> trace back to which field they belong by using the IndexedRecord methods.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/avro/user.avsc
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/pom.xml
>>> [3] https://gist.github.com/AHeise/041ff5ccf76087975ed157c3d3276875
>>>
>>> On Thu, Jun 11, 2020 at 3:22 PM Lorenzo Nicora 
>>> wrote:
>>>
>>>>
>>>> Hi Arvid,
>>>>
>>>> I followed your instructions for the breakpoint in
>>>> SpecificDatumReader.readField *with  AVRO 1.8.2*,
>>>>
>>>> For all timestamp-millis fields (I have many):
>>>>
>>>> Conversion conversion = ((SpecificRecordBase)
>>>> r).getConversion(f.pos());
>>>>
>>>

Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
Hi Arvid,

Sorry, but saying the AVRO compiler setup is "broken" sounds like an easy
way of dismissing a problem ;)
I am using the official AVRO 1.8.2 Maven plugin with no customisation to
generate the code.
There might be some legit AVRO configurations that are incompatible with
Flink or something in the schema not fully supported.

In particular, I noticed the user.avsc schema in Flink testing has *no
optional logical type fields* while my schema has multiple optional
timestamps.
Can AVRO-1891 <https://issues.apache.org/jira/browse/AVRO-1891> (fixed in
AVRO 1.9.1) be related?

I tried changing user.avsc making one of the timestamp fields a union with
null, and flink-avro tests start failing with a lot of "Unknown datum type
org.joda.time.DateTime"

This would explain why using records generated with AVRO 1.9.2 and
dateTimeLogicalType=Joda and enableObjectReuse() behaves better.
The workaround only partially solves my problem.
It looks like deepCopies happen in many places not controlled by
enableObjectReuse, like when adding to some Collectors. Am I right?

Cheers
Lorenzo


On Mon, 15 Jun 2020 at 19:30, Arvid Heise  wrote:

> Hi Lorenzo,
>
> Thank you for confirming my suspicion. It really means something is broken
> in your Avro compiler setup and there is not much that we can do on our end.
>
> Just for reference, we are having a user.avsc [1] being compiled [2] with
> 1.8.2 into this snippet [3] for our tests.
> Look especially at how the conversions look; they have a different
> template style than yours.
>
> The expectation is that you have 1 conversion for each logical type that
> is compiled to joda type. If you have conversions on other places, you can
> trace back to which field they belong by using the IndexedRecord methods.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/avro/user.avsc
> [2]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/pom.xml
> [3] https://gist.github.com/AHeise/041ff5ccf76087975ed157c3d3276875
>
> On Thu, Jun 11, 2020 at 3:22 PM Lorenzo Nicora 
> wrote:
>
>>
>> Hi Arvid,
>>
>> I followed your instructions for the breakpoint in
>> SpecificDatumReader.readField *with  AVRO 1.8.2*,
>>
>> For all timestamp-millis fields (I have many):
>>
>> Conversion conversion = ((SpecificRecordBase)
>> r).getConversion(f.pos());
>>
>>
>> returns null for all timestamp-millis fields (I have many), so...
>>
>> datum = readWithoutConversion(oldDatum, f.schema(), in);
>>
>>
>> is used instead and returns a *Long*
>>
>>
>>
>> Not sure it's relevant, but in this version I have the explicit
>> dependency org.apache.avro:avro:1.8.2 and I am using the
>> avro-maven-plugin (1.8.2) to generate the record from .avsc with this
>> configuration:
>>
>> 
>> String
>> true
>> private
>> true
>> 
>>
>>
>> Cheers
>> Lorenzo
>>
>>
>> On Thu, 11 Jun 2020 at 13:11, Arvid Heise  wrote:
>>
>>> Sorry forget my last mail, that was half-finished.
>>>
>>> Here is the real one:
>>>
>>> Hi Lorenzo,
>>>
>>> if you still have time to investigate.
>>>
>>> Your stack trace shows that all expected code paths have been taken.
>>> Conversions are there; although they look different than here, but that can
>>> be attributed to the avro upgrade.
>>>
>>> Could you put a breakpoint on SpecificDatumReader.readField, so that
>>> you can inspect the conversion for the timestamp field? You probably want
>>> to make it a conditional for f.name() == .
>>> The expected flow is that it should have a conversion that returns the
>>> joda time instead of the long. Then datum should be the converted joda time.
>>>
>>> @Override
>>> protected void readField(Object r, Schema.Field f, Object oldDatum,
>>>  ResolvingDecoder in, Object state)
>>> throws IOException {
>>>   if (r instanceof SpecificRecordBase) {
>>> Conversion conversion = ((SpecificRecordBase) 
>>> r).getConversion(f.pos());
>>>
>>> Object datum;
>>> if (conversion != null) {
>>>   datum = readWithConversion(
>>>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, 
>>> in);
>>> } else {
>>>   datum = readWithoutConversion(oldDatum, f.schema(), in);
>>> }
>>>
>>> getData().setField(r, f.name(), f.pos(), datum);
>>>
>>>   } else {
>>&

Reading files from multiple subdirectories

2020-06-11 Thread Lorenzo Nicora
Hi,

related to the same case I am discussing in another thread, but not related
to AVRO this time :)

I need to ingest files a S3 Sink Kafka Connector periodically adds to an S3
bucket.
Files are bucketed by date time as it often happens.

Is there any way, using Flink only, to monitor a base-path and detect new
files in any subdirectories?
Or I need to use something external to move new files in a single directory?

I am currently using
env.readFile(inputFormat, path, PROCESS_CONTINUOUSLY, 6)
with AvroInputFormat, but it seems it can only monitor a single directory


Cheers
Lorenzo


Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid,

I followed your instructions for the breakpoint in
SpecificDatumReader.readField *with  AVRO 1.8.2*,

For all timestamp-millis fields (I have many):

Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos());


returns null for all timestamp-millis fields (I have many), so...

datum = readWithoutConversion(oldDatum, f.schema(), in);


is used instead and returns a *Long*



Not sure it's relevant, but in this version I have the explicit dependency
org.apache.avro:avro:1.8.2 and I am using the avro-maven-plugin (1.8.2) to
generate the record from .avsc with this configuration:


String
true
private
true



Cheers
Lorenzo


On Thu, 11 Jun 2020 at 13:11, Arvid Heise  wrote:

> Sorry forget my last mail, that was half-finished.
>
> Here is the real one:
>
> Hi Lorenzo,
>
> if you still have time to investigate.
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there; although they look different than here, but that can
> be attributed to the avro upgrade.
>
> Could you put a breakpoint on SpecificDatumReader.readField, so that you
> can inspect the conversion for the timestamp field? You probably want to
> make it a conditional for f.name() == .
> The expected flow is that it should have a conversion that returns the
> joda time instead of the long. Then datum should be the converted joda time.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>  ResolvingDecoder in, Object state)
> throws IOException {
>   if (r instanceof SpecificRecordBase) {
> Conversion conversion = ((SpecificRecordBase) 
> r).getConversion(f.pos());
>
> Object datum;
> if (conversion != null) {
>   datum = readWithConversion(
>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
> } else {
>   datum = readWithoutConversion(oldDatum, f.schema(), in);
> }
>
> getData().setField(r, f.name(), f.pos(), datum);
>
>   } else {
> super.readField(r, f, oldDatum, in, state);
>   }
> }
>
>
> On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise  wrote:
>
>> Hi Lorenzo,
>>
>> if you still have time to investigate.
>>
>> Your stack trace shows that all expected code paths have been taken.
>> Conversions are there although they look different than here, but that can
>> be attributed to the avro upgrade.
>>
>> @Override
>> protected void readField(Object r, Schema.Field f, Object oldDatum,
>>  ResolvingDecoder in, Object state)
>> throws IOException {
>>   if (r instanceof SpecificRecordBase) {
>> Conversion conversion = ((SpecificRecordBase) 
>> r).getConversion(f.pos());
>>
>> Object datum;
>> if (conversion != null) {
>>   datum = readWithConversion(
>>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>> } else {
>>   datum = readWithoutConversion(oldDatum, f.schema(), in);
>> }
>>
>> getData().setField(r, f.name(), f.pos(), datum);
>>
>>   } else {
>> super.readField(r, f, oldDatum, in, state);
>>   }
>> }
>>
>>
>> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora 
>> wrote:
>>
>>>
>>> Thanks Guowei,
>>>
>>> setting format.setReuseAvroValue(false) with 1.8.2-generated records
>>> does not solve the problem.
>>>
>>> 12:02:59,314 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>>> checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>>> org.joda.time.DateTime
>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> at

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Thanks Guowei,

setting format.setReuseAvroValue(false) with 1.8.2-generated records does
not solve the problem.

12:02:59,314 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)




Summarising, the only working combination seems to be:

   - Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation
   = joda
   - Enable Object Reuse (being careful about the implications)

Using AVRO 1.8.2 code generation does not work with any of the other
workarounds.
Using Generic objects does not work due to a bug in AvroSerializer
<https://issues.apache.org/jira/browse/FLINK-18223>, but GenericRecords also
bring a number of other problems.

I am not very comfortable with using AVRO objects generated with a
different AVRO version than the one supported by Flink.
I am going to map AVRO records into hand-written POJOs immediately after
ingestion, to reduce the chances of further issues. I reckon this is very
empirical, but that's what the workaround looks like to me :)
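
Something along these lines (AccountEntry, AccountEntryPojo and fromAvro are
placeholders for my actual classes):

env.getConfig().enableObjectReuse();

AvroInputFormat<AccountEntry> format =
        new AvroInputFormat<>(new Path(inputPath), AccountEntry.class);

DataStream<AccountEntryPojo> entries = env
        .readFile(format, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000)
        // convert to a hand-written POJO immediately; never hold on to the Avro record
        .map(AccountEntryPojo::fromAvro);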

Lorenzo

P.S. I want to give a massive thank you to this community. So far it has been
one of the most reactive and helpful I have ever interacted with.

On Thu, 11 Jun 2020 at 10:25, Guowei Ma  wrote:

> Hi,
> for the 1.8.2(case 1) you could try the format.setReuseAvroValue(false);
>
> Best,
> Guowei
>
>
> On Thu, 11 Jun 2020 at 17:02, Lorenzo Nicora wrote:
>
>> Hi Arvid,
>>
>> thanks for the point about caching records. Gotcha!
>>
>> Sorry I cannot share the full schema or generated code. It's a 3rd party
>> IP and we signed a meter-thick NDA... I think I can post snippets.
>> The schema is heavily nested, including arrays of other record types
>> Types are primitives, or logical decimal and timestamp-millis. No union.
>>
>> #conversion is in AccountEntries only (one of the nested records) and
>> looks like this:
>>
>> private static final org.apache.avro.Conversion[] conversions =
>> new org.apache.avro.Conversion[] {
>> null,
>> null,
>> null,
>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>> null,
>> null,
>> null,
>> null,
>> null,
>> null,
>> null
>> };
>>
>>
>> Note that I have to generate the specific object with AVRO 1.9.2 Maven
>> Plugin.
>> With 1.8.2 generated code it fails with the following exception,
>> regardless setting enableObjectReuse()
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid,

thanks for the point about caching records. Gotcha!

Sorry I cannot share the full schema or generated code. It's a 3rd party IP
and we signed a meter-thick NDA... I think I can post snippets.
The schema is heavily nested, including arrays of other record types
Types are primitives, or logical decimal and timestamp-millis. No union.

#conversion is in AccountEntries only (one of the nested records) and looks
like this:

private static final org.apache.avro.Conversion[] conversions =
new org.apache.avro.Conversion[] {
null,
null,
null,
new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
null,
null,
null,
null,
null,
null,
null
};


Note that I have to generate the specific object with AVRO 1.9.2 Maven
Plugin.
With 1.8.2 generated code it fails with the following exception, regardless
setting enableObjectReuse()

java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)


Thanks for the help
Lorenzo


On Thu, 11 Jun 2020 at 08:58, Arvid Heise  wrote:

> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
> went wrong, so it will work more smoothly for future users. I double
> checked and we even test AvroSerializer with logical types, so I'm a bit
> puzzled.
>
> Could you attach GlHeader or at least show us how GlHeader#conversions look
> like? I want to exclude the possibility that the source generator screwed
> up.
>
> Concerning object reuse: you need to treat all POJOs as immutable
> (I'm assuming that's what you meant from your description), but you
> should also never cache values like
>
> class ShiftElements implements FlatMapFunction<Object, Object> {
>   Object lastElement;
>
>   public void flatMap(Object newElement, Collector<Object> out) {
>     out.collect(lastElement);
>     lastElement = newElement; // <- never cache with enableObjectReuse
>   }
> }
>
> (excuse my ugly code)
>
> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora 
> wrote:
>
>> Hi Arvid,
>>
>> answering to your other questions
>>
>> Here is the stacktrace of the case (1),  when I try to read using
>> specific records generated by the AVRO 1.8.2 plugin
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at
>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(Con

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid,

answering to your other questions

Here is the stacktrace of the case (1),  when I try to read using specific
records generated by the AVRO 1.8.2 plugin

java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)


I also tried generating the specific object with avro 1.9.2 (2)  but
forcing it to use Joda time but still didn't work

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
at
org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
... 7 more


But in the second case, it seems the failure happens when Flink tries to
make a copy of the record.
So I followed your suggestion of enableObjectReuse() and* IT WORKS!*

I am not sure I understand all the implications of object reuse in Flink,
specifically.
I am familiar with the general risk of mutable messages, and I always
handle them as immutable even when they are POJOs, never mutating and
forwarding the same record.
Not sure whether there are other implications in Flink.

Many thanks
Lorenzo


On Wed, 10 Jun 2020 at 17:52, Arvid Heise  wrote:

> Hi Lorenzo,
>
> 1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
>
> 2) cannot work like this, because we bundle Avro 1.8.2. You could retest
> with dateTimeLogicalType='Joda' set, but then you will probably see the
> same issue as 1)
>
> 3) I'm surprised that this doesn't work either. There is a codepath since
> 2016 for GenericRecord and it's covered in a test. From the error
> description and the ticket, it looks like the issue is not the
> AvroInputFormat, but the serializer. So it would probably work with a
> different serializer (but that would cause back and forth type
> transformation).
>
> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora 
> wrote:
>
>> Thanks Timo,
>>
>

Re: Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
Hi Arvid,

I confirm that in case 3) the problem is the AvroSerializer.

How can I use a different serializer with the AvroInputFormat?

I would be happy to get the file ingestion working and, immediately afterwards,
map to a hand-written POJO, to avoid any inefficiency or headache from moving
GenericRecords around, if this is what you mean by back-and-forth type
transformation.
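
Something like the following is what I have in mind; a minimal sketch, assuming
a hand-written AccountEntryPojo class (hypothetical), with placeholder field
names rather than the real schema:

Path in = new Path(sourceBasePath);
Schema schema = AccountEntries.getClassSchema();
AvroInputFormat<GenericRecord> inputFormat =
        new AvroInputFormat<>(in, GenericRecord.class);

DataStream<AccountEntryPojo> accountEntries = env
        .readFile(inputFormat, sourceBasePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
        // use the Avro-aware TypeInformation for the GenericRecord stream
        .returns(new GenericRecordAvroTypeInfo(schema))
        // map to a plain POJO immediately, so GenericRecord does not travel
        // beyond this operator (AccountEntryPojo and field names are placeholders)
        .map(new MapFunction<GenericRecord, AccountEntryPojo>() {
            @Override
            public AccountEntryPojo map(GenericRecord record) {
                return new AccountEntryPojo(
                        record.get("accountId").toString(),
                        (Long) record.get("timestampMillis"));
            }
        });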

Cheers
Lorenzo

On Wed, 10 Jun 2020, 17:52 Arvid Heise,  wrote:

> Hi Lorenzo,
>
> 1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
>
> 2) cannot work like this, because we bundle Avro 1.8.2. You could retest
> with dateTimeLogicalType='Joda' set, but then you will probably see the
> same issue as 1)
>
> 3) I'm surprised that this doesn't work either. There is a codepath since
> 2016 for GenericRecord and it's covered in a test. From the error
> description and the ticket, it looks like the issue is not the
> AvroInputFormat, but the serializer. So it would probably work with a
> different serializer (but that would cause back and forth type
> transformation).
>
> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora 
> wrote:
>
>> Thanks Timo,
>>
>> the stacktrace with 1.9.2-generated specific file is the following
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>> java.time.Instant: 2020-06-01T02:00:42.105Z
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at
>> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>> ... 7 more
>>
>>
>> I reckon logical types might have been considered somewhat experimental
>> since...ever. But, honestly, I've been using them in the Kafka/Java
>> ecosystem as well as in Spark without too many problems.
>>
>> For my specific use case, the schema is given. Messages are produced by a
>> 3rd party and we cannot change the schema (especially because it's a legit
>> schema).
>> I am desperately looking for a workaround.
>>
>> I had a similar issue with a Kafka source and AVRO records containing
>> decimals and timestamps: timestamps worked but decimals did not.
>> I was able to work around the problem using GenericRecords.
>> But the Kafka source relies on AvroDeserializationSchema rather than
>> AvroSerializer, and has no problem handling GenericRecords.
>>
>> I'm honestly finding it very confusing to have different ways of handling
>> AVRO deserialization inside Flink core components.
>>
>> Cheers
>> Lorenzo
>>
>>
>> On Wed, 10 Jun 2020 at 15:02, Timo Walther  wrote:
>>
>>> Hi Lorenzo,
>>>
>>> as far as I know we don't support Avro's logical types in Flink's
>>> AvroInputFormat y

Re: Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
Thanks Timo,

the stacktrace with 1.9.2-generated specific file is the following

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
java.time.Instant: 2020-06-01T02:00:42.105Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
at
org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
... 7 more


I reckon logical types might have been considered somewhat experimental
since...ever. But, honestly, I've been using them in the Kafka/Java
ecosystem as well as in Spark without too many problems.

For my specific use case, the schema is given. Messages are produced by a
3rd party and we cannot change the schema (especially because it's a legit
schema).
I am desperately looking for a workaround.

I had a similar issue with a Kafka source and AVRO records containing
decimals and timestamps: timestamps worked but decimals did not.
I was able to work around the problem using GenericRecords.
But the Kafka source relies on AvroDeserializationSchema rather than
AvroSerializer, and has no problem handling GenericRecords.
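
For context, this is roughly the Kafka-side setup that worked for me; a minimal
sketch, assuming the universal Kafka connector, with the topic name, connection
properties and schema as placeholders:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "avro-reader");             // placeholder

Schema schema = AccountEntries.getClassSchema();          // placeholder schema

// deserialization here goes through AvroDeserializationSchema rather than
// the AvroSerializer used by the file-based job
FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        "account-entries",                                // placeholder topic
        AvroDeserializationSchema.forGeneric(schema),
        props);

DataStream<GenericRecord> records = env.addSource(consumer);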

I'm honestly finding it very confusing to have different ways of handling AVRO
deserialization inside Flink core components.

Cheers
Lorenzo


On Wed, 10 Jun 2020 at 15:02, Timo Walther  wrote:

> Hi Lorenzo,
>
> as far as I know we don't support Avro's logical types in Flink's
> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>
> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>
> Could you send us the full stack trace? I think this should actually
> work, because specific records are handled as POJOs and those should also be
> able to deal with the logical types' classes through Kryo.
>
> Reg 3) Generic record
>
> It would be great if we can make this option possible. We could include
> it in the next minor release fix.
>
> Sorry for the bad user experience. But IMHO logical types are still
> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
> shortcomings such that Flink can properly support them as well.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>
>
> On 10.06.20 15:08, Lorenzo Nicora wrote:
> > Hi,
> >
> > I need to continuously ingest AVRO files as they arrive.
> > Files are written by an S3 Sink Kafka Connect but S3 is not the point
> > here. I started trying to ingest a static bunch of files from local fs
> > first and I am having weird issues with AVRO deserialization.
> >
> > I have to say, the records contain logical types, timestamps-ms and
> decimals
> >
> > To keep it simple, I extracted the AVRO schema from the data files and
> > used avro-maven-plugin to generate POJOs
> > I tried multiple combinations, all with no luck
> >
> > 1) Specific record generated with AVRO 1.8.2 plugin
> >
> > Path in = n

Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
Hi,

I need to continuously ingest AVRO files as they arrive.
Files are written by a Kafka Connect S3 sink, but S3 is not the point here.
I started by trying to ingest a static bunch of files from the local fs first,
and I am having weird issues with AVRO deserialization.

I have to say, the records contain logical types: timestamp-millis and decimals.

To keep it simple, I extracted the AVRO schema from the data files and used
the avro-maven-plugin to generate POJOs.
I tried multiple combinations, all with no luck:

1) Specific record generated with AVRO 1.8.2 plugin

Path in = new Path(sourceBasePath);
AvroInputFormat<AccountEntries> inputFormat =
        new AvroInputFormat<>(in, AccountEntries.class);
DataStream<AccountEntries> accountEntries = env
        .readFile(inputFormat, sourceBasePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);

*Result*
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
(IIRC this is a known AVRO 1.8.2 issue)


2) Specific record generated with AVRO 1.9.2 plugin
Same code as above but AVRO POJOs are generated with AVRO 1.9.2

*Result*
org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant


3) Generic record
I am getting the Schema from the generated specific record, for
convenience, but I am not using the generated POJO as record.
I also followed the suggestions in this Flink blog post, to explicitly
specify the TypeInfo with returns(...).

Path in = new Path(config.sourceFileSystemPath);
Schema schema = AccountEntries.getClassSchema();
AvroInputFormat<GenericRecord> inputFormat =
        new AvroInputFormat<>(in, GenericRecord.class);
DataStream<GenericRecord> accountEntries = env
        .readFile(inputFormat, config.sourceFileSystemPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
        .returns(new GenericRecordAvroTypeInfo(schema));


*Result*
The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The
class is not a proper class. It is either abstract, an interface, or a
primitive type.

This looks like a bug.
I raised the ticket and I will try to submit a fix, but that still does not
solve my problem, as I am using a managed Flink I cannot update.
I cannot believe there is no workaround. I do not think I'm trying to do
anything bizarre. Am I?

Any ideas?
Am I missing something obvious?

Cheers
Lorenzo