KeyedCoProcessFunction, processElement1, processElement2, onTimer timeout

2020-09-15 Thread Mazen Ezzeddine
Hey all, 

I am using the KeyedCoProcessFunction class in the Flink DataStream APIs to
implement a timeout-like use case. The scenario is as follows: I have an
input Kafka topic and an output Kafka topic; a service reads from the input
topic, processes the message (for a variable amount of time), and then publishes
the response to the output Kafka topic.

Now, to implement the timeout (it must be done with the Flink DataStream APIs), I have
one FlinkKafkaConsumer that reads from the Kafka input topic and another
FlinkKafkaConsumer that reads from the Kafka output topic (once the record has been
processed and published by the external service). I connect the two streams,
register a timer in processElement1, and then either the onTimer method fires
(a timeout is declared), or processElement2 fires first, in which case I delete
the timer and no timeout is declared.

In the situation described above, can an element from the output topic be read
(processElement2 fired) before the corresponding element from the input topic
(processElement1 fired), given that the external service might take seconds to
process the element before publishing it to the output topic? Is that possible by
design in Flink, and is there any way to force connected streams to operate on a
first-come, first-served basis?

In that case, what is the best way to implement the timeout functionality as
described above strictly using the Flink DataStream APIs? Any hint please?

Thank you so much.
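For reference, a minimal sketch of the timer pattern described above, assuming String records on both streams keyed by a shared request id; the names and the timeout value are illustrative only:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: stream 1 carries requests from the input topic,
// stream 2 carries responses from the output topic, both keyed by a shared id.
public class TimeoutFunction extends KeyedCoProcessFunction<String, String, String, String> {

    private static final long TIMEOUT_MS = 30_000L;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timeout-timer", Long.class));
    }

    @Override
    public void processElement1(String request, Context ctx, Collector<String> out) throws Exception {
        // Register a processing-time timer when the request is seen.
        long timer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        timerState.update(timer);
        ctx.timerService().registerProcessingTimeTimer(timer);
    }

    @Override
    public void processElement2(String response, Context ctx, Collector<String> out) throws Exception {
        // The response arrived: delete the pending timer so no timeout is declared.
        Long timer = timerState.value();
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
            timerState.clear();
        }
        // Note: if the response is read before the request (the ordering question
        // above), there is no timer yet, and extra marker state would be needed.
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // The timer fired before the response: declare a timeout for this key.
        out.collect("TIMEOUT for key " + ctx.getCurrentKey());
        timerState.clear();
    }
}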





Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-15 Thread Lian Jiang
Hi,

I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In IntelliJ, I
can see that the FlinkKafkaConsumer has already deserialized the upstream Kafka
message. However, I get the error below when this message is serialized during
pushToOperator. Per the stack trace, the reason is that the AvroSerializer is
created by AvroFactory.fromSpecific(), which creates its own private copy of
specificData. This private specificData does not have the logical type
information, which blocks the deserialized messages from being passed to
downstream operators. Any idea how to make this work? Appreciated very much!
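For illustration, a minimal sketch of the logical-type registration that the private specificData copy appears to be missing; this is an assumption about where such conversions would have to be registered, not a confirmed fix for the AvroSerializer path:

import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;

public class SpecificDataConversions {
    public static void main(String[] args) {
        // A SpecificData instance can only deep-copy java.time.Instant values
        // if the corresponding logical-type conversion is registered on it.
        SpecificData specificData = new SpecificData();
        specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        // The AvroSerializer built via AvroFactory.fromSpecific() uses its own
        // private SpecificData, which (per the stack trace below) lacks this registration.
        System.out.println("Configured: " + specificData);
    }
}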


org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)


Disable WAL in RocksDB recovery

2020-09-15 Thread Juha Mynttinen
Hello there,

I'd like to bring to discussion a previously discussed topic - disabling WAL in 
RocksDB recovery.

It's clear that WAL is not needed during the process, the reason being that the 
WAL is never read, so there's no need to write it.

AFAIK the last thing that was done with the WAL during recovery was an attempt to
remove it, which was later reverted
(https://issues.apache.org/jira/browse/FLINK-8922). If I interpret the comments
in the ticket correctly, what happened was that a) the WAL was kept in the
recovery, and b) it's unknown why removing the WAL caused a segfault.

What can be seen in the ticket is that having the WAL causes a significant
performance penalty, so getting rid of it would be a very nice performance
improvement. I think it'd be worth creating a new JIRA ticket, at least as a
reminder that the WAL should be removed.

I'm planning to add an experimental flag that removes the WAL in the environment
where I'm using Flink, and to try it out. If the flag is made configurable, the WAL
can always be re-enabled if removing it causes issues.
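For context, a minimal sketch of what such a flag would toggle on the RocksDB side, using the standard RocksDB JNI WriteOptions; this is only an illustration, not Flink's actual restore code path:

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class WalFreeWrite {
    public static void writeWithoutWal(RocksDB db, WriteBatch batch) throws RocksDBException {
        // Skip the write-ahead log for bulk writes during recovery: the data is
        // re-derivable from the checkpoint files, so the WAL is never read back.
        try (WriteOptions options = new WriteOptions().setDisableWAL(true)) {
            db.write(options, batch);
        }
    }
}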

Thoughts?

Regards,
Juha



Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-15 Thread Dan Hill
How do I avoid unnecessary reshuffles when using Kafka as input?  My keys
in Kafka are ~userId.  The first few stages do joins that are usually
(userId, someOtherKeyId).  It makes sense for these joins to stay on the
same machine and avoid unnecessary shuffling.

What's the best way to avoid unnecessary shuffling when using Table SQL
interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
keys for Kafka.


Re: Why setAllVerticesInSameSlotSharingGroupByDefault is set to false in batch mode

2020-09-15 Thread Zhu Zhu
Hi Zheng,

To divide managed memory for operators[1], we need to consider which tasks
will
run in the same slot. In batch jobs, vertices in different regions may not
run at
the same time. If we put them in the same slot sharing group, running tasks
may run slower with less managed memory, while managed memory reserved
for tasks that are not running yet will be wasted.

More slots can be requested as a result. However, it's not necessary to add
more containers. Each slot will serve fewer tasks, which means you can decrease
the slot size (by increasing "taskmanager.numberOfTaskSlots") so that the
previous number of containers is enough. This is because you are still running
the same number of tasks at the same time, although they are spread across more
slots.
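For reference, a hedged DataStream-style illustration of explicit slot sharing groups (the pipeline and group names are made up): operators placed in different groups cannot share a slot, so their managed memory budgets are kept separate.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c");

        source
            .filter(value -> !value.isEmpty()).slotSharingGroup("stage-1")
            .map(value -> value + "!").slotSharingGroup("stage-2")
            .print();

        env.execute("slot-sharing-sketch");
    }
}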

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management

Thanks,
Zhu

zheng faaron wrote on Wed, Sep 16, 2020 at 10:58 AM:

> Hi All,
>
> I find we set AllVerticesInSameSlotSharingGroupByDefault to false in
> Flink 1.10. It makes batch jobs request lots of containers. I'm not sure
> why we set it to false directly. I tried setting it to true and found the batch
> job can run correctly with a small number of containers. Why don't we add a
> configuration option to let users configure it?
>
> Best,
> Faaron Zheng
>


Why setAllVerticesInSameSlotSharingGroupByDefault is set to false in batch mode

2020-09-15 Thread zheng faaron
Hi All,

I find we set AllVerticesInSameSlotSharingGroupByDefault to false in Flink
1.10. It makes batch jobs request lots of containers. I'm not sure why we
set it to false directly. I tried setting it to true and found the batch job can
run correctly with a small number of containers. Why don't we add a configuration
option to let users configure it?

Best,
Faaron Zheng


Fwd: I have a job with multiple Kafka sources. They all contain certain historical data.

2020-09-15 Thread hao kong
Hello guys,

I have a job with multiple Kafka sources. They all contain a certain amount of
historical data. If I use event-time windows, the sources with less data end up
overriding the sources with more data via the watermark.


I can think of one solution, implementing a scheduler in the source phase, but
it is quite complicated to implement. Are there other, better solutions?


Any suggestions?
Thanks!
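For reference, a hedged sketch of one common mitigation for this kind of event-time skew, assuming Flink 1.11's WatermarkStrategy API and the Kafka connector overload that accepts it; the record type and timestamp extraction are made up. Generating watermarks inside each source (per Kafka partition) keeps the combined watermark held back by partitions that are still replaying history, and withIdleness() prevents an empty source from stalling it entirely; this mitigates, but does not fully solve, alignment across independent sources.

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerSourceWatermarks {

    public static FlinkKafkaConsumer<String> consumerFor(String topic, Properties props) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

        // Watermarks are generated inside the source, per Kafka partition, so a
        // partition far behind in history holds the watermark back instead of
        // being overtaken by partitions with little historical data.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                        // Hypothetical: parse the event time out of the record payload.
                        .withTimestampAssigner((record, ts) -> extractEventTime(record))
                        // Keeps a finished/empty source from stalling the watermark.
                        .withIdleness(Duration.ofMinutes(1)));
        return consumer;
    }

    private static long extractEventTime(String record) {
        // Placeholder: a real job would extract the timestamp from the record.
        return System.currentTimeMillis();
    }
}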


Re: Flink Table API and not recognizing s3 plugins

2020-09-15 Thread Dan Hill
Sweet, this was the issue.  I got this to work by copying the s3 jar over
to plugins for the client container.

Thanks for all of the help!  The Table API is sweet!

On Mon, Sep 14, 2020 at 11:14 PM Dan Hill  wrote:

> Yes, the client runs in K8.  It uses a different K8 config than the Helm
> chart and does not load the plugins.  Does the client use the same plugin
> structure as the Flink job/task manager?  I can try using it tomorrow.
>
> Cool, that link would work too.
>
> Thanks, Arvid!
>
>
> On Mon, Sep 14, 2020 at 10:59 PM Arvid Heise  wrote:
>
>> Hi Dan,
>>
>> Are you running the client also in K8s? If so you need an initialization
>> step, where you add the library to the plugins directory. Putting it into
>> lib or into the user jar doesn't work anymore as we removed the shading in
>> s3 in Flink 1.10.
>>
>> The official Flink docker image has an easy way to add these plugins [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#using-plugins
>>
>> On Tue, Sep 15, 2020 at 6:40 AM Dan Hill  wrote:
>>
>>> Thanks for the update!
>>>
>>> I'm trying a bunch of combinations on the client side to get the S3
>>> Filesystem to be picked up correctly.  Most of my attempts involved
>>> building into the job jar (which I'm guessing won't work).  I then started
>>> getting issues with ClassCastExceptions.
>>>
>>> I might try a little more tomorrow (e.g. modifying the custom image).
>>> If I can't get it, I'll roll back to a previous Flink version that works.
>>>
>>> Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
>>> at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
>>> at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
>>> at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:431)
>>> ... 51 more
>>>
>>>
>>> On Mon, Sep 14, 2020 at 7:03 PM Jingsong Li 
>>> wrote:
>>>
 Hi Dan,

 I think Arvid and Dawid are right; as a workaround, you can try making
 S3Filesystem work in the client. But for a long-term solution, we can fix
 it.

 I created https://issues.apache.org/jira/browse/FLINK-19228 for
 tracking this.

 Best,
 Jingsong

 On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz <
 dwysakow...@apache.org> wrote:

> Hi Dan,
>
> As far as I checked in the code, the FileSystemSink will try to create
> staging directories from the client. I think it might be problematic, as
> your case shows. We might need to revisit that part. I am cc'ing Jingsong
> who worked on the FileSystemSink.
>
> As a workaround you might try putting the s3 plugin on the CLI
> classpath (not sure if plugins work for the CLI through the /plugins
> directory).
>
> Best,
>
> Dawid
> On 10/09/2020 22:13, Dan Hill wrote:
>
> This is running on my local minikube and is trying to hit minio.
>
> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill 
> wrote:
>
>> I'm using this Helm chart
>> .
>> I start the job by building an image with the job jar and using kubectl
>> apply to do a flink run with the jar.
>>
>> The log4j.properties on jobmanager and taskmanager have debug level
>> set and are pretty embedded into the Helm chart.  My log4j-cli.properties
>> is hacked on the CLI side.
>>
>> I thought I just needed the s3 plugins in the jobmanager and
>> taskmanager.  Do I need to have a similar plugin structure from the image
>> where I run 'flink run'?
>>
>>
>> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill 
>> wrote:
>>
>>> Copying more of the log
>>>
>>> 2020-09-10 19:50:17,712 INFO
>>> org.apache.flink.client.cli.CliFrontend  [] -
>>> 
>>>
>>> 2020-09-10 19:50:17,718 INFO
>>> org.apache.flink.client.cli.CliFrontend  [] -  
>>> Starting
>>> Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a,
>>> Date:2020-07-15T07:02:09+02:00)
>>>
>>> 2020-09-10 19:50:17,719 INFO
>>> org.apache.flink.client.cli.CliFrontend  [] -  OS
>>> current user: root
>>>
>>> 2020-09-10 19:50:17,719 INFO
>>> org.apache.flink.client.cli.CliFrontend  [] -  
>>> Current
>>> Hadoop/Kerberos user: 
>>>
>>> 2020-09-10 19:50:17,719 INFO
>>> org.apache.flink.client.cli.CliFrontend  [] -  JVM:
>>> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01
>>>
>>> 2020-09-10 19:50:17,719 INFO
>>> 

Re: Flink alert after database lookUp

2020-09-15 Thread Arvid Heise
Hi Sunitha,

dependency looks good. I'd probably bump the version to 1.1.0 though
(version is off-cycle to Flink as of now to accelerate releases of this
young feature).

Best,

Arvid
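To make the enrichment pattern concrete, here is a hedged DataStream sketch that keeps the latest thresholds from a change stream in broadcast state and checks incoming events against them; the stream shapes, types, and alert logic are illustrative assumptions, not part of the CDC connector itself:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ThresholdEnrichment {

    // Broadcast state: attribute name -> latest threshold value (fed by the change stream).
    static final MapStateDescriptor<String, Double> THRESHOLDS =
            new MapStateDescriptor<>("thresholds", Types.STRING, Types.DOUBLE);

    public static DataStream<String> alerts(
            DataStream<Tuple2<String, Double>> events,            // (attribute, observed value)
            DataStream<Tuple2<String, Double>> thresholdChanges   // (attribute, new threshold)
    ) {
        BroadcastStream<Tuple2<String, Double>> broadcast = thresholdChanges.broadcast(THRESHOLDS);

        return events.connect(broadcast).process(
                new BroadcastProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>, String>() {

                    @Override
                    public void processElement(Tuple2<String, Double> event,
                                               ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        Double threshold = ctx.getBroadcastState(THRESHOLDS).get(event.f0);
                        if (threshold != null && event.f1 > threshold) {
                            out.collect("ALERT: " + event.f0 + " breached threshold " + threshold);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, Double> change,
                                                        Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Each database change replaces the previously known threshold.
                        ctx.getBroadcastState(THRESHOLDS).put(change.f0, change.f1);
                    }
                });
    }
}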

On Tue, Sep 15, 2020 at 5:10 PM s_penakalap...@yahoo.com <
s_penakalap...@yahoo.com> wrote:

> Hi Arvid,
>
> Thank you!!!
>
> Will check change data capture approach. Please confirm including
> dependency and adding sourceFunction should help us to achieve CDC.
>
> <dependency>
>   <groupId>com.alibaba.ververica</groupId>
>   <artifactId>flink-connector-postgre-cdc</artifactId>
>   <version>1.0.0</version>
> </dependency>
>
> Regards,
> Sunitha
>
>
> On Monday, September 14, 2020, 05:10:57 PM GMT+5:30, Arvid Heise <
> ar...@ververica.com> wrote:
>
>
> Hi Sunitha,
>
> to listen to changes in your database a change-data-capture approach
> should be taken [1], which is supported in Flink since 1.11.
>
> Basically, a tool like debezium [2] will monitor the changelog of the
> database and publish the result as a change stream, which can be ingested
> in Flink as another source. You can then use the change stream to build
> dynamic look-up tables in Flink and enrich your data as desired.
>
> Also have a look at this presentation for a better overview [3].
>
> [1]
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc
> [2] https://debezium.io/
> [3] https://noti.st/morsapaes/liQzgs/slides
>
> On Wed, Sep 9, 2020 at 11:13 AM Timo Walther  wrote:
>
> Flink's built-in JDBC connector will read the data only once. JDBC does
> not provide means to continuously monitor a database table.
>
> It depends on the size of your database; if your parameter table is small
> it might be sufficient to write a simple Flink connector that
> periodically reads the table and ingests the data into the streaming
> pipeline. For larger database/streaming integrations, it might be worth
> looking into Kafka's Connect API, or Debezium, where you hook into the
> database logs to retrieve continuous data, but this might be overkill
> for your use case.
>
> The link that I've sent you to for streaming pattern slides should work
> after registration.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:49, s_penakalap...@yahoo.com wrote:
> >
> > Hi Timo,
> >
> > Thank you for the suggestions.
> >
> > I see now both Process function and CEP approach will not fit in. Now if
> > I follow the third approach to stream the values from database() . Is it
> > possible to stream data continuously?
> >
> > If I follow the approaches below, I see only a one-time load, not a
> > continuous stream:
> > Using JDBCInputFormat this will execute the query only once , so it will
> > not be a stream data. when we try to iterate source this may iterate
> > only on the data already fetched
> > Using RichFlatMapFunctions, in open() if I try to connect to DB again
> > this would be one time load. If I connect database in flatmap() then it
> > would lead to multiple hits to database.
> >
> > Request your help on how to continuously stream the data, If possible
> > sample source code for reference to stream database. Please help me
> > badly stuck.
> >
> > In the mail, I see you asked me to register. Are you referring to any
> > training here or any other registration.
> >
> >
> > Regards,
> > Sunitha.
> > On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo Walther
> >  wrote:
> >
> >
> > Hi Sunitha,
> >
> > what you are describing is a typical streaming enrichment. We need to
> > enrich the stream with some data from a database. There are different
> > strategies to handle this:
> >
> > 1) You are querying the database for every record. This is usually not
> > what you want because it would slow down your pipeline due to the
> > communication latencies to your database. It would also cause a lot of
> > pressure to the database in general.
> >
> > 2) You only query database from time to time and store the latest value
> > in a ProcessFunction ValueState or MapState.
> >
> > 3) You stream in the values as well and use connect() [1].
> >
> > In any case, I think CEP might not be useful for this case. If you
> > really want to do option 1, it might make sense to also checkout the SQL
> > API of Flink because it offers different kind of joins with very good
> > abstraction. `Join with a Temporal Table` offers a JDBC connector for
> > lookups in your database.
> >
> > If you like to use DataStream API, I would also recommend the Pattern
> > slides here [3] (unfortunately you have to register first).
> >
> > Regards,
> > Timo
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/etl.html#connected-streams
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
> > [3] https://training.ververica.com/decks/patterns/
> >
> >
> > On 07.09.20 17:25, s_penakalap...@yahoo.com
> >  wrote:
> >  > Hi All,
> >  >
> >  > I am new to Flink, request your help!!!
> >  >
> >  > My scenario :
> >  > 1> we receive Json messages at a v

Re: Flink alert after database lookUp

2020-09-15 Thread s_penakalap...@yahoo.com
 Hi Arvid,
Thank you!!!
Will check change data capture approach. Please confirm including dependency 
and adding sourceFunction should help us to achieve CDC.
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-postgre-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

Regards,
Sunitha

On Monday, September 14, 2020, 05:10:57 PM GMT+5:30, Arvid Heise <ar...@ververica.com> wrote:
 Hi Sunitha,
to listen to changes in your database a change-data-capture approach should be 
taken [1], which is supported in Flink since 1.11.
Basically, a tool like debezium [2] will monitor the changelog of the database 
and publish the result as a change stream, which can be ingested in Flink as 
another source. You can then use the change stream to build dynamic look-up 
tables in Flink and enrich your data as desired.
Also have a look at this presentation for a better overview [3].

[1] https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc
[2] https://debezium.io/
[3] https://noti.st/morsapaes/liQzgs/slides
On Wed, Sep 9, 2020 at 11:13 AM Timo Walther  wrote:

Flink's built-in JDBC connector will read the data only once. JDBC does
not provide means to continuously monitor a database table.

It depends on the size of your database; if your parameter table is small
it might be sufficient to write a simple Flink connector that
periodically reads the table and ingests the data into the streaming
pipeline. For larger database/streaming integrations, it might be worth
looking into Kafka's Connect API, or Debezium, where you hook into the
database logs to retrieve continuous data, but this might be overkill
for your use case.

The link that I've sent you to for streaming pattern slides should work 
after registration.

Regards,
Timo


On 09.09.20 09:49, s_penakalap...@yahoo.com wrote:
> 
> Hi Timo,
> 
> Thank you for the suggestions.
> 
> I see now both Process function and CEP approach will not fit in. Now if 
> I follow the third approach to stream the values from database() . Is it 
> possible to stream data continuously?
> 
> If I follow the approaches below, I see only a one-time load, not a
> continuous stream:
> Using JDBCInputFormat this will execute the query only once , so it will 
> not be a stream data. when we try to iterate source this may iterate 
> only on the data already fetched
> Using RichFlatMapFunctions, in open() if I try to connect to DB again 
> this would be one time load. If I connect database in flatmap() then it 
> would lead to multiple hits to database.
> 
> Request your help on how to continuously stream the data, If possible 
> sample source code for reference to stream database. Please help me 
> badly stuck.
> 
> In the mail, I see you asked me to register. Are you referring to any 
> training here or any other registration.
> 
> 
> Regards,
> Sunitha.
> On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo Walther 
>  wrote:
> 
> 
> Hi Sunitha,
> 
> what you are describing is a typical streaming enrichment. We need to
> enrich the stream with some data from a database. There are different
> strategies to handle this:
> 
> 1) You are querying the database for every record. This is usually not
> what you want because it would slow down your pipeline due to the
> communication latencies to your database. It would also cause a lot of
> pressure to the database in general.
> 
> 2) You only query database from time to time and store the latest value
> in a ProcessFunction ValueState or MapState.
> 
> 3) You stream in the values as well and use connect() [1].
> 
> In any case, I think CEP might not be useful for this case. If you
> really want to do option 1, it might make sense to also checkout the SQL
> API of Flink because it offers different kind of joins with very good
> abstraction. `Join with a Temporal Table` offers a JDBC connector for
> lookups in your database.
> 
> If you like to use DataStream API, I would also recommend the Pattern
> slides here [3] (unfortunately you have to register first).
> 
> Regards,
> Timo
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/etl.html#connected-streams
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
> [3] https://training.ververica.com/decks/patterns/
> 
> 
> On 07.09.20 17:25, s_penakalap...@yahoo.com 
>  wrote:
>  > Hi All,
>  >
>  > I am new to Flink, request your help!!!
>  >
>  > My scenario :
>  > 1> we receive Json messages at a very high frequency like 10,000
>  > messages / second
>  > 2> we need to raise an Alert for a particular user if there is any
>  > breach in threshold value against each attribute in Json.
>  > 3> These threshold values are part of my database table and can be
>  > frequently updated by different user.
>  > 4> In realtime I would like to know how to get latest data from the
>  > database.
>  >
>  > I tried using Flink CEP Pattern approach to generate alert. I would like
>  > to get some inputs on how I can implement the realtime lookup tables in
>  > Flink Java while monitoring alert, any sam

Re: Emit event to kafka when finish sink

2020-09-15 Thread Dawid Wysakowicz
Hi,

I am not sure if I understand your first solution, but it sounds rather
complicated.

I think implementing a custom operator could be a valid approach. You
would have to make sure it runs with a parallelism of 1. You could
additionally implement the BoundedOneInput interface and notify the
external process in the endOfInput method. This is also quite an
involved solution, though.

Another idea is you could register a JobListener in the
StreamExecutionEnvironment. There you can execute some code based on the
status of the whole job.
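A minimal sketch of the JobListener idea, assuming a bounded job executed in attached mode; the Kafka notification itself is only indicated by a hypothetical helper:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NotifyOnFinish {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Nothing to do at submission time for this use case.
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                if (throwable == null) {
                    // The bounded job, including its sinks, has finished:
                    // emit the notification event to Kafka here.
                    sendKafkaNotification("etl-finished");
                }
            }
        });

        // ... build the bounded ETL pipeline on env here ...

        env.execute("bounded-etl-with-notification");
    }

    private static void sendKafkaNotification(String message) {
        // Placeholder: a plain KafkaProducer send to the notification topic would go here.
    }
}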

Best,

Dawid

On 15/09/2020 11:00, Antonio Manzano wrote:
>
> Hello guys, 
>
> I would like to know if there is any possibility to emit an event when
> a sink has finished.
> To put it in context, I have a simple ETL (streaming bounded) that
> reads data from a database, maps, and inserts into another database.
> Once I finish inserting the data I want to issue an event to kafka to
> notify another process.
>
> I can think of some solutions, but I am not convinced which one is the
> best option:
>
>   * dividing the stream into two paths and with a session window, in
> onTimer,  check if the last record is already in the database
>   * Implement custom sink
>   * ...
>
>
> Any suggestions?
> Thanks!
> -- 
>
> Antonio Manzano Tejón
>
> Tel: 699 333 556
>
>




Re: How to schedule Flink Batch Job periodically or daily

2020-09-15 Thread s_penakalap...@yahoo.com
 Hi Arvid,
Thanks a lot.
Will check Airflow and Cron-job options.
Regards,
Sunitha.
On Monday, September 14, 2020, 05:23:43 PM GMT+5:30, Arvid Heise 
 wrote:  
 
 Hi Sunitha,
Oozie is a valid approach, but I'd recommend evaluating Airflow first [1].
It's much better maintained and easier to use.

Both tools are more used to compose complex workflows though. If you just need 
a repeated execution, I'd go with cron jobs. For example, you can completely 
rely on K8s and use cron jobs [2] there if you run on K8s anyways (which I'd 
recommend).

[1] https://airflow.apache.org/
[2] https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
On Fri, Sep 11, 2020 at 1:21 PM Robert Metzger  wrote:

Hi Sunitha,
(Note: You've emailed both the dev@ and user@ mailing list. Please only use the 
user@ mailing list for questions on how to use Flink. I'm moving the dev@ list 
to bcc)
Flink does not have facilities for scheduling batch jobs, and there are no 
plans to add such a feature (this is not in the scope of Flink, there are 
already a number of workflow management tools).

On Fri, Sep 11, 2020 at 1:10 PM s_penakalap...@yahoo.com 
 wrote:

Hi Team,
We have Flink Batch Jobs which need to be scheduled as listed below:
Case 1: 2.00 UTC, daily
Case 2: periodically, once every 2 hours
Case 3: scheduled based on an event
Request your help on this: how should we approach all 3 use cases? Can we use Oozie workflows, or is there a better approach?

Regards,
Sunitha



-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      

Re: The rpc invocation size 13478509 exceeds the maximum akka framesize

2020-09-15 Thread Jake
Hi zheng,

It seems the data is large. Resizing the akka framesize will not work.

You can increase the parallelism.

Jake.


> On Sep 15, 2020, at 5:58 PM, zheng faaron  wrote:
> 
> Hi Zhu,
> 
> It's just a mistake in the mail. It seems increasing akka.framesize does not work in
> this scenario.
> 
> Best,
> Faaron Zheng
> 
> From: Zhu Zhu 
> Sent: Tuesday, September 15, 2020 11:04:06 AM
> To: zheng faaron 
> Cc: user 
> Subject: Re: The rpc invocation size 13478509 exceeds the maximum akka 
> framesize
>  
> Hi Zheng,
> 
> Would you check whether the option "akka.framesize" is properly set?
> In your mail I saw a whitespace between "akka." and "framesize". Not sure if 
> it is a mistake in the mail or in the config?
> You can also check the printed config options in jobmaster log to see whether 
> your configuration takes effect.
> 
> Thanks,
> Zhu
> 
> zheng faaron <faaronzh...@gmail.com> wrote on Mon, Sep 14, 2020 at 11:57 AM:
> Hi all,
> 
> I find this exception when I run q71.sql in 10 T tpcds data with 400 
> parallelism. The flink version is 1.10, I try to increase the value of akka. 
> framesize to 20485760b and it not works. flink will set sink parallelism with 
> 1 when handle select query.  I'm not sure if that's the cause of the problem, 
> but it works fine when I increase the sink parallelism to 400.
> <20200914-111346.png><20200914-111353.png>



I have a job with multiple Kafka sources. They all contain certain historical data.

2020-09-15 Thread hao kong
Hello, I have a job with multiple Kafka sources. They all contain a certain
amount of historical data. If I use event-time windows, the sources with less
data end up overriding the sources with more data via the watermark. Is there a
solution?


Emit event to kafka when finish sink

2020-09-15 Thread Antonio Manzano
Hello guys,

I would like to know if there is any possibility to emit an event when a
sink has finished.
To put it in context, I have a simple ETL (streaming bounded) that reads
data from a database, maps, and inserts into another database. Once I
finish inserting the data I want to issue an event to kafka to notify
another process.

I can think of some solutions, but I am not convinced which one is the best
option:

   - dividing the stream into two paths and with a session window, in
   onTimer,  check if the last record is already in the database
   - Implement custom sink
   - ...


Any suggestions?
Thanks!
-- 

Antonio Manzano Tejón

Tel: 699 333 556


Re: Flink DynamoDB stream connector losing records

2020-09-15 Thread Cranmer, Danny
Hi Jiawei,

I agree that the offset management mechanism uses the same code as Kinesis 
Stream Consumer and in theory should not lose exactly-once semantics. As Ying 
is alluding to, if your application is restarted and you have snapshotting 
disabled in AWS there is a chance that records can be lost between runs. 
However, if you have snapshotting enabled then the application should continue 
consuming records from the last processed sequence number.

I am happy to take a deeper look if you can provide more information/logs/code.

Thanks,

From: Ying Xu 
Date: Monday, 14 September 2020 at 19:48
To: Andrey Zagrebin 
Cc: Jiawei Wu , user 
Subject: RE: [EXTERNAL] Flink DynamoDB stream connector losing records




Hi Jiawei:

Sorry for the delayed reply. When you mention certain records getting skipped,
is it within the same run or across different runs? Any more specific details on
how/when records are lost?

FlinkDynamoDBStreamsConsumer is built on top of FlinkKinesisConsumer, with a
similar offset management mechanism. In theory it shouldn't lose exactly-once
semantics in the case of getting throttled. We haven't run it in any AWS
Kinesis Analytics environment, though.

Thanks.


On Thu, Sep 10, 2020 at 7:51 AM Andrey Zagrebin <azagre...@apache.org> wrote:
Generally speaking this should not be a problem for exactly-once but I am not 
familiar with the DynamoDB and its Flink connector.
Did you observe any failover in Flink logs?

On Thu, Sep 10, 2020 at 4:34 PM Jiawei Wu <wujiawei5837...@gmail.com> wrote:
And I suspect I have throttled by DynamoDB stream, I contacted AWS support but 
got no response except for increasing WCU and RCU.

Is it possible that Flink will lose exactly-once semantics when throttled?

On Thu, Sep 10, 2020 at 10:31 PM Jiawei Wu <wujiawei5837...@gmail.com> wrote:
Hi Andrey,

Thanks for your suggestion, but I'm using Kinesis analytics application which 
supports only Flink 1.8

Regards,
Jiawei

On Thu, Sep 10, 2020 at 10:13 PM Andrey Zagrebin <azagre...@apache.org> wrote:
Hi Jiawei,

Could you try Flink latest release 1.11?
1.8 will probably not get bugfix releases.
I will cc Ying Xu who might have a better idea about the DinamoDB source.

Best,
Andrey

On Thu, Sep 10, 2020 at 3:10 PM Jiawei Wu <wujiawei5837...@gmail.com> wrote:
Hi,

I'm using AWS kinesis analytics application with Flink 1.8. I am using the 
FlinkDynamoDBStreamsConsumer to consume DynamoDB stream records. But recently I 
found my internal state is wrong.

After I printed some logs I found that some DynamoDB stream records are skipped and
not consumed by Flink. May I know if someone has encountered the same issue before?
Or is it a known issue in Flink 1.8?

Thanks,
Jiawei


[ANNOUNCE] Weekly Community Update 2020/37

2020-09-15 Thread Konstantin Knauf
Dear community,

happy to share a belated update for the past week. This time with the
release of Flink 1.11.2, a couple of discussions and FLIPs on improving
Flink's APIs and dropping some baggage, most notably Scala 2.11, a new
unified sink API and a bit more.

Flink Development
==

* [releases] The vote for Flink 1.11.2 has passed today. Release
announcement will follow shortly. [1]

* [releases] Feature Freeze for Stateful Function 2.2.0 happened as planned
five days ago. Release candidate is expected soon. [2]

* [apis] Seth has started a discussion to drop Scala 2.11. Many in favor so
far. [3]

* [apis] Seth proposes to separate the concepts of "Statebackend" and
"Checkpointing" in Flink's APIs. So far, the statebackend (RocksDB vs
Heap-Based) and checkpointing storage (usually a distributed file system)
are handled by the same classes and configured together. He believes this
is the reason for some of the confusion of users around these concepts.
[4,5]

* [apis] Aljoscha started a discussion to deprecate and later
remove UnionList OperatorState. Some current users of UnionList
OperatorState have voiced concerns. No conclusion so far. [6]

* [connectors] Guowei has published FLIP-143 "Unified Sink API". This is
another follow up FLIP in order for the DataStream to supersede the DataSet
API (FLIP-131). The basic idea is to provide an interface that allows the
development of sinks that provide exactly-once guarantees for both bounded
and unbounded workloads, but don't require the developer of the sink to
make this distinction. [6,7]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-Release-1-11-2-release-candidate-1-tp44731.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Stateful-Functions-2-2-0-tp44606.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Scala-2-11-tp44607.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-142-Disentangle-StateBackends-from-Checkpointing-tp44496.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-142-Disentangle-StateBackends-from-Checkpointing-tp44679.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-and-remove-UnionList-OperatorState-tp44548p44650.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602.html

flink-packages.org
==

* Jark has published an implementation of the Nexmark benchmark for Apache
Flink. [8]

[8] https://flink-packages.org/packages/nexmark-benchmark


Notable Bugs
==

Nothing came to my attention.

Events, Blog Posts, Misc
===

* Arvid Heise & Niels Basjes are Apache Flink Committers now!
Congratulations to both. [9,10]

* The third video of my colleague Alexander's "Introduction to Flink"
series has been published. This time about building event-driven
applications with Apache Flink. [11]

* On the Ververica Blog, Jark and Qingshen give an outlook to their
upcoming talk at Flink Forward Global [12] on Change Data Capture with
Flink SQL. [13]

[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Arvid-Heise-tp44713.html
[10]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Niels-Basjes-tp44653.html
[11] https://www.youtube.com/watch?v=t663p-qHijE&feature=youtu.be
[12] https://www.flink-forward.org/global-2020
[13]
https://www.ververica.com/blog/a-deep-dive-on-change-data-capture-with-flink-sql-during-flink-forward

Cheers,

Konstantin


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Performance issue associated with managed RocksDB memory

2020-09-15 Thread Yu Li
Thanks for the follow up Juha, I've just assigned FLINK-19238 to you. Let's
further track this on JIRA.

Best Regards,
Yu


On Tue, 15 Sep 2020 at 15:04, Juha Mynttinen 
wrote:

> Hey
>
> I created this one https://issues.apache.org/jira/browse/FLINK-19238.
>
> Regards,
> Juha
> --
> *From:* Yun Tang 
> *Sent:* Tuesday, September 15, 2020 8:06 AM
> *To:* Juha Mynttinen ; Stephan Ewen <
> se...@apache.org>
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hi Juha
>
> Would you please consider contributing this back to the community? If agreed,
> please open a JIRA ticket and we can help review your PR then.
>
> Best
> Yun Tang
> --
> *From:* Juha Mynttinen 
> *Sent:* Thursday, September 10, 2020 19:05
> *To:* Stephan Ewen 
> *Cc:* Yun Tang ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey
>
> I've fixed the code
> (https://github.com/juha-mynttinen-king/flink/commits/arena_block_sanity_check)
> slightly. Now it WARNs if there is the memory configuration issue. Also, I
> think there was a bug in the way the check calculated the mutable memory,
> fixed that. Also, wrote some tests.
>
> I tried the code and in my setup I get a bunch of WARN if the memory
> configuration issue is happening:
>
> 20200910T140320.516+0300  WARN RocksDBStateBackend performance will be
> poor because of the current Flink memory configuration! RocksDB will flush
> memtable constantly, causing high IO and CPU. Typically the easiest fix is
> to increase task manager managed memory size. If running locally, see the
> parameter taskmanager.memory.managed.size. Details: arenaBlockSize 8388608
> < mutableLimit 7829367 (writeBufferSize 67108864 arenaBlockSizeConfigured 0
> defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8947848)
>  
> [org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.sanityCheckArenaBlockSize()
> @ 189]
>
> Regards,
> Juha
>
> --
> *From:* Stephan Ewen 
> *Sent:* Wednesday, September 9, 2020 1:56 PM
> *To:* Juha Mynttinen 
> *Cc:* Yun Tang ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey Juha!
>
> I agree that we cannot reasonably expect the majority of users to
> understand block sizes, arena sizes, etc. to get their application running.
> So the default should be "inform when there is a problem and suggest to
> use more memory." Block/arena size tuning is for the absolute experts, the
> 5% super power users.
>
> The managed memory is 128 MB by default in the mini cluster. In a
> standalone session cluster setup with default config, it is 512 MB.
>
> Best,
> Stephan
>
>
>
> On Wed, Sep 9, 2020 at 11:10 AM Juha Mynttinen 
> wrote:
>
> Hey Yun,
>
> About the docs. I saw in the docs
> (https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html)
> this:
>
> "An advanced option (expert mode) to reduce the number of MemTable flushes
> in setups with many states, is to tune RocksDB’s ColumnFamily options
> (arena block size, max background flush threads, etc.) via a
> RocksDBOptionsFactory".
>
> Only after debugging this issue we're talking about, I figured that this
> snippet in the docs is probably talking about the issue I'm witnessing. I
> think there are two issues here:
>
> 1) it's hard/impossible to know what kind of performance one can expect
> from a Flink application. Thus, it's hard to know if one is suffering from,
> e.g., this performance issue, or if the system is performing normally
> (and inherently being slow).
> 2) even if one suspects a performance issue, it's very hard to find the
> root cause of the performance issue (memtable flush happening frequently).
> To find out this one would need to know what's the normal flush frequency.
>
> Also the doc says "in setups with many states". The same problem is hit
> when using just one state, but "high" parallelism (5).
>
> If the arena block size _ever_ needs  to be configured only to "fix" this
> issue, it'd be best if there _never_ was a need to modify arena block size. 
> What
> if we forget even mentioning arena block size in the docs and focus o

Re: Performance issue associated with managed RocksDB memory

2020-09-15 Thread Juha Mynttinen
Hey

I created this one https://issues.apache.org/jira/browse/FLINK-19238.

Regards,
Juha

From: Yun Tang 
Sent: Tuesday, September 15, 2020 8:06 AM
To: Juha Mynttinen ; Stephan Ewen 
Cc: user@flink.apache.org 
Subject: Re: Performance issue associated with managed RocksDB memory

Hi Juha

Would you please consider contributing this back to the community? If agreed,
please open a JIRA ticket and we can help review your PR then.

Best
Yun Tang

From: Juha Mynttinen 
Sent: Thursday, September 10, 2020 19:05
To: Stephan Ewen 
Cc: Yun Tang ; user@flink.apache.org 
Subject: Re: Performance issue associated with managed RocksDB memory

Hey

I've fixed the code
(https://github.com/juha-mynttinen-king/flink/commits/arena_block_sanity_check)
slightly. Now it WARNs if there is the memory configuration issue. Also, I
think there was a bug in the way the check calculated the mutable memory, fixed 
that. Also, wrote some tests.

I tried the code and in my setup I get a bunch of WARN if the memory 
configuration issue is happening:

20200910T140320.516+0300  WARN RocksDBStateBackend performance will be poor 
because of the current Flink memory configuration! RocksDB will flush memtable 
constantly, causing high IO and CPU. Typically the easiest fix is to increase 
task manager managed memory size. If running locally, see the parameter 
taskmanager.memory.managed.size. Details: arenaBlockSize 8388608 < mutableLimit 
7829367 (writeBufferSize 67108864 arenaBlockSizeConfigured 0 
defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8947848)  
[org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.sanityCheckArenaBlockSize()
 @ 189]

Regards,
Juha


From: Stephan Ewen 
Sent: Wednesday, September 9, 2020 1:56 PM
To: Juha Mynttinen 
Cc: Yun Tang ; user@flink.apache.org 
Subject: Re: Performance issue associated with managed RocksDB memory

Hey Juha!

I agree that we cannot reasonably expect the majority of users to
understand block sizes, arena sizes, etc. to get their application running.
So the default should be "inform when there is a problem and suggest to use
more memory." Block/arena size tuning is for the absolute experts, the 5%
super power users.

The managed memory is 128 MB by default in the mini cluster. In a standalone 
session cluster setup with default config, it is 512 MB.

Best,
Stephan



On Wed, Sep 9, 2020 at 11:10 AM Juha Mynttinen <juha.myntti...@king.com> wrote:
Hey Yun,

About the docs. I saw in the docs
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html)
this:

"An advanced option (expert mode) to reduce the number of MemTable flushes in 
setups with many states, is to tune RocksDB’s ColumnFamily options (arena block 
size, max background flush threads, etc.) via a RocksDBOptionsFactory".
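(For context, a hedged sketch of what that expert-mode tuning looks like, assuming the RocksDBOptionsFactory interface shipped with recent Flink versions; the 4 MB value is purely illustrative. It would be plugged in via the state backend's setRocksDBOptions(...) setter.)

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class ArenaBlockSizeOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Illustrative value only: a smaller arena block size keeps the mutable
        // memtable memory under the write buffer manager's limit, avoiding the
        // constant-flush behaviour discussed in this thread.
        return currentOptions.setArenaBlockSize(4 * 1024 * 1024);
    }
}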

Only after debugging this issue we're talking about, I figured that this 
snippet in the docs is probably talking about the issue I'm witnessing. I think 
there are two issues here:

1) it's hard/impossible to know what kind of performance one can expect from a 
Flink application. Thus, it's hard to know if one is suffering from, e.g.,
this performance issue, or if the system is performing normally (and inherently
being slow).
2) even if one suspects a performance issue, it's very hard to find the root 
cause of the performance issue (memtable flush happening frequently). To find 
out this one would need to know what's the normal flush frequency.

Also the doc says "in setups with many states". The same problem is hit when 
using just one state, but "high" parallelism (5).

If the arena block size _ever_ needs  to be configured only to "fix" this 
issue, it'd be best if there _never_ was a need to modify arena block size. 
What if we forget even mentioning arena block size in the docs and focus on the 
managed memory size, since managed memory size is something the user does tune.

You're right that a very clear WARN message could also help to cope with the 
issue. What if there was a WARN message saying that performance will be poor 
and you should increase the managed memory size? And get rid of that arena 
block size decreasing example in the docs.

Also, the default managed memory size is AFAIK 12