Re: Feature Request: Upgrade Kafka Library

2022-01-07 Thread Martijn Visser
Hi Clayton,

There is an open ticket and PR to update to 2.8.1. You can track it under
https://issues.apache.org/jira/browse/FLINK-25504

Best regards,

Martijn

Op vr 7 jan. 2022 om 21:36 schreef Clayton Wohl 

> The latest version of flink-connector-kafka, still uses kafka-clients
> 2.4.1. There have been a lot of upgrades in the Kafka consumer/producer
> library since then.
>
> May I request that the Flink project upgrade to a recent version of the
> Kafka library?
>
> thanks!
>
-- 

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


Re: Windowing on the consumer side

2022-01-07 Thread Flink Lover
Hi,

I tried the code below but it throws an error.
val src: DataStream[String] =
env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
// reading data and used data distribution strategy
src.process(new JSONParsingProcessFunction()).uid("sink")

Error:
type mismatch;
 found   :
org.apache.flink.streaming.api.scala.AllWindowedStream[String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
 required: org.apache.flink.streaming.api.scala.DataStream[String]
val src: DataStream[String] =
env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
// reading data and used data distribution strategy

How do I resolve this?

Also, my question is why watermarks won't help me? Since, once the element
timestamp passes the window timestamp, the window will be triggered and the
elements passed.

Thanks,
Martin O

On Sat, Jan 8, 2022 at 4:07 AM Mariam Walid 
wrote:

> You can do this using the windowAll function, it works with non-keyed
> streams.
>
> On Friday, January 7, 2022, Flink Lover  wrote:
>
>> Can somebody help me with this? I tried several examples where they have
>> extracted the key from Json and used windowing but as far as I have learnt,
>> with windowing I will have to use some kind of aggregation but in my use
>> case there is no aggregation to be performed. I just have to get data for
>> every 2 secs and call the process function.
>>
>> -
>> Martin O
>>
>> On Fri, Jan 7, 2022 at 11:37 PM Flink Lover 
>> wrote:
>>
>>> I have an incoming json data like below:
>>> {"custId": 1,"custFirstName":"Martin",
>>> "custLastName":"owen","edl_created_at":"2022-03-01 00:00:00"}
>>>
>>> Now, this record has been pushed successfully via producer to the
>>> consumer. But I am willing to get records of say 2 seconds window but I
>>> don't have any key to use in KeyBy() operation.In this case can I use
>>> Watermarks? Something like below:
>>>
>>> val consumer = new FlinkKafkaConsumer[String]("topic", new
>>> SimpleStringSchema(), getProperties())
>>>
>>> consumer.assignTimestampsAndWatermarks(
>>>   WatermarkStrategy
>>> .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>
>>> Will this help me to get what I want?
>>>
>>> Thanks,
>>> Martin O.
>>>
>>


FLIP-188 Built-in Dynamic Table Storage 跟当下流行的数据湖技术是什么关系?

2022-01-07 Thread casel.chen
问一下FLIP-188 Built-in Dynamic Table Storage 跟当下流行的数据湖技术是什么关系?二者功能是否重复?还是侧重点不同?
https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage



Re: Flink rest api to start a job

2022-01-07 Thread Yun Gao

Hi Qihua

Sorry may I double confirm that whether the entry class exists in both testA 
and testB?

IF testA.jar is included on startup, it would be loaded in the parent 
classloader, which
is the parent classloader for the user classloader that loads testB. Thus at 
least if the
entry-class is exist only in testA, it should still be found.

Best,
Yun

--
Sender:Qihua Yang
Date:2022/01/07 02:55:09
Recipient:user
Theme:Flink rest api to start a job

Hi,

I found a weird behavior. We launched a k8s cluster without job. But includes 
the jar A. I use Flink rest api to upload a dummy jar(actually it can be any 
jar). Flink will create a jar id. Then I use rest api to start the job with the 
jar A entry-class. But the jar id is the dummy jar id. Flink will start the job 
from jar A. Anyone know why?
My understanding is flink rest api should start the job from the dummy jar, 
because jar id is dummy jar id that I uploaded.
Here are steps what I did:
1. deploy a k8s pod contains working jar(testA.jar)
1. flink rest api upload jar, testB.jar, flink generate jar id, 
2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar
2. flink rest api to runJar with testB.jar id, but testA.jar entry-class.
3. flink start job from testA.jar

Thanks,
Qihua



Re: Windowing on the consumer side

2022-01-07 Thread Flink Lover
Can somebody help me with this? I tried several examples where they have
extracted the key from Json and used windowing but as far as I have learnt,
with windowing I will have to use some kind of aggregation but in my use
case there is no aggregation to be performed. I just have to get data for
every 2 secs and call the process function.

-
Martin O

On Fri, Jan 7, 2022 at 11:37 PM Flink Lover  wrote:

> I have an incoming json data like below:
> {"custId": 1,"custFirstName":"Martin",
> "custLastName":"owen","edl_created_at":"2022-03-01 00:00:00"}
>
> Now, this record has been pushed successfully via producer to the
> consumer. But I am willing to get records of say 2 seconds window but I
> don't have any key to use in KeyBy() operation.In this case can I use
> Watermarks? Something like below:
>
> val consumer = new FlinkKafkaConsumer[String]("topic", new
> SimpleStringSchema(), getProperties())
>
> consumer.assignTimestampsAndWatermarks(
>   WatermarkStrategy
> .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>
> Will this help me to get what I want?
>
> Thanks,
> Martin O.
>


Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread Martin
I changed my flink job having an explicit keyBy instead of reinterpretAsKeyedStream.Situation is still the same, so its no problem with combination of reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode.
This time I was able to check the logs of the task managers and it seems to be a serialization problem of my state on recovery by checkpoint after elastic scaling.Besides the mentioned sequence numbers my state has also some values which are nested POJOs having lists and hashmaps.I assume its a problem with that.What I understand from the docs [1] and David A. answer on SO [2] is, that I have to create for my POJOs with Lists and Maps TypeInfos.
Is this correct?
P.S.: Awkward that this only happening on scale-in/failing, but I did not retried it with scale-out yet.
BRMartin
Attached the exceptions happening:
2022-01-07 14:24:56,845 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_8b4e92de8fd229ebde272da0b8bf387e_(1/8) from alternative (1/1), will retry while more alternatives are available.org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backendat org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.12-1.14.2.jar:1.14.2]at java.lang.Thread.run(Unknown Source) [?:?]Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream ClosedSerialization trace:IdString (de.martin.model.ContainerInfo)pDUContainerInformation (de.martin.model.UsedContainer)usedContainer (de.martin.model.UnitUsage)usagePerKey (de.telekom.mschalln.processing.logic.Accumulator)at com.esotericsoftware.kryo.io.Input.fill(Input.java:148) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) ~[flink-dist_2.12-1.14.2.jar:1.14.2]at 

Feature Request: Upgrade Kafka Library

2022-01-07 Thread Clayton Wohl
The latest version of flink-connector-kafka, still uses kafka-clients
2.4.1. There have been a lot of upgrades in the Kafka consumer/producer
library since then.

May I request that the Flink project upgrade to a recent version of the
Kafka library?

thanks!


Re: Skewed Data when joining tables using Flink SQL

2022-01-07 Thread Anne Lai
Hi Yun,

Thanks for correcting the wrong assumption, I do plan to use BATCH
execution mode. It’s good to know that Flink is able to process the
broadcasted state first, although I’m still a bit concerned about the state
size.

Besides this, did I miss any approach or trick that can be applied in BATCH
mode? For example, I read about iterative broadcasting used in Apache
Spark, which basically broadcast a slice of dataset at a time, flush it and
repeat. I wonder if similar method can be done in Flink too.

Thanks,
Anne

On Fri, Jan 7, 2022 at 9:19 AM Yun Gao  wrote:

> Hi Anne,
>
> For one thing, for the datastream and broadcast state method, May I have a
> double
> confirmation that are you using BATCH execution mode? I think with [1] for
> BATCH
> mode it should be able to first process the broadcast side before the
> non-broadcast side.
>
> Best,
> Yun
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20491
>
>
> --Original Mail --
> *Sender:*Anne Lai 
> *Send Date:*Fri Jan 7 17:20:58 2022
> *Recipients:*User 
> *Subject:*Skewed Data when joining tables using Flink SQL
>
> Hi,
>>
>> I have a Flink batch job that needs to join a large skewed table with a
>> smaller table, and because records are not evenly distributed to each
>> subtask, it always fails with a "too much data in partition" error.
>> I explored using DataStream API to broadcast the smaller tables as a
>> broadcast state, but it is not ideal since it is kept in-memory on all
>> downstream operators and even the smaller table is still too large to fit
>> into the memory. Using broadcast state would also introduce cold start
>> problems where the order of records processed in the connected stream can't
>> be guaranteed. As for coGroup transformation, it requires time windowing
>> but both tables are un-windowed.
>>
>> Is there any workaround or ideas that I can try on? Thanks!
>>
>> Best,
>> Anne
>>
>


Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-07 Thread Clayton Wohl
If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the
latter support this:

https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html

Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted
Flink environment. I don't have reliable access to the local file system.
At the documentation link above, Amazon recommends adding a hook to copy
the keystore files from the classpath to a /tmp directory at runtime. Can
KafkaSource do something similar?


Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-07 Thread David Morávek
you’re using compile target lower then 1.8, what needs to be done depends
on your build tool

On Fri 7. 1. 2022 at 20:05, Flink Lover  wrote:

> Hi David,
>
> Thanks for your explanation!
>
> I am familiar with how JVM works but why am I facing this issue? What
> exactly needs to be done?
>
> Thanks,
> Martin O.
>
> On Sat, Jan 8, 2022 at 12:19 AM David Morávek  wrote:
>
>> Hi Siddhesh,
>>
>> any JVM based language (Java, Scala, Kotlin) compiles into a byte-code
>> that can be executed by the JVM. As the JVM was evolving over the years,
>> new versions of byte code have been introduced. Target version simply
>> refers the the version of bytecode the compiler should generate. How to
>> specify it depends on the used build tool, but in general it all boils down
>> to running java compiler with specified "-target" [1].
>>
>> [1]
>> https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html
>>
>> D.
>>
>> On Fri, Jan 7, 2022 at 6:22 PM Flink Lover 
>> wrote:
>>
>>> Could you please help me with this?
>>>
>>> On Fri, Jan 7, 2022 at 11:48 AM Flink Lover 
>>> wrote:
>>>
 I tried Flink version 1.14.2 / 1.13.5

 On Fri, Jan 7, 2022 at 11:46 AM Flink Lover 
 wrote:

> Also, I am using flink-connector-kafka_2.11
>
> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
> SimpleStringSchema(), properties)
>
>
> val myProducer = new FlinkKafkaProducer[String](
>   "topic_name", // target topic
>   new KeyedSerializationSchemaWrapper[String](new
> SimpleStringSchema()), // serialization schema
>   getProperties(), // producer config
>   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>
>
>
> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover 
> wrote:
>
>> Hi All,
>>
>> I checked the Java version using the java -version on the terminal
>> and it gave me 1.8.0.281. Also, the project has been compiled using JDK 8
>> only which is by default.
>>
>> [image: image.png]
>>
>> What do you mean by target jvm? Also, what I am trying to achieve is
>> correct? about the windows?
>>
>> Thanks,
>> Martin
>>
>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren 
>> wrote:
>>
>>> Hi Martin,
>>>
>>> Can you provide the configuration of your Kafka producer and
>>> consumer? Also it’ll be helpful to have the complete code of your
>>> DataStream.
>>>
>>> About the error you mentioned, I doubt that the JDK version you
>>> actually use is probably below 1.8. Can you have a double check on the
>>> environment that your job is running in?
>>>
>>> Cheers,
>>>
>>> Qingsheng Ren
>>>
>>>
>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover 
>>> wrote:
>>> >
>>> > Hello Folks!
>>> >
>>> > I have a DataStream which sends data to the consumer but I got the
>>> data once the code completed its execution. I didn't receive the 
>>> records as
>>> the code was writing it to the topic. I was able to achieve this 
>>> behavior
>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>> haven't enabled checkpointing as of now. I know checkpointing will also 
>>> do
>>> the trick.  My expectation is Producer should batch the records of 2
>>> seconds and send it to the consumer and consumer should receive a batch 
>>> of
>>> 2 seconds. My code goes as below:
>>> >
>>> > Producer Side:
>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>> >   WatermarkStrategy
>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> > dataToKafka.addSink(myProducer).uid("source")
>>> >
>>> > Consumer Side:
>>> > consumer.assignTimestampsAndWatermarks(
>>> >   WatermarkStrategy
>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >
>>> > Now this gives me an error as below:
>>> >
>>> > Static methods in interface require -target:jvm-1.8
>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >
>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>> >
>>> > Thanks,
>>> > Martin.
>>> >
>>> >
>>> >
>>>
>>>


[no subject]

2022-01-07 Thread sudhansu jena
Unsubscribe


Re: Avro BulkFormat for the new FileSource API?

2022-01-07 Thread David Morávek
Hi Kevin,

I'm not as familiar with initiatives around the new sources, but it seems
that the BulkFormat for Avro [1] has been added recently and will be
released with the Flink 1.15.x.

[1] https://issues.apache.org/jira/browse/FLINK-24565

Best,
D.

On Fri, Jan 7, 2022 at 7:23 PM Kevin Lam  wrote:

> Hi all,
>
> We're looking into using the new FileSource
> 
> API, we see that there is a BulkFormat
> 
> for Parquet, via ParquetColumnarRowFormat
> .
> Is there similar BulkFormat available or in the works for Avro files?
>
> I imagined it may be a common use-case in the community so wanted to check
> here before we invest time implementing our own.
>
> Thanks in advance!
>


Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-07 Thread Flink Lover
Hi David,

Thanks for your explanation!

I am familiar with how JVM works but why am I facing this issue? What
exactly needs to be done?

Thanks,
Martin O.

On Sat, Jan 8, 2022 at 12:19 AM David Morávek  wrote:

> Hi Siddhesh,
>
> any JVM based language (Java, Scala, Kotlin) compiles into a byte-code
> that can be executed by the JVM. As the JVM was evolving over the years,
> new versions of byte code have been introduced. Target version simply
> refers the the version of bytecode the compiler should generate. How to
> specify it depends on the used build tool, but in general it all boils down
> to running java compiler with specified "-target" [1].
>
> [1]
> https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html
>
> D.
>
> On Fri, Jan 7, 2022 at 6:22 PM Flink Lover  wrote:
>
>> Could you please help me with this?
>>
>> On Fri, Jan 7, 2022 at 11:48 AM Flink Lover 
>> wrote:
>>
>>> I tried Flink version 1.14.2 / 1.13.5
>>>
>>> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover 
>>> wrote:
>>>
 Also, I am using flink-connector-kafka_2.11

 val consumer = new FlinkKafkaConsumer[String]("topic_name", new
 SimpleStringSchema(), properties)


 val myProducer = new FlinkKafkaProducer[String](
   "topic_name", // target topic
   new KeyedSerializationSchemaWrapper[String](new
 SimpleStringSchema()), // serialization schema
   getProperties(), // producer config
   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)



 On Fri, Jan 7, 2022 at 11:43 AM Flink Lover 
 wrote:

> Hi All,
>
> I checked the Java version using the java -version on the terminal and
> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
> which is by default.
>
> [image: image.png]
>
> What do you mean by target jvm? Also, what I am trying to achieve is
> correct? about the windows?
>
> Thanks,
> Martin
>
> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren 
> wrote:
>
>> Hi Martin,
>>
>> Can you provide the configuration of your Kafka producer and
>> consumer? Also it’ll be helpful to have the complete code of your
>> DataStream.
>>
>> About the error you mentioned, I doubt that the JDK version you
>> actually use is probably below 1.8. Can you have a double check on the
>> environment that your job is running in?
>>
>> Cheers,
>>
>> Qingsheng Ren
>>
>>
>> > On Jan 7, 2022, at 1:13 AM, Flink Lover 
>> wrote:
>> >
>> > Hello Folks!
>> >
>> > I have a DataStream which sends data to the consumer but I got the
>> data once the code completed its execution. I didn't receive the records 
>> as
>> the code was writing it to the topic. I was able to achieve this behavior
>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>> haven't enabled checkpointing as of now. I know checkpointing will also 
>> do
>> the trick.  My expectation is Producer should batch the records of 2
>> seconds and send it to the consumer and consumer should receive a batch 
>> of
>> 2 seconds. My code goes as below:
>> >
>> > Producer Side:
>> >  dataToKafka.assignTimestampsAndWatermarks(
>> >   WatermarkStrategy
>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> > dataToKafka.addSink(myProducer).uid("source")
>> >
>> > Consumer Side:
>> > consumer.assignTimestampsAndWatermarks(
>> >   WatermarkStrategy
>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >
>> > Now this gives me an error as below:
>> >
>> > Static methods in interface require -target:jvm-1.8
>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >
>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>> >
>> > Thanks,
>> > Martin.
>> >
>> >
>> >
>>
>>


Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-07 Thread David Morávek
Hi Siddhesh,

any JVM based language (Java, Scala, Kotlin) compiles into a byte-code that
can be executed by the JVM. As the JVM was evolving over the years, new
versions of byte code have been introduced. Target version simply refers
the the version of bytecode the compiler should generate. How to specify it
depends on the used build tool, but in general it all boils down to running
java compiler with specified "-target" [1].

[1] https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html

D.

On Fri, Jan 7, 2022 at 6:22 PM Flink Lover  wrote:

> Could you please help me with this?
>
> On Fri, Jan 7, 2022 at 11:48 AM Flink Lover 
> wrote:
>
>> I tried Flink version 1.14.2 / 1.13.5
>>
>> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover 
>> wrote:
>>
>>> Also, I am using flink-connector-kafka_2.11
>>>
>>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>>> SimpleStringSchema(), properties)
>>>
>>>
>>> val myProducer = new FlinkKafkaProducer[String](
>>>   "topic_name", // target topic
>>>   new KeyedSerializationSchemaWrapper[String](new
>>> SimpleStringSchema()), // serialization schema
>>>   getProperties(), // producer config
>>>   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>>
>>>
>>>
>>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover 
>>> wrote:
>>>
 Hi All,

 I checked the Java version using the java -version on the terminal and
 it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
 which is by default.

 [image: image.png]

 What do you mean by target jvm? Also, what I am trying to achieve is
 correct? about the windows?

 Thanks,
 Martin

 On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren 
 wrote:

> Hi Martin,
>
> Can you provide the configuration of your Kafka producer and consumer?
> Also it’ll be helpful to have the complete code of your DataStream.
>
> About the error you mentioned, I doubt that the JDK version you
> actually use is probably below 1.8. Can you have a double check on the
> environment that your job is running in?
>
> Cheers,
>
> Qingsheng Ren
>
>
> > On Jan 7, 2022, at 1:13 AM, Flink Lover 
> wrote:
> >
> > Hello Folks!
> >
> > I have a DataStream which sends data to the consumer but I got the
> data once the code completed its execution. I didn't receive the records 
> as
> the code was writing it to the topic. I was able to achieve this behavior
> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
> haven't enabled checkpointing as of now. I know checkpointing will also do
> the trick.  My expectation is Producer should batch the records of 2
> seconds and send it to the consumer and consumer should receive a batch of
> 2 seconds. My code goes as below:
> >
> > Producer Side:
> >  dataToKafka.assignTimestampsAndWatermarks(
> >   WatermarkStrategy
> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> > dataToKafka.addSink(myProducer).uid("source")
> >
> > Consumer Side:
> > consumer.assignTimestampsAndWatermarks(
> >   WatermarkStrategy
> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >
> > Now this gives me an error as below:
> >
> > Static methods in interface require -target:jvm-1.8
> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >
> > My scala version is 2.11.12 and Java JDK 1.8.0.281
> >
> > Thanks,
> > Martin.
> >
> >
> >
>
>


Avro BulkFormat for the new FileSource API?

2022-01-07 Thread Kevin Lam
Hi all,

We're looking into using the new FileSource

API, we see that there is a BulkFormat

for Parquet, via ParquetColumnarRowFormat
.
Is there similar BulkFormat available or in the works for Avro files?

I imagined it may be a common use-case in the community so wanted to check
here before we invest time implementing our own.

Thanks in advance!


Re: [E] Re: Metaspace OOM : class loaders not being GC

2022-01-07 Thread David Clutter
Thanks for the responses.  I did switch to per-job mode and it is working
well of course.  I suspected there wouldn't be an easy solution, but I had
to ask.  Thanks!

On Fri, Jan 7, 2022 at 3:37 AM David Morávek 
wrote:

> Hi David,
>
> If I understand the problem correctly, there is really nothing we can do
> here. Soft references are garbage collected when there is a high memory
> pressure and the garbage collector needs to free up more memory. The
> problem here is that the GC doesn't really take high memory pressure on
> Metaspace into the account here.
>
> I guess you might try to tweak _SoftRefLRUPolicyMSPerMB_ [1], but this
> might have some other consequences. Also this behavior might be highly
> dependent on the garbage collector you're using.
>
>
> From the docs [1]:
>
> -XX:SoftRefLRUPolicyMSPerMB=*time*
>
> Sets the amount of time (in milliseconds) a softly reachable object is
> kept active on the heap after the last time it was referenced. The default
> value is one second of lifetime per free megabyte in the heap. The
> -XX:SoftRefLRUPolicyMSPerMB option accepts integer values representing
> milliseconds per one megabyte of the current heap size (for Java HotSpot
> Client VM) or the maximum possible heap size (for Java HotSpot Server VM).
> This difference means that the Client VM tends to flush soft references
> rather than grow the heap, whereas the Server VM tends to grow the heap
> rather than flush soft references. In the latter case, the value of the
> -Xmx option has a significant effect on how quickly soft references are
> garbage collected.
>
> The following example shows how to set the value to 2.5 seconds:
>
> -XX:SoftRefLRUPolicyMSPerMB=2500
>
>
>
> [1] https://docs.oracle.com/javase/8/docs/technotes/tools/unix/java.html
> 
>
> Best,
> D.
>
> On Thu, Jan 6, 2022 at 3:13 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> As far as I remember this is a known issue a few years ago but Flink
>> currently has no solution to this (correct me if I'm wrong). I see that
>> you're running jobs on a yarn session. Could you switch to yarn-per-job
>> mode (where JM and TMs are created and destroyed for each job) for a
>> workaround?
>>
>> David Clutter  于2022年1月4日周二 23:39写道:
>>
>>> I am seeing an issue with class loaders not being GCed and the metaspace
>>> eventually OOM.  Here is my setup:
>>>
>>> - Flink 1.13.1 on EMR using JDK 8 in session mode
>>> - Job manager is a long-running yarn session
>>> - New jobs are submitted every 5m (and typically run for less than 5m)
>>>
>>> I find that after a few hours the job manager gets killed with Metaspace
>>> OOM.  I tried increasing the Metaspace for the job manager but that only
>>> delays the OOM.
>>>
>>> I did some debugging using jcmd and I noticed that the size of the
>>> classes loaded is always increasing.  Next I did a heap dump and found that
>>> instances of org.apache.flink.util.ChildFirstClassLoader are present
>>> long after the jobs complete.  Checking the GC roots I found that there is
>>> a reference in java.io.ObjectStreamClass$Caches.  Seems to be this JDK
>>> issue: https://bugs.openjdk.java.net/browse/JDK-8277072
>>> 
>>>
>>> Curious if there are any workarounds for this situation?
>>>
>>>


Windowing on the consumer side

2022-01-07 Thread Flink Lover
I have an incoming json data like below:
{"custId": 1,"custFirstName":"Martin",
"custLastName":"owen","edl_created_at":"2022-03-01 00:00:00"}

Now, this record has been pushed successfully via producer to the consumer.
But I am willing to get records of say 2 seconds window but I don't have
any key to use in KeyBy() operation.In this case can I use Watermarks?
Something like below:

val consumer = new FlinkKafkaConsumer[String]("topic", new
SimpleStringSchema(), getProperties())

consumer.assignTimestampsAndWatermarks(
  WatermarkStrategy
.forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))

Will this help me to get what I want?

Thanks,
Martin O.


Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-07 Thread Flink Lover
Could you please help me with this?

On Fri, Jan 7, 2022 at 11:48 AM Flink Lover  wrote:

> I tried Flink version 1.14.2 / 1.13.5
>
> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover 
> wrote:
>
>> Also, I am using flink-connector-kafka_2.11
>>
>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>> SimpleStringSchema(), properties)
>>
>>
>> val myProducer = new FlinkKafkaProducer[String](
>>   "topic_name", // target topic
>>   new KeyedSerializationSchemaWrapper[String](new
>> SimpleStringSchema()), // serialization schema
>>   getProperties(), // producer config
>>   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>
>>
>>
>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover 
>> wrote:
>>
>>> Hi All,
>>>
>>> I checked the Java version using the java -version on the terminal and
>>> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>>> which is by default.
>>>
>>> [image: image.png]
>>>
>>> What do you mean by target jvm? Also, what I am trying to achieve is
>>> correct? about the windows?
>>>
>>> Thanks,
>>> Martin
>>>
>>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren  wrote:
>>>
 Hi Martin,

 Can you provide the configuration of your Kafka producer and consumer?
 Also it’ll be helpful to have the complete code of your DataStream.

 About the error you mentioned, I doubt that the JDK version you
 actually use is probably below 1.8. Can you have a double check on the
 environment that your job is running in?

 Cheers,

 Qingsheng Ren


 > On Jan 7, 2022, at 1:13 AM, Flink Lover 
 wrote:
 >
 > Hello Folks!
 >
 > I have a DataStream which sends data to the consumer but I got the
 data once the code completed its execution. I didn't receive the records as
 the code was writing it to the topic. I was able to achieve this behavior
 using AT_LEAST_ONCE property but I decided to implement Watermarks. I
 haven't enabled checkpointing as of now. I know checkpointing will also do
 the trick.  My expectation is Producer should batch the records of 2
 seconds and send it to the consumer and consumer should receive a batch of
 2 seconds. My code goes as below:
 >
 > Producer Side:
 >  dataToKafka.assignTimestampsAndWatermarks(
 >   WatermarkStrategy
 > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
 > dataToKafka.addSink(myProducer).uid("source")
 >
 > Consumer Side:
 > consumer.assignTimestampsAndWatermarks(
 >   WatermarkStrategy
 > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
 >
 > Now this gives me an error as below:
 >
 > Static methods in interface require -target:jvm-1.8
 > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
 >
 > My scala version is 2.11.12 and Java JDK 1.8.0.281
 >
 > Thanks,
 > Martin.
 >
 >
 >




Re: Skewed Data when joining tables using Flink SQL

2022-01-07 Thread Yun Gao
Hi Anne,

For one thing, for the datastream and broadcast state method, May I have a 
double 
confirmation that are you using BATCH execution mode? I think with [1] for BATCH
 mode it should be able to first process the broadcast side before the 
non-broadcast side.

Best,
Yun


[1] https://issues.apache.org/jira/browse/FLINK-20491



 --Original Mail --
Sender:Anne Lai 
Send Date:Fri Jan 7 17:20:58 2022
Recipients:User 
Subject:Skewed Data when joining tables using Flink SQL

Hi,

I have a Flink batch job that needs to join a large skewed table with a smaller 
table, and because records are not evenly distributed to each subtask, it 
always fails with a "too much data in partition" error. 
I explored using DataStream API to broadcast the smaller tables as a broadcast 
state, but it is not ideal since it is kept in-memory on all downstream 
operators and even the smaller table is still too large to fit into the memory. 
Using broadcast state would also introduce cold start problems where the order 
of records processed in the connected stream can't be guaranteed. As for 
coGroup transformation, it requires time windowing but both tables are 
un-windowed.

Is there any workaround or ideas that I can try on? Thanks!

Best,
Anne

Re: Serving Machine Learning models

2022-01-07 Thread Yun Gao
Hi Sonia,

Sorry I might not have the statistics on the provided two methods, perhaps as 
input
I could also provide another method: currently there is an eco-project 
dl-on-flink
that supports running DL frameworks on top of the Flink and it will handle the 
data
exchange between java and python processes, which would allows to user the 
native
model directly. 

Best,
Yun


[1] https://github.com/flink-extended/dl-on-flink




--
From:Sonia-Florina Horchidan 
Send Time:2022 Jan. 7 (Fri.) 17:23
To:user@flink.apache.org 
Subject:Serving Machine Learning models



Hello,

I recently started looking into serving Machine Learning models for streaming 
data in Flink. To give more context, that would involve training a model 
offline (using PyTorch or TensorFlow), and calling it from inside a Flink job 
to do online inference on newly arrived data. I have found multiple 
discussions, presentations, and tools that could achieve this, and it seems 
like the two alternatives would be: (1) wrap the pre-trained models in a HTTP 
service (such as PyTorch Serve [1]) and let Flink do async calls for model 
scoring, or (2) convert the models into a standardized format (e.g., ONNX [2]), 
pre-load the model in memory for every task manager (or use external storage if 
needed) and call it for each new data point. 
Both approaches come with a set of advantages and drawbacks and, as far as I 
understand, there is no "silver bullet", since one approach could be more 
suitable than the other based on the application requirements. However, I would 
be curious to know what would be the "recommended" methods for model serving 
(if any) and what approaches are currently adopted by the users in the wild.
[1] https://pytorch.org/serve/
[2] https://onnx.ai/
Best regards,
Sonia

 [Kth Logo]

Sonia-Florina Horchidan
PhD Student
KTH Royal Institute of Technology
Software and Computer Systems (SCS)
School of Electrical Engineering and Computer Science (EECS)
Mobil: +46769751562
sf...@kth.se, www.kth.se



Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-07 Thread Kevin Lam
Hi all,

Are there any plans to update StreamExecutionEnvironment.readFiles

to use the new FLIP-27 compatible FileSource

?

readFiles supports some features via it's FileInputFormat
like setNestedFileEnumeration and setFilesFilter that we'd be interested in
continuing to use but it seems FileSource doesn't have that.


Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hey Yun,

Thanks for your quick response. Much appreciated. I have replied to your
answer on SO and I will continue with my doubts over there.

Thanks,
Sid

On Fri, Jan 7, 2022 at 9:05 PM Yun Gao  wrote:

> Hi Siddhesh,
>
> I answered on the stackoverflow and I also copied the answers here for
> reference:
>
> For the producer side, Flink Kafka Consumer would bookkeeper the current
> offset in the
>
> distributed checkpoint, and if the consumer task failed, it will restarted
> from the latest
>
> checkpoint and re-emit from the offset recorded in the checkpoint. For
> example, suppose
>
> the latest checkpoint records offset 3, and after that flink continue to
> emit 4, 5 and then
>
> failover, then Flink would continue to emit records from 4. Notes that
> this would not cause
>
> duplication since the state of all the operators are also fallback to the
> state after processed
>
> records 3.
>
>
> For the producer side, Flink use two-phase commit [1] to achieve
> exactly-once. Roughly
>
> Flink Producer would relies on Kafka's transaction to write data, and only
> commit data
>
> formally after the transaction is committed. Users could use
> Semantics.EXACTLY_ONCE
>
> to enable this functionality.
>
>
> We are warmly welcome for reaching to the community for help and very
> thanks
>
> everyone for participating in the community :) I think David and Martijn
> are also try to
>
> make we work together more efficiently. Very thanks for the understandings~
>
>
> Best,
>
> Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
>
>
>
> --
> From:Siddhesh Kalgaonkar 
> Send Time:2022 Jan. 7 (Fri.) 23:25
> To:Martijn Visser 
> Cc:"David Morávek" ; user 
> Subject:Re: Exactly Once Semantics
>
> Hi Martijn,
>
> Understood. If possible please help me out with the problem.
>
> Thanks,
> Sid
>
> On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser 
> wrote:
> Hi Siddesh,
>
> The purpose of both Stackoverflow and the mailing list is to solve a
> question or a problem, the mailing list is not for getting attention. It
> equivalents crossposting, which we rather don't. As David mentioned, time
> is limited and we all try to spent it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
> Hi David,
>
> It's actually better in my opinion. Because people who are not aware of
> the ML thread can Google and check the SO posts when they come across any
> similar problems. The reason behind posting on ML is to get attention.
> Because few questions are unanswered for multiple days and since we are
> beginners, the only things which we have are SO and ML.  I won't say
> "Duplication" but more kind of "Availability of similar problems".
>
> It's okay if you don't want to help.
>
> Cheers!
>
> Sid
>
> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
> Hi Siddhesh,
>
> can you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
> I am trying to achieve exactly one semantics using Flink and Kafka. I have
> explained my scenario thoroughly in this post
>
> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>
> Any help is much appreciated!
>
> Thanks,
> Sid
> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>
>


Re: Exactly Once Semantics

2022-01-07 Thread Yun Gao
Hi Siddhesh,

I answered on the stackoverflow and I also copied the answers here for 
reference:

For the producer side, Flink Kafka Consumer would bookkeeper the current offset 
in the 
distributed checkpoint, and if the consumer task failed, it will restarted from 
the latest
 checkpoint and re-emit from the offset recorded in the checkpoint. For 
example, suppose 
the latest checkpoint records offset 3, and after that flink continue to emit 
4, 5 and then 
failover, then Flink would continue to emit records from 4. Notes that this 
would not cause 
duplication since the state of all the operators are also fallback to the state 
after processed 
records 3.

For the producer side, Flink use two-phase commit [1] to achieve exactly-once. 
Roughly 
Flink Producer would relies on Kafka's transaction to write data, and only 
commit data 
formally after the transaction is committed. Users could use 
Semantics.EXACTLY_ONCE
to enable this functionality.

We are warmly welcome for reaching to the community for help and very thanks 
everyone for participating in the community :) I think David and Martijn are 
also try to 
make we work together more efficiently. Very thanks for the understandings~

Best,
Yun

[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance




--
From:Siddhesh Kalgaonkar 
Send Time:2022 Jan. 7 (Fri.) 23:25
To:Martijn Visser 
Cc:"David Morávek" ; user 
Subject:Re: Exactly Once Semantics

Hi Martijn,

Understood. If possible please help me out with the problem.

Thanks,
Sid
On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser  wrote:

Hi Siddesh,

The purpose of both Stackoverflow and the mailing list is to solve a question 
or a problem, the mailing list is not for getting attention. It equivalents 
crossposting, which we rather don't. As David mentioned, time is limited and we 
all try to spent it the best we can.

Best regards,

Martijn

Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar 

Hi David,

It's actually better in my opinion. Because people who are not aware of the ML 
thread can Google and check the SO posts when they come across any similar 
problems. The reason behind posting on ML is to get attention. Because few 
questions are unanswered for multiple days and since we are beginners, the only 
things which we have are SO and ML.  I won't say "Duplication" but more kind of 
"Availability of similar problems".

It's okay if you don't want to help.  

Cheers!

Sid
On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
Hi Siddhesh,

can you please focus your questions on one channel only? (either SO or the ML)

this could lead to unnecessary work duplication (which would be shame, because 
the community has limited resources) as people answering on SO might not be 
aware of the ML thread

D.

On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar 
 wrote:
I am trying to achieve exactly one semantics using Flink and Kafka. I have 
explained my scenario thoroughly in this post 
https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

Any help is much appreciated!

Thanks,
Sid-- 

Martijn Visser | Product Manager
mart...@ververica.com


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time




Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hi Martijn,

Understood. If possible please help me out with the problem.

Thanks,
Sid

On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser  wrote:

> Hi Siddesh,
>
> The purpose of both Stackoverflow and the mailing list is to solve a
> question or a problem, the mailing list is not for getting attention. It
> equivalents crossposting, which we rather don't. As David mentioned, time
> is limited and we all try to spent it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
>
>> Hi David,
>>
>> It's actually better in my opinion. Because people who are not aware of
>> the ML thread can Google and check the SO posts when they come across any
>> similar problems. The reason behind posting on ML is to get attention.
>> Because few questions are unanswered for multiple days and since we are
>> beginners, the only things which we have are SO and ML.  I won't say
>> "Duplication" but more kind of "Availability of similar problems".
>>
>> It's okay if you don't want to help.
>>
>> Cheers!
>>
>> Sid
>>
>> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> can you please focus your questions on one channel only? (either SO or
>>> the ML)
>>>
>>> this could lead to unnecessary work duplication (which would be shame,
>>> because the community has limited resources) as people answering on SO
>>> might not be aware of the ML thread
>>>
>>> D.
>>>
>>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
 I am trying to achieve exactly one semantics using Flink and Kafka. I
 have explained my scenario thoroughly in this post

 https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

 Any help is much appreciated!

 Thanks,
 Sid

>>> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>


Re: RowType for complex types in Parquet File

2022-01-07 Thread Jing Ge
Hi Meghajit,

like the exception described, parquet schema with nested columns is not
supported currently. It is on our todo list with high priority.

Best regards
Jing

On Fri, Jan 7, 2022 at 6:12 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> Flink documentation mentions this
> 
> as to how to create a FileSource for reading Parquet files.
> For primitive parquet types like BINARY and BOOLEAN, I am able to create a
> RowType and read the fields.
>
> However, I have some nested fields in my parquet schema also like this
> which I want to read :
>
>   optional group location = 11 {
> optional double latitude = 1;
> optional double longitude = 2;
>   }
>
> How can I create a RowType for this ? I did something like this below, but
> I got an exception `Caused by: java.lang.UnsupportedOperationException:
> Complex types not supported`
>
> RowType nestedRowType = RowType.of(new LogicalType[] {new
> DoubleType(), new DoubleType()}, new String[]{"latitude", "longitude"});
> final LogicalType[] fieldTypes = new
> LogicalType[]{nestedRowType};
> final ParquetColumnarRowInputFormat format =
> new ParquetColumnarRowInputFormat<>(
> new Configuration(),
> RowType.of(fieldTypes, new
> String[]{"location"}),
> 500,
> false,
> true);
>


Re: Exactly Once Semantics

2022-01-07 Thread David Morávek
Also please note that the Apache mailing lists are also indexed by search
engines and publicly archived [1].

[1] https://lists.apache.org/list.html?user@flink.apache.org

On Fri, Jan 7, 2022 at 4:15 PM Martijn Visser  wrote:

> Hi Siddesh,
>
> The purpose of both Stackoverflow and the mailing list is to solve a
> question or a problem, the mailing list is not for getting attention. It
> equivalents crossposting, which we rather don't. As David mentioned, time
> is limited and we all try to spent it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
>
>> Hi David,
>>
>> It's actually better in my opinion. Because people who are not aware of
>> the ML thread can Google and check the SO posts when they come across any
>> similar problems. The reason behind posting on ML is to get attention.
>> Because few questions are unanswered for multiple days and since we are
>> beginners, the only things which we have are SO and ML.  I won't say
>> "Duplication" but more kind of "Availability of similar problems".
>>
>> It's okay if you don't want to help.
>>
>> Cheers!
>>
>> Sid
>>
>> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> can you please focus your questions on one channel only? (either SO or
>>> the ML)
>>>
>>> this could lead to unnecessary work duplication (which would be shame,
>>> because the community has limited resources) as people answering on SO
>>> might not be aware of the ML thread
>>>
>>> D.
>>>
>>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
 I am trying to achieve exactly one semantics using Flink and Kafka. I
 have explained my scenario thoroughly in this post

 https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

 Any help is much appreciated!

 Thanks,
 Sid

>>> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>


Re: Exactly Once Semantics

2022-01-07 Thread Martijn Visser
Hi Siddesh,

The purpose of both Stackoverflow and the mailing list is to solve a
question or a problem, the mailing list is not for getting attention. It
equivalents crossposting, which we rather don't. As David mentioned, time
is limited and we all try to spent it the best we can.

Best regards,

Martijn

Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com>

> Hi David,
>
> It's actually better in my opinion. Because people who are not aware of
> the ML thread can Google and check the SO posts when they come across any
> similar problems. The reason behind posting on ML is to get attention.
> Because few questions are unanswered for multiple days and since we are
> beginners, the only things which we have are SO and ML.  I won't say
> "Duplication" but more kind of "Availability of similar problems".
>
> It's okay if you don't want to help.
>
> Cheers!
>
> Sid
>
> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>
>> Hi Siddhesh,
>>
>> can you please focus your questions on one channel only? (either SO or
>> the ML)
>>
>> this could lead to unnecessary work duplication (which would be shame,
>> because the community has limited resources) as people answering on SO
>> might not be aware of the ML thread
>>
>> D.
>>
>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>> kalgaonkarsiddh...@gmail.com> wrote:
>>
>>> I am trying to achieve exactly one semantics using Flink and Kafka. I
>>> have explained my scenario thoroughly in this post
>>>
>>> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>>>
>>> Any help is much appreciated!
>>>
>>> Thanks,
>>> Sid
>>>
>> --

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hi David,

It's actually better in my opinion. Because people who are not aware of the
ML thread can Google and check the SO posts when they come across any
similar problems. The reason behind posting on ML is to get attention.
Because few questions are unanswered for multiple days and since we are
beginners, the only things which we have are SO and ML.  I won't say
"Duplication" but more kind of "Availability of similar problems".

It's okay if you don't want to help.

Cheers!

Sid

On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:

> Hi Siddhesh,
>
> can you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> I am trying to achieve exactly one semantics using Flink and Kafka. I
>> have explained my scenario thoroughly in this post
>>
>> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>>
>> Any help is much appreciated!
>>
>> Thanks,
>> Sid
>>
>


Re: Exactly Once Semantics

2022-01-07 Thread David Morávek
Hi Siddhesh,

can you please focus your questions on one channel only? (either SO or the
ML)

this could lead to unnecessary work duplication (which would be shame,
because the community has limited resources) as people answering on SO
might not be aware of the ML thread

D.

On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> I am trying to achieve exactly one semantics using Flink and Kafka. I have
> explained my scenario thoroughly in this post
>
> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>
> Any help is much appreciated!
>
> Thanks,
> Sid
>


Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
I am trying to achieve exactly one semantics using Flink and Kafka. I have
explained my scenario thoroughly in this post
https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

Any help is much appreciated!

Thanks,
Sid


Re: [ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-07 Thread David Morávek
Great job! <3 Thanks Dong and Yun for managing the release and big thanks
to everyone who has contributed!

Best,
D.

On Fri, Jan 7, 2022 at 2:27 PM Yun Gao  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink ML 2.0.0.
>
>
>
> Apache Flink ML provides API and infrastructure that simplifies
> implementing distributed ML algorithms,
>
> and it also provides a library of off-the-shelf ML algorithms.
>
>
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/news/2022/01/07/release-ml-2.0.0.html
>
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
>
> Maven artifacts for Flink ML can be found at:
>
> https://search.maven.org/search?q=g:org.apache.flink%20ml
>
>
>
> Python SDK for Flink ML published to the PyPI index can be found at:
>
> https://pypi.org/project/apache-flink-ml/
>
>
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079
>
>
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
>
> Regards,
>
> Dong and Yun
>


Re: [ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-07 Thread David Morávek
Great job! <3 Thanks Dong and Yun for managing the release and big thanks
to everyone who has contributed!

Best,
D.

On Fri, Jan 7, 2022 at 2:27 PM Yun Gao  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink ML 2.0.0.
>
>
>
> Apache Flink ML provides API and infrastructure that simplifies
> implementing distributed ML algorithms,
>
> and it also provides a library of off-the-shelf ML algorithms.
>
>
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/news/2022/01/07/release-ml-2.0.0.html
>
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
>
> Maven artifacts for Flink ML can be found at:
>
> https://search.maven.org/search?q=g:org.apache.flink%20ml
>
>
>
> Python SDK for Flink ML published to the PyPI index can be found at:
>
> https://pypi.org/project/apache-flink-ml/
>
>
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079
>
>
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
>
> Regards,
>
> Dong and Yun
>


[ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-07 Thread Yun Gao
The Apache Flink community is very happy to announce the release of Apache 
Flink ML 2.0.0.

Apache Flink ML provides API and infrastructure that simplifies implementing 
distributed ML algorithms, 
and it also provides a library of off-the-shelf ML algorithms.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/01/07/release-ml-2.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink ML can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20ml

Python SDK for Flink ML published to the PyPI index can be found at:
https://pypi.org/project/apache-flink-ml/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Dong and Yun

[ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-07 Thread Yun Gao
The Apache Flink community is very happy to announce the release of Apache 
Flink ML 2.0.0.

Apache Flink ML provides API and infrastructure that simplifies implementing 
distributed ML algorithms, 
and it also provides a library of off-the-shelf ML algorithms.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/01/07/release-ml-2.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink ML can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20ml

Python SDK for Flink ML published to the PyPI index can be found at:
https://pypi.org/project/apache-flink-ml/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Dong and Yun

Re: adding elapsed times to events that form a transaction

2022-01-07 Thread HG
I am watching a ververica youtube playlist first
Already did the rides-and-fares stuff.

Will certainly look into these.

Thanks Ali


Op vr 7 jan. 2022 om 11:32 schreef Ali Bahadir Zeybek :

> Hello Hans,
>
> If you would like to see some hands-on examples which showcases the
> capabilities of Flink, I would suggest you follow the training
> exercises[1].
> To be more specific, checkpointing[2] example implements a similar logic to
> what you have described.
>
> Sincerely,
>
> Ali
>
> [1]: https://github.com/ververica/flink-training
> [2]:
> https://github.com/ververica/flink-training/tree/master/troubleshooting/checkpointing
>
> On Fri, Jan 7, 2022 at 1:13 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> So in Flink we essentially have 2 main APIs to define stream topologies:
>> one is DataStream and the other one is Table API. My guess is that right
>> now you're trying to use DataStream with the Kafka connector.
>>
>> DataStream allows you to statically define a stream topology, with an API
>> in a similar fashion to Java Streams or RxJava.
>> Table API on the other hand gives you the ability to define stream jobs
>> using SQL, where you can easily perform operations such as joins over
>> windows.
>>
>> Flink is definitely able to solve your use case, with both APIs. You can
>> also mix these two APIs in your application to solve your use case in the
>> way you want.
>> I suggest you start by looking at the documentation of Table API
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
>> and then, for your specific use case, check
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>> .
>>
>> Hope it helps.
>> FG
>>
>> On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:
>>
>>> Hi Francesco.
>>>
>>> I am not using anything right now apart from Kafka.
>>> Just need to know whether Flink is capable of doing this and trying to
>>> understand the documentation and terminology etc.
>>> I grapple a bit to understand the whole picture.
>>>
>>> Thanks
>>>
>>> Regards Hans
>>>
>>> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
>>> france...@ververica.com>:
>>>
 Hi,
 Are you using SQL or DataStream? For SQL you can use the Window TVF
 
 feature, where the window size is the "max" elapsed time, and then inside
 the window you pick the beginning and end event and join them.

 Hope it helps,
 FG

 On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:

> Hello all,
>
> My question is basically whether it is possible to group events by a
> key (these will belong to a specific transaction) and then calculate the
> elapsed times between them based on a timestamp that is present in the
> event.
> So a transaction my have x events all timestamped and with the
> transaction_id as key.
> Is it possible to
> 1. group them by the key
> 2. order by the timestamp,
> 3. calculate the elapsed times between the steps/event
> 4. add that elapsed time to the step/event
> 5. output the modified events to the sink
>
>
>
> Regards Hans
>



Re: adding elapsed times to events that form a transaction

2022-01-07 Thread David Anderson
One way to solve this with Flink SQL would be to use MATCH_RECOGNIZE. [1]
is an example illustrating a very similar use case.

[1] https://stackoverflow.com/a/62122751/2000823

On Fri, Jan 7, 2022 at 11:32 AM Ali Bahadir Zeybek 
wrote:

> Hello Hans,
>
> If you would like to see some hands-on examples which showcases the
> capabilities of Flink, I would suggest you follow the training
> exercises[1].
> To be more specific, checkpointing[2] example implements a similar logic to
> what you have described.
>
> Sincerely,
>
> Ali
>
> [1]: https://github.com/ververica/flink-training
> [2]:
> https://github.com/ververica/flink-training/tree/master/troubleshooting/checkpointing
>
> On Fri, Jan 7, 2022 at 1:13 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> So in Flink we essentially have 2 main APIs to define stream topologies:
>> one is DataStream and the other one is Table API. My guess is that right
>> now you're trying to use DataStream with the Kafka connector.
>>
>> DataStream allows you to statically define a stream topology, with an API
>> in a similar fashion to Java Streams or RxJava.
>> Table API on the other hand gives you the ability to define stream jobs
>> using SQL, where you can easily perform operations such as joins over
>> windows.
>>
>> Flink is definitely able to solve your use case, with both APIs. You can
>> also mix these two APIs in your application to solve your use case in the
>> way you want.
>> I suggest you start by looking at the documentation of Table API
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
>> and then, for your specific use case, check
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>> .
>>
>> Hope it helps.
>> FG
>>
>> On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:
>>
>>> Hi Francesco.
>>>
>>> I am not using anything right now apart from Kafka.
>>> Just need to know whether Flink is capable of doing this and trying to
>>> understand the documentation and terminology etc.
>>> I grapple a bit to understand the whole picture.
>>>
>>> Thanks
>>>
>>> Regards Hans
>>>
>>> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
>>> france...@ververica.com>:
>>>
 Hi,
 Are you using SQL or DataStream? For SQL you can use the Window TVF
 
 feature, where the window size is the "max" elapsed time, and then inside
 the window you pick the beginning and end event and join them.

 Hope it helps,
 FG

 On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:

> Hello all,
>
> My question is basically whether it is possible to group events by a
> key (these will belong to a specific transaction) and then calculate the
> elapsed times between them based on a timestamp that is present in the
> event.
> So a transaction my have x events all timestamped and with the
> transaction_id as key.
> Is it possible to
> 1. group them by the key
> 2. order by the timestamp,
> 3. calculate the elapsed times between the steps/event
> 4. add that elapsed time to the step/event
> 5. output the modified events to the sink
>
>
>
> Regards Hans
>



Re: adding elapsed times to events that form a transaction

2022-01-07 Thread HG
Super
Then it will not be a waste of time to learn flink.

Thanks!

Op vr 7 jan. 2022 om 11:13 schreef Francesco Guardiani <
france...@ververica.com>:

> So in Flink we essentially have 2 main APIs to define stream topologies:
> one is DataStream and the other one is Table API. My guess is that right
> now you're trying to use DataStream with the Kafka connector.
>
> DataStream allows you to statically define a stream topology, with an API
> in a similar fashion to Java Streams or RxJava.
> Table API on the other hand gives you the ability to define stream jobs
> using SQL, where you can easily perform operations such as joins over
> windows.
>
> Flink is definitely able to solve your use case, with both APIs. You can
> also mix these two APIs in your application to solve your use case in the
> way you want.
> I suggest you start by looking at the documentation of Table API
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
> and then, for your specific use case, check
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
> .
>
> Hope it helps.
> FG
>
> On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:
>
>> Hi Francesco.
>>
>> I am not using anything right now apart from Kafka.
>> Just need to know whether Flink is capable of doing this and trying to
>> understand the documentation and terminology etc.
>> I grapple a bit to understand the whole picture.
>>
>> Thanks
>>
>> Regards Hans
>>
>> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
>> france...@ververica.com>:
>>
>>> Hi,
>>> Are you using SQL or DataStream? For SQL you can use the Window TVF
>>> 
>>> feature, where the window size is the "max" elapsed time, and then inside
>>> the window you pick the beginning and end event and join them.
>>>
>>> Hope it helps,
>>> FG
>>>
>>> On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:
>>>
 Hello all,

 My question is basically whether it is possible to group events by a
 key (these will belong to a specific transaction) and then calculate the
 elapsed times between them based on a timestamp that is present in the
 event.
 So a transaction my have x events all timestamped and with the
 transaction_id as key.
 Is it possible to
 1. group them by the key
 2. order by the timestamp,
 3. calculate the elapsed times between the steps/event
 4. add that elapsed time to the step/event
 5. output the modified events to the sink



 Regards Hans

>>>


Re: adding elapsed times to events that form a transaction

2022-01-07 Thread Ali Bahadir Zeybek
Hello Hans,

If you would like to see some hands-on examples which showcases the
capabilities of Flink, I would suggest you follow the training exercises[1].
To be more specific, checkpointing[2] example implements a similar logic to
what you have described.

Sincerely,

Ali

[1]: https://github.com/ververica/flink-training
[2]:
https://github.com/ververica/flink-training/tree/master/troubleshooting/checkpointing

On Fri, Jan 7, 2022 at 1:13 PM Francesco Guardiani 
wrote:

> So in Flink we essentially have 2 main APIs to define stream topologies:
> one is DataStream and the other one is Table API. My guess is that right
> now you're trying to use DataStream with the Kafka connector.
>
> DataStream allows you to statically define a stream topology, with an API
> in a similar fashion to Java Streams or RxJava.
> Table API on the other hand gives you the ability to define stream jobs
> using SQL, where you can easily perform operations such as joins over
> windows.
>
> Flink is definitely able to solve your use case, with both APIs. You can
> also mix these two APIs in your application to solve your use case in the
> way you want.
> I suggest you start by looking at the documentation of Table API
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
> and then, for your specific use case, check
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
> .
>
> Hope it helps.
> FG
>
> On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:
>
>> Hi Francesco.
>>
>> I am not using anything right now apart from Kafka.
>> Just need to know whether Flink is capable of doing this and trying to
>> understand the documentation and terminology etc.
>> I grapple a bit to understand the whole picture.
>>
>> Thanks
>>
>> Regards Hans
>>
>> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
>> france...@ververica.com>:
>>
>>> Hi,
>>> Are you using SQL or DataStream? For SQL you can use the Window TVF
>>> 
>>> feature, where the window size is the "max" elapsed time, and then inside
>>> the window you pick the beginning and end event and join them.
>>>
>>> Hope it helps,
>>> FG
>>>
>>> On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:
>>>
 Hello all,

 My question is basically whether it is possible to group events by a
 key (these will belong to a specific transaction) and then calculate the
 elapsed times between them based on a timestamp that is present in the
 event.
 So a transaction my have x events all timestamped and with the
 transaction_id as key.
 Is it possible to
 1. group them by the key
 2. order by the timestamp,
 3. calculate the elapsed times between the steps/event
 4. add that elapsed time to the step/event
 5. output the modified events to the sink



 Regards Hans

>>>


Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread Martin
Thanks David for the hints.
I checked the usage of the state API and for me it seems to be correct, but I am a new Flink users.
Checkpoints happen eachs minute, the scaleing I trigger after 30 minutes.The source and sink are Kafka topics in EXACTLY_ONCE mode.
 
I tried to simplify the code, but didnt test if it still cause the issue, but maybe it already shows some misuage of the state API.
 
BR
Martin

public class MyRecord2ProcessFunction extends KeyedProcessFunction {
    private final OutputTag sideOutputLateData;
    private transient UUID applicationId;
    private transient ValueState stateRecordSequenceNumber;
 
    public MyRecord2ProcessFunction(OutputTag sideOutputLateData) {
        this.sideOutputLateData = sideOutputLateData;
    }
    public static MyRecord2ProcessFunction create(OutputTag sideOutputLateData) {
        return new MyRecord2ProcessFunction(sideOutputLateData);
    }
    @Override
    public String toString() {
        return "MyRecord2ProcessFunction()";
    }
    @Override
    public void open(Configuration configuration) throws Exception {
        // Parameters
        parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        long parameterTTL = parameters.getLong("processing.ttl.days", 1L);
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(parameterTTL))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        applicationId = UuidUtil.getTimeBasedUuid();
        ValueStateDescriptor recordSequenceNumberStateDescriptor = new ValueStateDescriptor<>("recordSequenceNumber", Long.class);
        recordSequenceNumberStateDescriptor.enableTimeToLive((ttlConfig));
        stateRecordSequenceNumber = getRuntimeContext().getState(recordSequenceNumberStateDescriptor);
    }
    @Override
    public void processElement(MyRecord2 MyRecord2, KeyedProcessFunction.Context ctx, Collector out) throws Exception {
        MyRecord2.setRecordingNetworkFunctionID(applicationId);
        if(MyRecord2.isClosed()) {
            ctx.output(sideOutputLateData, errorMyRecord2);
        } else {
            Long i = stateRecordSequenceNumber.value();
            if (i == 1 && MyRecord2.isClosed()) {
                // nothing
            } else if(meta.isClosed()) {
                MyRecord2.setRecordSequenceNumber(i);
                stateRecordSequenceNumber.update(i + 1L);
            } else if (!MyRecord2.hasRelease()) {
                MyRecord2.setRecordSequenceNumber(i);
                stateRecordSequenceNumber.update(i + 1L);
            } else {
                if(i>1) {
                    MyRecord2.setRecordSequenceNumber(i);
                    stateRecordSequenceNumber.update(i + 1L);
                }
            }
            out.collect(MyRecord2);
        }
    }
}
 

public class MyRecord1Processing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameters = ParameterToolEnvironmentUtils.createParameterTool(args);
        env.getConfig().setGlobalJobParameters(parameters);
        String parameterKafkaTransactionTimeout = parameters.get("kafka.transaction.timeout.ms", "360"); // sync with Kafka brokers transaction.max.timeout.ms setting
        int parameterKafkaSourceIdlenessSec = parameters.getInt("processing.source.idleness.sec",60);
        int parameterWatermarksMaxOutOfOrderSec = parameters.getInt("processing.watermarks.out-of-order.max.sec",60);
        int parameterAllowedLatenessSec = parameters.getInt("processing.allowed-lateness.sec",600);
        int parameterAllowedLatenessMs = parameterAllowedLatenessSec * 1000;
        int parameterParallelism = parameters.getInt("processing.parallel.all", env.getParallelism());
        String parameterKafkaTransactionIdPrefix = parameters.get("kafka.transaction.id.prefix", "charging-processing");
        env.setParallelism(parameterParallelism);
        env.getConfig().enableObjectReuse();
        OutputTag outErrors = new OutputTag<>("errors") {};
        OutputTag outMyRecord2Errors = new OutputTag<>("MyRecord2-errors") {};
        KafkaSource MyRecord1Source = KafkaSource.builder()
            .setBootstrapServers(parameterKafkaBrokers)
            .setTopics(parameterKafkaTopicMyRecord1)
            .setGroupId(parameterKafkaGroup)
            .setValueOnlyDeserializer(new JsonSerializationSchema<>(MyRecord1.class))
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setProperty("flink.partition-discovery.interval-millis", "1")
            .setProperty("transaction.timeout.ms", parameterKafkaTransactionTimeout)
            .build();
        WatermarkStrategy watermarkStrategy = WatermarkStrategy
            

Re: adding elapsed times to events that form a transaction

2022-01-07 Thread Francesco Guardiani
So in Flink we essentially have 2 main APIs to define stream topologies:
one is DataStream and the other one is Table API. My guess is that right
now you're trying to use DataStream with the Kafka connector.

DataStream allows you to statically define a stream topology, with an API
in a similar fashion to Java Streams or RxJava.
Table API on the other hand gives you the ability to define stream jobs
using SQL, where you can easily perform operations such as joins over
windows.

Flink is definitely able to solve your use case, with both APIs. You can
also mix these two APIs in your application to solve your use case in the
way you want.
I suggest you start by looking at the documentation of Table API
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
and then, for your specific use case, check
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
.

Hope it helps.
FG

On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:

> Hi Francesco.
>
> I am not using anything right now apart from Kafka.
> Just need to know whether Flink is capable of doing this and trying to
> understand the documentation and terminology etc.
> I grapple a bit to understand the whole picture.
>
> Thanks
>
> Regards Hans
>
> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> Hi,
>> Are you using SQL or DataStream? For SQL you can use the Window TVF
>> 
>> feature, where the window size is the "max" elapsed time, and then inside
>> the window you pick the beginning and end event and join them.
>>
>> Hope it helps,
>> FG
>>
>> On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:
>>
>>> Hello all,
>>>
>>> My question is basically whether it is possible to group events by a key
>>> (these will belong to a specific transaction) and then calculate the
>>> elapsed times between them based on a timestamp that is present in the
>>> event.
>>> So a transaction my have x events all timestamped and with the
>>> transaction_id as key.
>>> Is it possible to
>>> 1. group them by the key
>>> 2. order by the timestamp,
>>> 3. calculate the elapsed times between the steps/event
>>> 4. add that elapsed time to the step/event
>>> 5. output the modified events to the sink
>>>
>>>
>>>
>>> Regards Hans
>>>
>>


Re: adding elapsed times to events that form a transaction

2022-01-07 Thread HG
Hi Francesco.

I am not using anything right now apart from Kafka.
Just need to know whether Flink is capable of doing this and trying to
understand the documentation and terminology etc.
I grapple a bit to understand the whole picture.

Thanks

Regards Hans

Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
france...@ververica.com>:

> Hi,
> Are you using SQL or DataStream? For SQL you can use the Window TVF
> 
> feature, where the window size is the "max" elapsed time, and then inside
> the window you pick the beginning and end event and join them.
>
> Hope it helps,
> FG
>
> On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:
>
>> Hello all,
>>
>> My question is basically whether it is possible to group events by a key
>> (these will belong to a specific transaction) and then calculate the
>> elapsed times between them based on a timestamp that is present in the
>> event.
>> So a transaction my have x events all timestamped and with the
>> transaction_id as key.
>> Is it possible to
>> 1. group them by the key
>> 2. order by the timestamp,
>> 3. calculate the elapsed times between the steps/event
>> 4. add that elapsed time to the step/event
>> 5. output the modified events to the sink
>>
>>
>>
>> Regards Hans
>>
>


Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread David Morávek
OK, my first intuition would be some kind of misuse of the state API. Other
guess would be, has any checkpoint completed prior triggering of the
re-scaling event?

I'll also try to verify the scenario you've described, but these would be
the things that I'd check first.

D.

On Fri, Jan 7, 2022 at 10:46 AM Martin  wrote:

> Hello David,
>
> right now I cant share the complete code. But I will try in some days to
> simplify it and reduce the code to still trigger the issue.
>
> First I will check, if the explict keyBy instead of the
> reinterpretAsKeyedStream  fix the issue.
> If yes, that would assume - for me - that its a bug with
> reinterpretAsKeyedStream and the elastic scaling.
> If no, its probably another issue caused by my code, instead of Flink.
>
> BR
> Martin
>
> David Morávek schrieb am 07.01.2022 10:22 (GMT +01:00):
>
> Would you be able share the code of your test by any chance?
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 10:06 AM Martin  wrote:
>
>> Hello David,
>>
>> I have a test setup, where the input is all the time the same.
>> After processing, I check all the output if each sequence number ist just
>> used once.
>>
>> Another output field is a random UUID generated on startup of a Task (in
>> the open-method of the (c)-keyed process function).
>> In the output I saw, that the sequence number started at 1 again on the
>> same time when the scaling happend and the change of the UUID fitted also
>> to that.
>>
>> Some output:
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047",,"recordSequenceNumber":1
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
>>
>> The percentage is then the number of output records which uses a already
>> given sequence number (for each key1) compared to all output records.
>>
>>
>> Right now I change the flink job so, that instead of the
>> reinterpretAsKeyedStream it has an explict keyBy before function (c) again.
>> I will check if this will fix the issue in my job and test setup.
>>
>> BR
>> Martin
>>
>>
>>
>> David Morávek schrieb am 07.01.2022 09:37 (GMT +01:00):
>>
>> Hi Martin,
>>
>> _reinterpretAsKeyedStream_ should definitely work with the reactive mode,
>> if it doesn't it's a bug that needs to be fixed
>>
>>> For test use cases (3) and (4) the state of the keyed process function
>>> (c) seems only available for around 50% of the events processed after
>>> scale-in/fail.
>>>
>> Can you please provide details on how did you verify this?
>>
>> Best,
>> D.
>>
>> On Fri, Jan 7, 2022 at 8:10 AM Martin  wrote:
>>
>>> Hi,
>>>
>>> typo: "I run that job via Native Kubernetes deployment and use elastic
>>> scaling in reactive mode."
>>> -> I run it of course via standalone kubernetes deployment, to make
>>> elastic scaling possible.
>>>
>>> BR
>>> Martin
>>>
>>>
>>>
>>> mar...@sonicdev.de schrieb am 06.01.2022 21:38 (GMT +01:00):
>>>
>>> Hi,
>>>
>>> I have a job where I do a keyBy'd process function (a), then on the
>>> downstream a normal process function (b) and and then I use
>>> reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
>>>
>>> The last keyed process function use keyed state for a increasing
>>> sequence number.
>>>
>>> I run that job via Native Kubernetes deployment and use elastic scaling
>>> in reactive mode.
>>>
>>> I use Flink 1.14.2.
>>>
>>> I test that job on four use cases: (1) static parallelism, (2) scale
>>> out, (3) scale-in, (4) task manager file*.
>>>
>>> * via kill -SIGTERM inside the container for the flink JVM
>>>
>>> For test use cases (1) and (2) everything is fine.
>>>
>>> For test use cases (3) and (4) the state of the keyed process function
>>> (c) seems only available for around 50% of the events processed after
>>> scale-in/fail.
>>>
>>> Is the reinterpretAsKeyedStream feature in general usable with Elastic
>>> Scaling in Reactive Mode in Flink 1.14.2?
>>>
>>> If yes, already any ideas what the root cause could be?
>>>
>>>
>>>
>>> BR
>>> Martin
>>>
>>>
>>>
>>>


Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread Martin
Hello David,
right now I cant share the complete code. But I will try in some days to simplify it and reduce the code to still trigger the issue.
First I will check, if the explict keyBy instead of the reinterpretAsKeyedStream  fix the issue.If yes, that would assume - for me - that its a bug with reinterpretAsKeyedStream and the elastic scaling.If no, its probably another issue caused by my code, instead of Flink.
BRMartin

David Morávek schrieb am 07.01.2022 10:22 (GMT +01:00):


Would you be able share the code of your test by any chance?
 
Best,
D.



On Fri, Jan 7, 2022 at 10:06 AM Martin  wrote:


Hello David,
I have a test setup, where the input is all the time the same.After processing, I check all the output if each sequence number ist just used once.
Another output field is a random UUID generated on startup of a Task (in the open-method of the (c)-keyed process function).In the output I saw, that the sequence number started at 1 again on the same time when the scaling happend and the change of the UUID fitted also to that.
Some output:
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047",,"recordSequenceNumber":1
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
 
The percentage is then the number of output records which uses a already given sequence number (for each key1) compared to all output records.
 
Right now I change the flink job so, that instead of the reinterpretAsKeyedStream it has an explict keyBy before function (c) again.I will check if this will fix the issue in my job and test setup.
BRMartin
 
David Morávek schrieb am 07.01.2022 09:37 (GMT +01:00):


Hi Martin,
 
_reinterpretAsKeyedStream_ should definitely work with the reactive mode, if it doesn't it's a bug that needs to be fixed


For test use cases (3) and (4) the state of the keyed process function (c) seems only available for around 50% of the events processed after scale-in/fail.


Can you please provide details on how did you verify this?
 
Best,
D.



On Fri, Jan 7, 2022 at 8:10 AM Martin  wrote:


Hi,
typo: "I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode."-> I run it of course via standalone kubernetes deployment, to make elastic scaling possible.
BRMartin
 
mar...@sonicdev.de schrieb am 06.01.2022 21:38 (GMT +01:00):

Hi,
I have a job where I do a keyBy'd process function (a), then on the downstream a normal process function (b) and and then I use reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
The last keyed process function use keyed state for a increasing sequence number.
I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.
I use Flink 1.14.2.
I test that job on four use cases: (1) static parallelism, (2) scale out, (3) scale-in, (4) task manager file*.
* via kill -SIGTERM inside the container for the flink JVM
For test use cases (1) and (2) everything is fine.
For test use cases (3) and (4) the state of the keyed process function (c) seems only available for around 50% of the events processed after scale-in/fail.
Is the reinterpretAsKeyedStream feature in general usable with Elastic Scaling in Reactive Mode in Flink 1.14.2?
If yes, already any ideas what the root cause could be?
 
BRMartin
 











Re: Metaspace OOM : class loaders not being GC

2022-01-07 Thread David Morávek
Hi David,

If I understand the problem correctly, there is really nothing we can do
here. Soft references are garbage collected when there is a high memory
pressure and the garbage collector needs to free up more memory. The
problem here is that the GC doesn't really take high memory pressure on
Metaspace into the account here.

I guess you might try to tweak _SoftRefLRUPolicyMSPerMB_ [1], but this
might have some other consequences. Also this behavior might be highly
dependent on the garbage collector you're using.


>From the docs [1]:

-XX:SoftRefLRUPolicyMSPerMB=*time*

Sets the amount of time (in milliseconds) a softly reachable object is kept
active on the heap after the last time it was referenced. The default value
is one second of lifetime per free megabyte in the heap. The
-XX:SoftRefLRUPolicyMSPerMB option accepts integer values representing
milliseconds per one megabyte of the current heap size (for Java HotSpot
Client VM) or the maximum possible heap size (for Java HotSpot Server VM).
This difference means that the Client VM tends to flush soft references
rather than grow the heap, whereas the Server VM tends to grow the heap
rather than flush soft references. In the latter case, the value of the -Xmx
option has a significant effect on how quickly soft references are garbage
collected.

The following example shows how to set the value to 2.5 seconds:

-XX:SoftRefLRUPolicyMSPerMB=2500



[1] https://docs.oracle.com/javase/8/docs/technotes/tools/unix/java.html

Best,
D.

On Thu, Jan 6, 2022 at 3:13 AM Caizhi Weng  wrote:

> Hi!
>
> As far as I remember this is a known issue a few years ago but Flink
> currently has no solution to this (correct me if I'm wrong). I see that
> you're running jobs on a yarn session. Could you switch to yarn-per-job
> mode (where JM and TMs are created and destroyed for each job) for a
> workaround?
>
> David Clutter  于2022年1月4日周二 23:39写道:
>
>> I am seeing an issue with class loaders not being GCed and the metaspace
>> eventually OOM.  Here is my setup:
>>
>> - Flink 1.13.1 on EMR using JDK 8 in session mode
>> - Job manager is a long-running yarn session
>> - New jobs are submitted every 5m (and typically run for less than 5m)
>>
>> I find that after a few hours the job manager gets killed with Metaspace
>> OOM.  I tried increasing the Metaspace for the job manager but that only
>> delays the OOM.
>>
>> I did some debugging using jcmd and I noticed that the size of the
>> classes loaded is always increasing.  Next I did a heap dump and found that
>> instances of org.apache.flink.util.ChildFirstClassLoader are present
>> long after the jobs complete.  Checking the GC roots I found that there is
>> a reference in java.io.ObjectStreamClass$Caches.  Seems to be this JDK
>> issue: https://bugs.openjdk.java.net/browse/JDK-8277072
>>
>> Curious if there are any workarounds for this situation?
>>
>>


Serving Machine Learning models

2022-01-07 Thread Sonia-Florina Horchidan
Hello,


I recently started looking into serving Machine Learning models for streaming 
data in Flink. To give more context, that would involve training a model 
offline (using PyTorch or TensorFlow), and calling it from inside a Flink job 
to do online inference on newly arrived data. I have found multiple 
discussions, presentations, and tools that could achieve this, and it seems 
like the two alternatives would be: (1) wrap the pre-trained models in a HTTP 
service (such as PyTorch Serve [1]) and let Flink do async calls for model 
scoring, or (2) convert the models into a standardized format (e.g., ONNX [2]), 
pre-load the model in memory for every task manager (or use external storage if 
needed) and call it for each new data point.

Both approaches come with a set of advantages and drawbacks and, as far as I 
understand, there is no "silver bullet", since one approach could be more 
suitable than the other based on the application requirements. However, I would 
be curious to know what would be the "recommended" methods for model serving 
(if any) and what approaches are currently adopted by the users in the wild.


[1] https://pytorch.org/serve/

[2] https://onnx.ai/


Best regards,

Sonia


 [Kth Logo]

Sonia-Florina Horchidan
PhD Student
KTH Royal Institute of Technology
Software and Computer Systems (SCS)
School of Electrical Engineering and Computer Science (EECS)
Mobil: +46769751562
sf...@kth.se,  
www.kth.se



Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread David Morávek
Would you be able share the code of your test by any chance?

Best,
D.

On Fri, Jan 7, 2022 at 10:06 AM Martin  wrote:

> Hello David,
>
> I have a test setup, where the input is all the time the same.
> After processing, I check all the output if each sequence number ist just
> used once.
>
> Another output field is a random UUID generated on startup of a Task (in
> the open-method of the (c)-keyed process function).
> In the output I saw, that the sequence number started at 1 again on the
> same time when the scaling happend and the change of the UUID fitted also
> to that.
>
> Some output:
>
> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
>
> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
>
> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
>
> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
>
> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047",,"recordSequenceNumber":1
>
> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
>
> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
>
> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
>
> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
>
> The percentage is then the number of output records which uses a already
> given sequence number (for each key1) compared to all output records.
>
>
> Right now I change the flink job so, that instead of the
> reinterpretAsKeyedStream it has an explict keyBy before function (c) again.
> I will check if this will fix the issue in my job and test setup.
>
> BR
> Martin
>
>
>
> David Morávek schrieb am 07.01.2022 09:37 (GMT +01:00):
>
> Hi Martin,
>
> _reinterpretAsKeyedStream_ should definitely work with the reactive mode,
> if it doesn't it's a bug that needs to be fixed
>
>> For test use cases (3) and (4) the state of the keyed process function
>> (c) seems only available for around 50% of the events processed after
>> scale-in/fail.
>>
> Can you please provide details on how did you verify this?
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 8:10 AM Martin  wrote:
>
>> Hi,
>>
>> typo: "I run that job via Native Kubernetes deployment and use elastic
>> scaling in reactive mode."
>> -> I run it of course via standalone kubernetes deployment, to make
>> elastic scaling possible.
>>
>> BR
>> Martin
>>
>>
>>
>> mar...@sonicdev.de schrieb am 06.01.2022 21:38 (GMT +01:00):
>>
>> Hi,
>>
>> I have a job where I do a keyBy'd process function (a), then on the
>> downstream a normal process function (b) and and then I use
>> reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
>>
>> The last keyed process function use keyed state for a increasing sequence
>> number.
>>
>> I run that job via Native Kubernetes deployment and use elastic scaling
>> in reactive mode.
>>
>> I use Flink 1.14.2.
>>
>> I test that job on four use cases: (1) static parallelism, (2) scale out,
>> (3) scale-in, (4) task manager file*.
>>
>> * via kill -SIGTERM inside the container for the flink JVM
>>
>> For test use cases (1) and (2) everything is fine.
>>
>> For test use cases (3) and (4) the state of the keyed process function
>> (c) seems only available for around 50% of the events processed after
>> scale-in/fail.
>>
>> Is the reinterpretAsKeyedStream feature in general usable with Elastic
>> Scaling in Reactive Mode in Flink 1.14.2?
>>
>> If yes, already any ideas what the root cause could be?
>>
>>
>>
>> BR
>> Martin
>>
>>
>>
>>


Re: Flink Kinesis Producer con't connect with AWS credentials

2022-01-07 Thread Matthias Pohl
I'm adding Danny to this thread. He might be able to help on this topic.

Best,
Matthias

On Mon, Jan 3, 2022 at 4:57 PM Daniel Vol  wrote:

> I definitely do, and you can see in my initial post that this is the first
> thing I tried but I got warnings and it doesn't use credentials I supplied.
> Though you are right that I do find a solution - using credentialProvider
> object and injecting keys as a java env variables through:
> -yd "env.java.opts.taskmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
> -yd "env.java.opts.jobmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
>
> Though I do expect from producer to be able to get parameters as per
> documentation (exactly as consumer do) so probably it is a good idea to
> open a ticket for this behavior:
>
> val props = new Properties
>
> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
>
> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> kinesisConfig.secretKey.get)
>
> [Window(EventTimeSessionWindows(180), EventTimeTrigger,
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property
> aws.credentials.provider.basic.secretkey ignored as there is no
> corresponding set method in KinesisProducerConfiguration
> [Window(EventTimeSessionWindows(180), EventTimeTrigger,
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property
> aws.region ignored as there is no corresponding set method in
> KinesisProducerConfiguration
> [Window(EventTimeSessionWindows(180), EventTimeTrigger,
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property
> aws.credentials.provider.basic.accesskeyid ignored as there is no
> corresponding set method in KinesisProducerConfiguration
>
> On Mon, Jan 3, 2022 at 5:34 PM Matthias Pohl 
> wrote:
>
>> Hi Daniel,
>> I'm assuming you already looked into the Flink documentation for this
>> topic [1]? I'm gonna add Fabian to this thread. Maybe, he's able to help
>> out here.
>>
>> Matthias
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer
>>
>> On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol  wrote:
>>
>>> Hi,
>>>
>>> I am trying to run a Flink on GCP with the current source and
>>> destination on Kinesis on AWS.
>>> I have configured the access key on AWS to be able to connect.
>>> I am running Flink 1.12.1
>>> In flink I use the following code (Scala 2.12.2)
>>>
>>> val props = new Properties
>>>
>>> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
>>> kinesisConfig.accessKeyId.get)
>>>
>>> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
>>> kinesisConfig.secretKey.get)
>>>
>>>
>>> It works just fine to get connection to consumer, but not to producer.
>>>
>>> In TaskManager stdout log I see the following:
>>>
>>> [Window(EventTimeSessionWindows(180), EventTimeTrigger, 
>>> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN  
>>> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property 
>>> aws.credentials.provider.basic.secretkey ignored as there is no 
>>> corresponding set method in KinesisProducerConfiguration
>>> [Window(EventTimeSessionWindows(180), EventTimeTrigger, 
>>> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN  
>>> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property 
>>> aws.region ignored as there is no corresponding set method in 
>>> KinesisProducerConfiguration
>>> [Window(EventTimeSessionWindows(180), EventTimeTrigger, 
>>> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN  
>>> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property 
>>> aws.credentials.provider.basic.accesskeyid ignored as there is no 
>>> corresponding set method in KinesisProducerConfiguration
>>>
>>> Then I have tried a different approach: to create AWSCredentialsProvider 
>>> object with key + secret and add it by:
>>>
>>> (as it have setCredentialsProvider method)
>>>
>>> class CredentialsProvider(config: KinesisConfig) extends 
>>> AWSCredentialsProvider with Serializable {
>>>   override def getCredentials: AWSCredentials =
>>> new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get)
>>>
>>>   override def refresh(): Unit = {}
>>> }
>>>
>>> val credentialsProvider = new CredentialsProvider(kinesisConfig)
>>>
>>> producerConfig.put("CredentialsProvider", credentialsProvider)
>>>
>>> But then I get different exceptions that the process can't find access_key 
>>> and secret key.
>>>
>>> [kpl-daemon-] ERROR 
>>> o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer  - Error in child 
>>> process
>>> java.lang.RuntimeException: Error running child process
>>> at 
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
>>> at 

Skewed Data when joining tables using Flink SQL

2022-01-07 Thread Anne Lai
Hi,

I have a Flink batch job that needs to join a large skewed table with a
smaller table, and because records are not evenly distributed to each
subtask, it always fails with a "too much data in partition" error.
I explored using DataStream API to broadcast the smaller tables as a
broadcast state, but it is not ideal since it is kept in-memory on all
downstream operators and even the smaller table is still too large to fit
into the memory. Using broadcast state would also introduce cold start
problems where the order of records processed in the connected stream can't
be guaranteed. As for coGroup transformation, it requires time windowing
but both tables are un-windowed.

Is there any workaround or ideas that I can try on? Thanks!

Best,
Anne


Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread Martin
Hello David,
I have a test setup, where the input is all the time the same.After processing, I check all the output if each sequence number ist just used once.
Another output field is a random UUID generated on startup of a Task (in the open-method of the (c)-keyed process function).In the output I saw, that the sequence number started at 1 again on the same time when the scaling happend and the change of the UUID fitted also to that.
Some output:
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047",,"recordSequenceNumber":1
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
 
The percentage is then the number of output records which uses a already given sequence number (for each key1) compared to all output records.
 
Right now I change the flink job so, that instead of the reinterpretAsKeyedStream it has an explict keyBy before function (c) again.I will check if this will fix the issue in my job and test setup.
BRMartin
 

David Morávek schrieb am 07.01.2022 09:37 (GMT +01:00):


Hi Martin,
 
_reinterpretAsKeyedStream_ should definitely work with the reactive mode, if it doesn't it's a bug that needs to be fixed


For test use cases (3) and (4) the state of the keyed process function (c) seems only available for around 50% of the events processed after scale-in/fail.


Can you please provide details on how did you verify this?
 
Best,
D.



On Fri, Jan 7, 2022 at 8:10 AM Martin  wrote:


Hi,
typo: "I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode."-> I run it of course via standalone kubernetes deployment, to make elastic scaling possible.
BRMartin
 
mar...@sonicdev.de schrieb am 06.01.2022 21:38 (GMT +01:00):

Hi,
I have a job where I do a keyBy'd process function (a), then on the downstream a normal process function (b) and and then I use reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
The last keyed process function use keyed state for a increasing sequence number.
I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.
I use Flink 1.14.2.
I test that job on four use cases: (1) static parallelism, (2) scale out, (3) scale-in, (4) task manager file*.
* via kill -SIGTERM inside the container for the flink JVM
For test use cases (1) and (2) everything is fine.
For test use cases (3) and (4) the state of the keyed process function (c) seems only available for around 50% of the events processed after scale-in/fail.
Is the reinterpretAsKeyedStream feature in general usable with Elastic Scaling in Reactive Mode in Flink 1.14.2?
If yes, already any ideas what the root cause could be?
 
BRMartin
 







Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-07 Thread David Morávek
Hi Martin,

_reinterpretAsKeyedStream_ should definitely work with the reactive mode,
if it doesn't it's a bug that needs to be fixed

> For test use cases (3) and (4) the state of the keyed process function (c)
> seems only available for around 50% of the events processed after
> scale-in/fail.
>
Can you please provide details on how did you verify this?

Best,
D.

On Fri, Jan 7, 2022 at 8:10 AM Martin  wrote:

> Hi,
>
> typo: "I run that job via Native Kubernetes deployment and use elastic
> scaling in reactive mode."
> -> I run it of course via standalone kubernetes deployment, to make
> elastic scaling possible.
>
> BR
> Martin
>
>
>
> mar...@sonicdev.de schrieb am 06.01.2022 21:38 (GMT +01:00):
>
> Hi,
>
> I have a job where I do a keyBy'd process function (a), then on the
> downstream a normal process function (b) and and then I use
> reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
>
> The last keyed process function use keyed state for a increasing sequence
> number.
>
> I run that job via Native Kubernetes deployment and use elastic scaling in
> reactive mode.
>
> I use Flink 1.14.2.
>
> I test that job on four use cases: (1) static parallelism, (2) scale out,
> (3) scale-in, (4) task manager file*.
>
> * via kill -SIGTERM inside the container for the flink JVM
>
> For test use cases (1) and (2) everything is fine.
>
> For test use cases (3) and (4) the state of the keyed process function (c)
> seems only available for around 50% of the events processed after
> scale-in/fail.
>
> Is the reinterpretAsKeyedStream feature in general usable with Elastic
> Scaling in Reactive Mode in Flink 1.14.2?
>
> If yes, already any ideas what the root cause could be?
>
>
>
> BR
> Martin
>
>
>
>


Re: Moving off of TypeInformation in Flink 1.11

2022-01-07 Thread Francesco Guardiani
Hi Sofya,
DataStream API doesn't use DataTypes, but it still uses TypeInformation.
DataTypes and LogicalTypes are relevant only for Table API.

If I understood what you're trying to do, you don't need to manually
transform to Row, but you only need to define the Schema when crossing the
boundary from DataStream to Table API through
StreamTableEnvironment#fromDataStream
.


Look at the javadoc of this method:
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromDataStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-

Hope it helps,
FG

On Thu, Jan 6, 2022 at 4:06 PM Sofya T. Irwin  wrote:

> Hi,
>
> I’m moving my Flink 1.11 application onto the Blink Table Planner; and off
>  of TypeInformation and onto DataTypes in preparation for upgrading Flink
>  to Flink 1.13 or higher.
>
> I’m having trouble moving off of TypeInformation.
>
> Specifically I have a section of code that maps a DataStream[Message] to a
> DataStream[Row]:
>
>   implicit val typeInformation: TypeInformation[Row] =
>  myObject.getProducedType
>   val resultStream: DataStream[Row] = dataStream.map(msg =>
> myTransform(msg))
>
> Note that myTransform() takes in a Message object and returns a Row object.
> Message is an internal class that we are using.
> The resultStream:DataStream[Row] is passed as a StreamTableSource[Row]
> later.
>
> If I comment out the implicit val above, I get a failure:
>
>   TableSource of type com.MyTableSourceFromDataStream returned a
> DataStream of data type
>   GenericType that does not match with the
> data type
>   ROW<`my_field_1` INT NOT NULL, ... `my_other_field` BIGINT> declared by
> the
>   TableSource.getProducedDataType() method. Please validate the
> implementation of the TableSource.
>
> I checked the Flink 1.11.4, Flink 1.13, and most recent sources and it
> seems that the implementation of DataStream.map() is not changed and still
> uses TypeInformation.
>
> https://github.com/apache/flink/blob/master/flink
> -streaming-scala/src/main/scala/org/apache/flink
> /streaming/api/scala/DataStream.scala#L657
>
> Based on the code above it seems that the issue is that Flink's
> DataStream.map function uses TypeInformation.
>
> I’m not sure if there’s an equivalent DataType implicit that I should be
> declaring instead. Or if I should be using some function other than map
>
> Do you have any suggestions for how to proceed? I'd like to completely
> move off of TypeInformation in my app.
>
> Thanks,
> Sofya
>


Re: adding elapsed times to events that form a transaction

2022-01-07 Thread Francesco Guardiani
Hi,
Are you using SQL or DataStream? For SQL you can use the Window TVF

feature, where the window size is the "max" elapsed time, and then inside
the window you pick the beginning and end event and join them.

Hope it helps,
FG

On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:

> Hello all,
>
> My question is basically whether it is possible to group events by a key
> (these will belong to a specific transaction) and then calculate the
> elapsed times between them based on a timestamp that is present in the
> event.
> So a transaction my have x events all timestamped and with the
> transaction_id as key.
> Is it possible to
> 1. group them by the key
> 2. order by the timestamp,
> 3. calculate the elapsed times between the steps/event
> 4. add that elapsed time to the step/event
> 5. output the modified events to the sink
>
>
>
> Regards Hans
>


退订

2022-01-07 Thread janke