Re: Checkpointing & File stream with

2019-06-17 Thread Yun Tang
Hi Sung

How about using FileProcessingMode.PROCESS_CONTINUOUSLY [1] as the watch type when 
reading data from HDFS? FileProcessingMode.PROCESS_CONTINUOUSLY periodically 
monitors the source, while the default FileProcessingMode.PROCESS_ONCE processes 
the data only once and then exits.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources
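
For reference, a minimal sketch of that approach (untested; it assumes a plain-text 
spec file and the 1.8 DataStream API, and the 60 s monitoring interval is just an 
example):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String specFile = "hdfs:///path/to/spec.txt";   // placeholder path
TextInputFormat format = new TextInputFormat(new Path(specFile));

// PROCESS_CONTINUOUSLY keeps the source in state RUNNING, so checkpoints can be triggered
DataStream<String> specificationFileStream = env.readFile(
        format,
        specFile,
        FileProcessingMode.PROCESS_CONTINUOUSLY,   // re-scan the path periodically
        60_000L);                                  // monitoring interval in ms

Note that with PROCESS_CONTINUOUSLY a modified file is re-processed entirely, which 
breaks exactly-once semantics for the file contents.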

Best
Yun Tang

From: Sung Gon Yi 
Sent: Tuesday, June 18, 2019 14:13
To: user@flink.apache.org
Subject: Checkpointing & File stream with

Hello,

I work on joining two streams, one is from Kafka and another is from a file 
(small size).
Stream processing works well, but checkpointing fails.
The file has fewer than 100 lines, and the pipeline stage that reads the file 
switches to "FINISHED" as soon as the job is deployed.

After that, checkpointing fails with the following message:
——
2019-06-17 20:25:13,575 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Custom File Source (1/1) of job 
d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED instead. 
Aborting checkpoint.
——

The Custom File Source corresponds to the following code:
——

DataStream<String> specificationFileStream = env.readTextFile(specFile);

——

To make checkpointing succeed, I wrote a custom source function that keeps running 
(it mostly sleeps after reading the file). I wonder whether this is the correct way.

Sincerely,
Sung Gon



Checkpointing & File stream with

2019-06-17 Thread Sung Gon Yi
Hello,

I work on joining two streams, one is from Kafka and another is from a file 
(small size).
Stream processing works well, but checkpointing fails.
The file has fewer than 100 lines, and the pipeline stage that reads the file 
switches to "FINISHED" as soon as the job is deployed.

After that, checkpointing fails with the following message:
——
2019-06-17 20:25:13,575 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Custom File Source (1/1) of job 
d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED instead. 
Aborting checkpoint.
——

The Custom File Source corresponds to the following code:
——
DataStream<String> specificationFileStream = env.readTextFile(specFile);
——

To make checkpointing succeed, I wrote a custom source function that keeps running 
(it mostly sleeps after reading the file). I wonder whether this is the correct way.

Sincerely,
Sung Gon



Re: Need for user class path accessibility on all nodes

2019-06-17 Thread Biao Liu
Ah, sorry for the misunderstanding.
So what you are asking is why we need "--classpath"? I'm not sure what
the original author had in mind. I guess the points listed below might have been
considered.
1. Avoid duplicated deployment. If some common jars are deployed in advance
to each node of the cluster, jobs that depend on these jars can avoid
deploying them one by one.
2. Support NFS, which is mentioned in the option description of "--classpath".


Abdul Qadeer wrote on Tuesday, June 18, 2019 at 11:45 AM:

> Hi Biao,
>
> I am aware of it - that's not my question.
>
> On Mon, Jun 17, 2019 at 7:42 PM Biao Liu  wrote:
>
>> Hi Abdul, "--classpath " can be used for classes that are not included in the
>> user jar. If all your classes are included in the jar passed to Flink, you
>> don't need this "--classpath".
>>
>> Abdul Qadeer wrote on Tuesday, June 18, 2019 at 3:08 AM:
>>
>>> Hi!
>>>
>>> I was going through submission of a Flink program through CLI. I see
>>> that "--classpath " needs to be accessible from all nodes in the
>>> cluster as per documentation. As I understand the jar files are already
>>> part of the blob uploaded to JobManager from the CLI. The TaskManagers can
>>> download this blob when they receive the task and access the classes from
>>> there. Why is there a need to be able to access these files from every node
>>> then? It makes sense to use Distributed File System to access these jars if
>>> the network is not reachable to download blob files. Or if the blob doesn't
>>> contain metadata to differentiate between child class loader classes and
>>> the rest. However it seems like the TaskManager always tries to access the
>>> specified class paths irrespective of Network Partitions.
>>>
>>>


Re: Need for user class path accessibility on all nodes

2019-06-17 Thread Abdul Qadeer
Hi Biao,

I am aware of it - that's not my question.

On Mon, Jun 17, 2019 at 7:42 PM Biao Liu  wrote:

> Hi Abdul, "--classpath " can be used for classes that are not included in the
> user jar. If all your classes are included in the jar passed to Flink, you
> don't need this "--classpath".
>
> Abdul Qadeer wrote on Tuesday, June 18, 2019 at 3:08 AM:
>
>> Hi!
>>
>> I was going through submission of a Flink program through CLI. I see that
>> "--classpath " needs to be accessible from all nodes in the cluster as
>> per documentation. As I understand the jar files are already part of the
>> blob uploaded to JobManager from the CLI. The TaskManagers can download
>> this blob when they receive the task and access the classes from there. Why
>> is there a need to be able to access these files from every node then? It
>> makes sense to use Distributed File System to access these jars if the
>> network is not reachable to download blob files. Or if the blob doesn't
>> contain metadata to differentiate between child class loader classes and
>> the rest. However it seems like the TaskManager always tries to access the
>> specified class paths irrespective of Network Partitions.
>>
>>


Re: Need for user class path accessibility on all nodes

2019-06-17 Thread Biao Liu
Hi Abdul, "--classpath " can be used for classes that are not included in the
user jar. If all your classes are included in the jar passed to Flink, you
don't need this "--classpath".

Abdul Qadeer wrote on Tuesday, June 18, 2019 at 3:08 AM:

> Hi!
>
> I was going through submission of a Flink program through CLI. I see that
> "--classpath " needs to be accessible from all nodes in the cluster as
> per documentation. As I understand the jar files are already part of the
> blob uploaded to JobManager from the CLI. The TaskManagers can download
> this blob when they receive the task and access the classes from there. Why
> is there a need to be able to access these files from every node then? It
> makes sense to use Distributed File System to access these jars if the
> network is not reachable to download blob files. Or if the blob doesn't
> contain metadata to differentiate between child class loader classes and
> the rest. However it seems like the TaskManager always tries to access the
> specified class paths irrespective of Network Partitions.
>
>


Need for user class path accessibility on all nodes

2019-06-17 Thread Abdul Qadeer
Hi!

I was going through submission of a Flink program through CLI. I see that
"--classpath " needs to be accessible from all nodes in the cluster as
per documentation. As I understand the jar files are already part of the
blob uploaded to JobManager from the CLI. The TaskManagers can download
this blob when they receive the task and access the classes from there. Why
is there a need to be able to access these files from every node then? It
makes sense to use Distributed File System to access these jars if the
network is not reachable to download blob files. Or if the blob doesn't
contain metadata to differentiate between child class loader classes and
the rest. However it seems like the TaskManager always tries to access the
specified class paths irrespective of Network Partitions.


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-17 Thread Rafi Aroch
Hi Vijay,

When using windows, you can use 'trigger' to set a custom Trigger, which
will fire your *ProcessWindowFunction* accordingly.

In your case, you would probably use:

> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>
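
For completeness, here is a minimal, self-contained sketch of that pattern (the
keys, the simple sum aggregate and the job name below are only illustrative, not
your actual classes):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class EarlyFiringWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("sensor-a", 1L), Tuple2.of("sensor-b", 2L))
           .keyBy(0)                                                       // key by the first tuple field
           .timeWindow(Time.hours(4))                                      // one 4-hour processing-time window
           .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))   // emit the current result every 5 minutes
           .sum(1)                                                         // running sum per key (stand-in for your aggregate)
           .print();

        env.execute("early-firing-window-sketch");
    }
}

The trigger only fires (it does not purge), so each 5-minute emission contains the
aggregate accumulated so far within the 4-hour window.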

Thanks,
Rafi


On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan 
wrote:

> I am also implementing the ProcessWindowFunction and accessing the
> windowState to get data but how do i push data out every 5 mins during a 4
> hr time window ?? I am adding a globalState to handle the 4 hr window ???
> Or should I still use the context.windowState even for the 4 hr window ?
>
> public  class MGroupingAggregateClass extends ProcessWindowFunction<> {
>>
>> private MapState<String, Object> timedGroupKeyState;
>> private MapState<String, Object> globalGroupKeyState;
>> private final MapStateDescriptor<String, Object>
>> timedMapKeyStateDescriptor =
>>new MapStateDescriptor<>("timedGroupKeyState",
>>String.class, Object.class);
>> private final MapStateDescriptor<String, Object>
>> globalMapKeyStateDescriptor =
>>new MapStateDescriptor<>("globalGroupKeyState",
>>String.class, Object.class);
>>
>>
>> public void open(Configuration ..) {
>> timedGroupKeyState =
>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>> globalGroupKeyState =
>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>> }
>>
>> public void process(MonitoringTuple currKey, Context context,
>> Iterable> elements,
>>Collector> out) throws
>> Exception {
>>logger.info("Entered MGroupingAggregateWindowProcessing - process
>> interval:{}, currKey:{}", interval, currKey);
>>timedGroupKeyState =
>> context.windowState().getMapState(timedMapKeyStateDescriptor);
>>globalGroupKeyState =
>> context.globalState().getMapState(globalMapKeyStateDescriptor);
>> ...
>> // get data from state
>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>
>> //how do i push the data out every 5 mins to the sink during the 4 hr
>> window ??
>>
>> }
>>
>
>
>
>
>
>
>
> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Need to calculate a 4 hour time window for count, sum with current
>> calculated results being output every 5 mins.
>> How do i do that ?
>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>> the KeyedStream.
>>
>> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow =
>>> Time.seconds(timeIntervalL);
>>> KeyedStream, ...> monitoringTupleKeyedStream =
>>> kinesisStream.keyBy(...);
>>> final WindowedStream, , TimeWindow> windowStream
>>> =
>>> monitoringTupleKeyedStream
>>> .timeWindow(timeWindow);
>>> DataStream<> enrichedMGStream = windowStream.aggregate(
>>> new MGroupingWindowAggregateClass(...),
>>> new MGroupingAggregateClass())
>>> .map(new Monitoring...(...));
>>> enrichedMGStream.addSink(..);
>>>
>>
>>
>> TIA,
>> Vijay
>>
>


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-17 Thread Vijay Balakrishnan
I am also implementing the ProcessWindowFunction and accessing the
windowState to get data but how do i push data out every 5 mins during a 4
hr time window ?? I am adding a globalState to handle the 4 hr window ???
Or should I still use the context.windowState even for the 4 hr window ?

public  class MGroupingAggregateClass extends ProcessWindowFunction<> {
>
> private MapState<String, Object> timedGroupKeyState;
> private MapState<String, Object> globalGroupKeyState;
> private final MapStateDescriptor<String, Object>
> timedMapKeyStateDescriptor =
>new MapStateDescriptor<>("timedGroupKeyState",
>String.class, Object.class);
> private final MapStateDescriptor<String, Object>
> globalMapKeyStateDescriptor =
>new MapStateDescriptor<>("globalGroupKeyState",
>String.class, Object.class);
>
>
> public void open(Configuration ..) {
> timedGroupKeyState =
> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
> globalGroupKeyState =
> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
> }
>
> public void process(MonitoringTuple currKey, Context context,
> Iterable> elements,
>Collector> out) throws
> Exception {
>logger.info("Entered MGroupingAggregateWindowProcessing - process
> interval:{}, currKey:{}", interval, currKey);
>timedGroupKeyState =
> context.windowState().getMapState(timedMapKeyStateDescriptor);
>globalGroupKeyState =
> context.globalState().getMapState(globalMapKeyStateDescriptor);
> ...
> // get data from state
> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>
> //how do i push the data out every 5 mins to the sink during the 4 hr
> window ??
>
> }
>







On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan 
wrote:

> Hi,
> Need to calculate a 4 hour time window for count, sum with current
> calculated results being output every 5 mins.
> How do i do that ?
> Currently, I calculate results for 5 sec and 5 min time windows fine on
> the KeyedStream.
>
> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow =
>> Time.seconds(timeIntervalL);
>> KeyedStream, ...> monitoringTupleKeyedStream =
>> kinesisStream.keyBy(...);
>> final WindowedStream, , TimeWindow> windowStream =
>> monitoringTupleKeyedStream
>> .timeWindow(timeWindow);
>> DataStream<> enrichedMGStream = windowStream.aggregate(
>> new MGroupingWindowAggregateClass(...),
>> new MGroupingAggregateClass())
>> .map(new Monitoring...(...));
>> enrichedMGStream.addSink(..);
>>
>
>
> TIA,
> Vijay
>


InvalidProgramException in streaming execution

2019-06-17 Thread Akash Jain
Hi All,

I am trying a simple example which looks like this:
StreamExecutionEnvironment see =
StreamExecutionEnvironment.createLocalEnvironment();
PojoCsvInputFormat<TestPojo> pcif = new PojoCsvInputFormat<>(inPath,
(PojoTypeInfo<TestPojo>) TypeExtractor.createTypeInfo(TestPojo.class));
DataStreamSource<TestPojo> stream = see.readFile(pcif, inPath,
FileProcessingMode.PROCESS_CONTINUOUSLY, 5000);
stream.writeAsText(outPath);
see.execute();

TestPojo looks like this:
public class TestPojo {
public int a;
public TestPojo() {}
}

But I am getting the following exception:
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The type returned
by the input format could not be automatically determined. Please
specify the TypeInformation of the produced type explicitly by using
the 'createInput(InputFormat, TypeInformation)' method instead.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readFile

Is reading csv data into pojos not supported with streaming readFile
or am I doing something wrong here?

Thank you


Re: [ANNOUNCEMENT] March 2019 Bay Area Apache Flink Meetup

2019-06-17 Thread Xuefu Zhang
Hi all,

The scheduled meetup is only about a week away. Please note that RSVP at
meetup.com is required.  In order for us to get the actual headcount to
prepare for the event, please sign up as soon as possible if you plan to
join. Thank you very much for your cooperation.

Regards,
Xuefu

On Thu, Feb 14, 2019 at 4:32 PM Xuefu Zhang  wrote:

> Hi all,
>
> I'm very excited to announce that the community is planning the next
> meetup in Bay Area on March 25, 2019. The event is just announced on
> Meetup.com [1].
>
> To make the event successful, your participation and help will be needed.
> Currently, we are looking for an organization that can host the event.
> Please let me know if you have any leads.
>
> Secondly, we encourage Flink users and developers to take this as an
> opportunity to share experience or development work. Thus, please let me know if
> you would like to give a short talk.
>
> I look forward to meeting you all in the Meetup.
>
> Regards,
> Xuefu
>
> [1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/258975465
>


Re: NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

2019-06-17 Thread Vijay Balakrishnan
Thanks, Fabian.
I got around the issue by moving the logic for the DropwizardHistogramWrapper (a
non-serializable class) into the ProcessWindowFunction's open() function.
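
For anyone hitting the same issue, an untested sketch of that pattern (class,
types and metric names are illustrative only):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.SlidingTimeWindowReservoir;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HistogramWindowFunction
        extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

    // Flink's Histogram interface; built in open(), so it is never serialized
    private transient Histogram histogram;

    @Override
    public void open(Configuration parameters) {
        com.codahale.metrics.Histogram dropwizard = new com.codahale.metrics.Histogram(
                new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
        histogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("valueHistogram", new DropwizardHistogramWrapper(dropwizard));
    }

    @Override
    public void process(String key, Context ctx, Iterable<Long> elements, Collector<Long> out) {
        long sum = 0;
        for (Long value : elements) {
            histogram.update(value);   // record each element in the Dropwizard-backed histogram
            sum += value;
        }
        out.collect(sum);
    }
}

Because the field is transient and only assigned in open(), the non-serializable
wrapper never travels with the serialized function.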



On Fri, Jun 7, 2019 at 12:33 AM Fabian Hueske  wrote:

> Hi,
>
> There are two ways:
>
> 1. make the non-serializable member variable transient (meaning that it
> won't be serialized) and check in the aggregate call if it has been
> initialized or not.
> 2. implement your own serialization logic by overriding readObject() and
> writeObject() [1].
>
> Best, Fabian
>
> [1]
> https://howtodoinjava.com/java/serialization/custom-serialization-readobject-writeobject/
>
> On Thursday, June 6, 2019 at 23:04, Vijay Balakrishnan <
> bvija...@gmail.com>:
>
>> HI,
>> I have a class defined :
>>
>> public class MGroupingWindowAggregate implements AggregateFunction.. {
>>> private final Map keyHistMap = new TreeMap<>();
>>> }
>>>
>> In the constructor, I initialize it.
>>
>>> public MGroupingWindowAggregate() {
>>> Histogram minHist = new Histogram(new
>>> SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
>>> org.apache.flink.metrics.Histogram minHistogram = new
>>> DropwizardHistogramWrapper(minHist);
>>> Map intervalHistMap = new
>>> TreeMap<>();
>>> intervalHistMap.putIfAbsent(interval, minHistogram);
>>> keyHistMap.putIfAbsent(operationKey, intervalHistMap);
>>> }
>>>
>> When trying to use it in the add() method of AggregateFunction, it fails
>> saying:
>> NotSerializableException:
>> org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
>>
>> Tried to wrap DropwizardHistogramWrapper inside a serializable Object
>> with Composition but that also didn't work.
>>
>> Looked at using RichFunction open() based on Stephan's advise here.
>> https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
>> But cannot use RichFunction with AggrgeateFunction or use
>> RichAggregateFunction
>>
>> How can I use the DropwizardHistogramWrapper -a non serializable class
>> inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to
>> get some Histogram percentile stats without re-inventing the wheel.
>>
>> TIA,
>> Vijay
>>
>


Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-17 Thread Vijay Balakrishnan
Hi,
Need to calculate a 4 hour time window for count, sum with current
calculated results being output every 5 mins.
How do i do that ?
Currently, I calculate results for 5 sec and 5 min time windows fine on the
KeyedStream.

Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow =
> Time.seconds(timeIntervalL);
> KeyedStream, ...> monitoringTupleKeyedStream =
> kinesisStream.keyBy(...);
> final WindowedStream, , TimeWindow> windowStream =
> monitoringTupleKeyedStream
> .timeWindow(timeWindow);
> DataStream<> enrichedMGStream = windowStream.aggregate(
> new MGroupingWindowAggregateClass(...),
> new MGroupingAggregateClass())
> .map(new Monitoring...(...));
> enrichedMGStream.addSink(..);
>


TIA,
Vijay


Re: Apache Flink Sql - How to access EXPR$0, EXPR$1 values from a Row in a table

2019-06-17 Thread Rong Rong
Hi Mans,

I am not sure if you intended to name them like this, but if you were to
access them you need to escape them like `EXPR$0` [1].
Also, I think Flink defaults unnamed fields in a row to `f0`, `f1`, ... [2],
so you might be able to access them like that.

--
Rong

[1] https://calcite.apache.org/docs/reference.html#identifiers
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#register-a-datastream-or-dataset-as-table
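
An untested sketch of the escaping (assuming the table is registered as
"tupleTable", tableEnv is your TableEnvironment, and Table is
org.apache.flink.table.api.Table; the Flink SQL docs list composite field access
as tableName.compositeType.field):

Table tuples = tableEnv.sqlQuery(
    "SELECT name, idx, pos, tupleTable.tx.`EXPR$0`, tupleTable.tx.`EXPR$1` " +
    "FROM tupleTable");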

On Fri, Jun 14, 2019 at 1:55 PM M Singh  wrote:

> Hi:
>
> I am working with Flink Sql and have a table with the following schema:
>
> root
>  |-- name: String
>  |-- idx: Integer
>  |-- pos: String
>  |-- tx: Row(EXPR$0: Integer, EXPR$1: String)
>
> How can I access the attributes tx.EXPR$0 and tx.EXPR$1 ?
>
> I tried the following (the table is registered as 'tupleTable')
>
> Table tuples = myTableFuntionResultTuple.select("select name, idx,
> pos, tx.EXPR$0, tx.EXPR$1 from tupleTable");
>
> but I get the following exception
>
> Exception in thread "main"
> org.apache.flink.table.api.ExpressionParserException: Could not parse
> expression at column 8: `,' expected but `n' found
> select name, idx, pos, tx.EXPR$0, tx.EXPR$1 from tupleTable
>
> Please let me know how if there is any documentation or samples for
> accessing the tuples values in a table.
>
> Thanks
>
> Mans
>


Re: How to restart/recover on reboot?

2019-06-17 Thread John Smith
Well some reasons, machine reboots/maintenance etc... Host/VM crashes and
restarts. And same goes for the job manager. I don't want/need to have to
document/remember some start process for sys admins/devops.

So far I have looked at ./start-cluster.sh and all it seems to do is SSH
into all the specified nodes and start the processes using the jobmanager
and taskmanager scripts. I don't see anything special in any of the sh
scripts.
I configured passwordless ssh through terraform and all that works great
only when trying to do the manual start through systemd. I may have
something missing...



On Mon, 17 Jun 2019 at 09:41, Till Rohrmann  wrote:

> Hi John,
>
> I have not much experience wrt setting Flink up via systemd services. Why
> do you want to do it like that?
>
> 1. In standalone mode, Flink won't automatically restart TaskManagers.
> This only works on Yarn and Mesos atm.
> 2. In case of a lost TaskManager, you should run `taskmanager.sh start`.
> This script simply starts a new TaskManager process.
> 3. I guess you could use systemd to bring up a Flink TaskManager process
> on start up.
>
> Cheers,
> Till
>
> On Fri, Jun 14, 2019 at 5:56 PM John Smith  wrote:
>
>> I looked into the start-cluster.sh and I don't see anything special. So
>> technically it should be as easy as installing Systemd services to run
>> jobamanger.sh and taskmanager.sh respectively?
>>
>> On Wed, 12 Jun 2019 at 13:02, John Smith  wrote:
>>
>>> The installation instructions do not indicate how to create systemd
>>> services.
>>>
>>> 1- When task nodes fail, will the job leader detect this and ssh and
>>> restart the task node? From my testing it doesn't seem like it.
>>> 2- How do we recover a lost node? Do we simply go back to the master
>>> node and run start-cluster.sh and the script is smart enough to figure out
>>> what is missing?
>>> 3- Or do we need to create systemd services and if so on which command
>>> do we start the service on?
>>>
>>


Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-17 Thread Felipe Gutierrez
Hi,

I used this example of KeyedProcessFunction from the Flink website [1] and
I have implemented my own KeyedProcessFunction to process some
approximation counting [2]. This worked very well. Then I switched the data
source to consume strings from Twitter [3]. The data source is consuming
the strings because I can see it when I debug. However, the time comparison
is always different on the onTimer() method, and I never get the results of
the window processing. I don't know the exact reason that this is
happening. I guess it is because my state is too heavy. But, still
shouldn't the time be correct at some point to finish the evaluation of my
window?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
[3]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java

Kind Regards,
Felipe
--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com


Re: How to restart/recover on reboot?

2019-06-17 Thread Till Rohrmann
Hi John,

I have not much experience wrt setting Flink up via systemd services. Why
do you want to do it like that?

1. In standalone mode, Flink won't automatically restart TaskManagers. This
only works on Yarn and Mesos atm.
2. In case of a lost TaskManager, you should run `taskmanager.sh start`.
This script simply starts a new TaskManager process.
3. I guess you could use systemd to bring up a Flink TaskManager process on
start up.

Cheers,
Till

On Fri, Jun 14, 2019 at 5:56 PM John Smith  wrote:

> I looked into the start-cluster.sh and I don't see anything special. So
> technically it should be as easy as installing Systemd services to run
> jobamanger.sh and taskmanager.sh respectively?
>
> On Wed, 12 Jun 2019 at 13:02, John Smith  wrote:
>
>> The installation instructions do not indicate how to create systemd
>> services.
>>
>> 1- When task nodes fail, will the job leader detect this and ssh and
>> restart the task node? From my testing it doesn't seem like it.
>> 2- How do we recover a lost node? Do we simply go back to the master node
>> and run start-cluster.sh and the script is smart enough to figure out what
>> is missing?
>> 3- Or do we need to create systemd services and if so on which command do
>> we start the service on?
>>
>


Re: Best practice to process DB stored log (is Flink the right choice?)

2019-06-17 Thread Piotr Nowojski
Hi,

Those are good questions.

> A datastream to connect to a table is available? I

What table, what database system do you mean? You can check the list of 
existing connectors provided by Flink in the documentation. About reading from a 
relational DB (for example via JDBC) you can read a little bit here: 
https://stackoverflow.com/questions/48162464/how-to-read-data-from-relational-database-in-apache-flink-streaming
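
As a rough, untested sketch of what a one-shot JDBC read can look like with the
flink-jdbc module (driver, URL, query and column types below are placeholders;
double-check the builder methods against your Flink version):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://dbhost:5432/machines")
        .setQuery("SELECT ts, status FROM machine_log ORDER BY ts")
        .setRowTypeInfo(new RowTypeInfo(
                BasicTypeInfo.LONG_TYPE_INFO,     // ts
                BasicTypeInfo.INT_TYPE_INFO))     // status
        .finish();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> statusLog = env.createInput(jdbcInput);   // bounded: reads the table once

Note that this reads the table once rather than continuously tailing it.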
 


> Is Flink an optimal option for that rather simple processing?

It depends on many things. What you have described could easily be done by some 
trivial Python script; however, you would have to answer a couple of questions 
for yourself:

- what is the scale at which you would like to operate? Would your computation 
need to be distributed across multiple machines in a foreseeable future?
- do you care about reliability? What should happen in case of failures? Do you 
need High Availability?
- could you have more use cases/requirements in the future?
- do you care about at-least-once or exactly-once processing guarantees?
- do you care if you lost your computation state in case of failure?
- how do you want to deploy your job (flink provides out of the box integration 
with many systems like Mesos, Yarn etc…)
- will you need to integrate with some other external systems, for which Flink 
has built in support (like S3 file system, Kafka, Kinesis, …)
- do you care about monitoring your job? (Flink has built-in metrics)
- …

If you do not care about those things and you only need to process a small number 
of records per second, then Flink might be overkill. If not, or if you are 
not sure, then I would encourage you to read up on / research the above-mentioned 
things to make up your mind.

Piotrek

> On 15 Jun 2019, at 18:27, Stefano Lissa  wrote:
> 
> Hi,
> surely a really newbie question, but Flink sounds like the best choice. I 
> have a log continuously added to a database table where a machine status is 
> stored with a timestamp (the status is 0, 1, 2).
> 
> What I need is to process this status and produce another stream where a 
> sequence of status X is aggregated producing a new record with the first 
> status X timestamp found in the input stream and the time delta until a new 
> status different from X is seen.
> 
> A datastream to connect to a table is available? I've tried to find something 
> in the documentation, but not sure if I searched in the right place.
> 
> Is Flink an optimal option for that rather simple processing?
> 
> Thank you, Stefano.



Has Flink a kafka processing location strategy?

2019-06-17 Thread Theo Diefenthal
Hi,

 

We have a Hadoop/YARN Cluster with Kafka and Flink/YARN running on the
same machines. 

 

In Spark (Streaming), there is a PreferBrokers location strategy, so that
the executors consume those kafka partitions which are served from the
same machine's Kafka broker
(https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html#locationstrategies).

 

I wonder if there is such thing in Flink as well? I didn't find anything
yet.

 

Best regards

Theo Diefenthal 



Re: Timeout about local test case

2019-06-17 Thread Piotr Nowojski
Hi,

I don’t know what the reason is (also, there are no open issues with this test in 
our JIRA). This test seems to be working on Travis/CI and it works for me when 
I just tried running it locally. There might be some bug in the 
test/production code that is triggered only under some specific conditions/setup, 
which just happens to be visible in your case. A couple of things that might help 
you find your issue:

You can try checking out the logs (they might be disabled by default in 
`flink-runtime/src/test/resources/log4j-test.properties`) to see whether something 
caused this timeout to happen (some previous failure).

It can also be the case that you have not cleaned up/rebuilt something correctly. 
If you run `mvn clean install -pl flink-runtime -am`, does this test work? 

Piotrek

> On 15 Jun 2019, at 13:21, aitozi  wrote:
> 
> Hi community,
> 
> When I run mvn clean install locally (branch master), the test case always
> fails with a timeout exception, and I wonder why. There is also an unknown
> IP. PS: When I run the case individually I meet the same error.
> 
> 
>  
> 
> Can someone give me some hints? I also get an unknown IP when running some
> test cases, which I have not seen in the project before. (Running
> JobMasterTest.testDeclineCheckpointInvocationWithUserException)
> 
> java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException:
> Ask timed out on
> [ActorSelection[Anchor(akka.tcp://flink@151.101.16.133:50619/),
> Path(/user/jobmanager_0)]] after [1 ms]. Message of type
> [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that
> the recipient actor didn't send a reply.
> 
>   at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at
> org.apache.flink.runtime.jobmaster.JobMasterTest.testDeclineCheckpointInvocationWithUserException(JobMasterTest.java:330)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [ActorSelection[Anchor(akka.tcp://flink@151.101.16.133:50619/),
> Path(/user/jobmanager_0)]]
> 
> Thanks,
> Aitozi
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How to trigger the window function even there's no message input in this window?

2019-06-17 Thread Piotr Nowojski
Hi,

As far as I know, this is currently impossible.

You could work around this issue by implementing your own custom post-processing 
operator/flatMap function (see the sketch after this list) that would:
- track the output of window operator
- register processing time timer with some desired timeout
- every time the processing time timer fires, your code would check whether the 
window operator has emitted something in the last X seconds. If not, it could 
emit some default element
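
A minimal, untested sketch of such a post-processing function (names and the
String payload are illustrative; it assumes the window output is keyed the same
way as the window itself):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DefaultIfIdle extends KeyedProcessFunction<String, String, String> {

    private final long timeoutMs;
    private transient ValueState<Long> lastSeen;   // processing time of the last forwarded element

    public DefaultIfIdle(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        out.collect(value);                                      // pass the window result through
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        ctx.timerService().registerProcessingTimeTimer(now + timeoutMs);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastSeen.value();
        if (last == null || timestamp - last >= timeoutMs) {
            out.collect("<default element>");                    // nothing was emitted for timeoutMs
            ctx.timerService().registerProcessingTimeTimer(timestamp + timeoutMs);   // keep checking
        }
    }
}

Applied as windowedResult.keyBy(...).process(new DefaultIfIdle(10_000)), it forwards
everything the window emits and injects a default element whenever the window has
been silent for the configured timeout.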

Piotrek

> On 14 Jun 2019, at 12:08, wangl...@geekplus.com.cn wrote:
> 
> 
> windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new 
> MyProcessWindowFunction());
> How can i trigger the MyProcessWindowFunction even there's no input during 
> this window time? 
> 
> wangl...@geekplus.com.cn 


Re: Flink state: complex value state pojos vs explicitly managed fields

2019-06-17 Thread Congxian Qiu
Hi,
If you use RocksDBStateBackend, one state per member will give better
performance, because RocksDBStateBackend needs to de/serialize the
key/value on every put/get; with one POJO value, you have to de/serialize the
whole POJO on every put/get.
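
To illustrate the difference, here is an untested sketch of the "one state per
member" layout (Foo and the counter update are placeholders for Frank's actual
types and logic):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PerFieldStateFunction extends KeyedProcessFunction<Long, Foo, Foo> {

    private transient ValueState<Long> countState;   // de/serialized on its own
    private transient ListState<Foo> foosState;      // RocksDB appends new entries independently

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        foosState = getRuntimeContext().getListState(new ListStateDescriptor<>("foos", Foo.class));
    }

    @Override
    public void processElement(Foo value, Context ctx, Collector<Foo> out) throws Exception {
        foosState.add(value);                                    // serializes only the new Foo
        Long count = countState.value();
        countState.update(count == null ? 1L : count + 1L);      // serializes only a Long
        out.collect(value);
    }
}

With a single POJO value state, both updates would instead rewrite the whole POJO,
including the full list.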

Best,
Congxian


Timothy Victor wrote on Monday, June 17, 2019 at 8:04 PM:

> I would choose encapsulation if the fields are indeed related and it makes
> sense for your model.  In general, I feel it is not a good thing to let
> Flink (or any other frameworks) internal mechanics dictate your data model.
>
> Tim
>
> On Mon, Jun 17, 2019, 4:59 AM Frank Wilson  wrote:
>
>> Hi,
>>
>> Is it better to have one POJO value state with a collection inside or an
>> explicit state declaration for each member? e.g.
>>
>> MyPojo {
>> long id;
>> List[Foo] foos;
>>
>> // getter / setters omitted
>> }
>>
>> Or
>>
>> Two managed state declarations in my process function (a value for the
>> long and a list for the “foos”).
>>
>> It feels like former is better encapsulated but the latter gives flink
>> more information about the state.
>>
>> Frank
>>
>>


Re: Flink state: complex value state pojos vs explicitly managed fields

2019-06-17 Thread Timothy Victor
I would choose encapsulation if the fields are indeed related and it makes
sense for your model.  In general, I feel it is not a good thing to let
Flink (or any other frameworks) internal mechanics dictate your data model.

Tim

On Mon, Jun 17, 2019, 4:59 AM Frank Wilson  wrote:

> Hi,
>
> Is it better to have one POJO value state with a collection inside or an
> explicit state declaration for each member? e.g.
>
> MyPojo {
> long id;
> List[Foo] foos;
>
> // getter / setters omitted
> }
>
> Or
>
> Two managed state declarations in my process function (a value for the
> long and a list for the “foos”).
>
> It feels like former is better encapsulated but the latter gives flink
> more information about the state.
>
> Frank
>
>


Re: Error while using session window

2019-06-17 Thread Piotr Nowojski
Hi,

Thanks for reporting the issue. I think this might be caused by 
System.currentTimeMillis() not being monotonic [1] and the fact that Flink 
accesses this function multiple times per element (at least twice: first for 
creating a window, second to perform the check that failed in your case). 
However, I'm pretty sure that this is a more general problem in more places. I 
have created a ticket for this [2].

I’m not sure if there is an easy hot fix for that. You would have to increase the 
inactivity gap, switch to ingestion/event time (preferable anyway), make sure 
that the machine’s time doesn’t change, or just ignore the problem and accept an 
occasional failure.

Piotrek

[1] 
https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls
 

[2] https://issues.apache.org/jira/browse/FLINK-12872 


> On 14 Jun 2019, at 10:14, Abhishek Jain  wrote:
> 
> Hi,
> I have a job that uses processing time session window with inactivity gap of 
> 60ms where I intermittently run into the following exception. I'm trying to 
> figure out what happened here. Haven't been able to reproduce this scenario. 
> Any thoughts?
> 
> java.lang.UnsupportedOperationException: The end timestamp of a 
> processing-time window cannot become earlier than the current processing time 
> by merging. Current processing time: 1560493731808 window: 
> TimeWindow{start=1560493731654, end=1560493731778}
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> 
> -- 
> Warm Regards,
> Abhishek Jain
> 



Unexpected behavior from interval join in Flink

2019-06-17 Thread Wouter Zorgdrager
Hi all,

I'm experiencing some unexpected behavior using an interval join in Flink.
I'm dealing with two data sets, lets call them X and Y. They are finite
(10k elements) but I interpret them as a DataStream. The data needs to be
joined for enrichment purposes. I use event time and I know (because I
generated the data myself) that the timestamp of an element Y is always
between -60 minutes and +30 minutes of the element with the same key in set
X. Both datasets are in-order (in terms of timestamps), equal in size,
share a common key and parallelism is set to 1 throughout the whole program.

The code to join looks something like this:

xStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
yStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[X, Y, String] {
override def processElement(
left: X,
right: Y,
ctx: ProcessJoinFunction[X, Y, String]#Context,
out: Collector[String]): Unit = {

  out.collect(left + ":" + right)
}


However, about 30 percent of the data is not joined. Is there a proper way
to debug this? For instance, in windows you can side-output late data. Is
there a possibility to side-output unjoinable data?

Thx a lot,
Wouter


What order are events processed in iterative loop?

2019-06-17 Thread John Tipper
For the case of a single iteration of an iterative loop where the feedback type 
is different to the input stream type, what order are events processed in the 
forward flow? So for example, if we have:

  *   the input stream contains input1 followed by input2
  *   a ConnectedIterativeStream at the head of an iteration
  *   followed by a CoProcessFunction, which emits a feedback element in 
response to an input, that closes the ConnectedIterativeStream

For an input stream of input1 followed by input2, what order of events does the 
CoProcessFunction see?

Does it see "input1, feedback1, input2, feedback2", or "input1, input2, 
feedback1, feedback2", or is it a non-deterministic processing time order based 
on the execution time of the CoProcessFunction, but where input1 is always 
processed before input2 and feedback1 is always processed before feedback2, 
e.g. either of the two orders are possible?

Many thanks,

John


Flink state: complex value state pojos vs explicitly managed fields

2019-06-17 Thread Frank Wilson
Hi,

Is it better to have one POJO value state with a collection inside or an
explicit state declaration for each member? e.g.

MyPojo {
long id;
List[Foo] foos;

// getter / setters omitted
}

Or

Two managed state declarations in my process function (a value for the long
and a list for the “foos”).

It feels like former is better encapsulated but the latter gives flink more
information about the state.

Frank


Re: [ANNOUNCE] Weekly Community Update 2019/24

2019-06-17 Thread Konstantin Knauf
Hi Zili,

thank you for adding these threads :) I would have otherwise picked them up
next week, just couldn't put everything into one email.

Cheers,

Konstantin

On Sun, Jun 16, 2019 at 11:07 PM Zili Chen  wrote:

> Hi Konstantin and all,
>
> Thank Konstantin very much for reviving this tradition! It reminds
> me of the joyful time I can easily catch up interesting ongoing threads.
> Thanks for Till's work, too.
>
> Besides exciting updates and news above, I'd like to pick up
> some other threads you guys may be interested in.
>
> * xiaogang has recently started a discussion[1] on allowing
> at-most-once delivery in case of failures, which adapts Flink
> to more scenarios.
>
> * vino has raised a discussion[2] on supporting local aggregation
> in Flink, which was received a lot of positive feedbacks and now
> there is a ongoing FLIP-44 thread[3].
>
> * Jeff Zhang has raised a discussion[4] and drafted a design doc[5]
> on Flink client API enhancement, which aims at overcoming limitation
> when integrating Flink with projects such as Zepplin or Beam.
>
> Best,
> tison.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Allow-at-most-once-delivery-in-case-of-failures-td29464.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-44-Support-Local-Aggregation-in-Flink-td29513.html
> [4]
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> [5]
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/
>
>
Konstantin Knauf wrote on Monday, June 17, 2019 at 12:10 AM:
>
>> Dear community,
>>
>> last year Till did a great job on summarizing recent developments in the
>> Flink community in a "Weekly community update" thread. I found this very
>> helpful and would like to revive this tradition with a focus on topics &
>> threads which are particularly relevant to the wider community of Flink
>> users.
>>
>> As we haven't had such an update for some time (since December 2018), I
>> find it impossible to cover everything that's currently going on in this
>> email. I'll try to include most ongoing discussions and FLIPs over the
>> course of the next weeks to catch up. Afterwards I am going to go back to
>> only focus on news since the last update.
>>
>> You are welcome to share any additional news and updates with the
>> community in this thread.
>>
>> Flink Development
>> ===
>>
>> * [releases] The community is currently working on a Flink 1.8.1 release
>> [1]. The first release candidate should be ready soon (one critical bug to
>> fix as of writing, FLINK-12863).
>> * [releases] Kurt and Gordon stepped up as release managers for Flink 1.9
>> and started a thread [2] to sync on the status of various development
>> threads targeted for Flink 1.9. Check it out to see if the feature you are
>> waiting for is likely to make it or not.
>> * [savepoints] Gordon, Kostas and Congxian have recently started a
>> discussion [3] on unifying the savepoint format across StateBackends, which
>> will enable users to switch between StateBackends when recovering from a
>> Savepoint. The related discussion on introducing Stop-With-Checkpoint [4]
>> initiated by Yu Li is closely related and worth a read to understand the
>> long term vision.
>> * [savepoints] Seth and Gordon have started a discussion to add a State
>> Processing API ("Savepoint Connector"), which will allow reading &
>> modifying existing Savepoints as well as creating new Savepoints from
>> scratch with the DataSet API. The feature is targeted for Flink 1.9.0 as a
>> new *library*.
>> * [python-support] Back in April we had a discussion on the mailing list
>> about adding Python Support to the Table API [6]. This support will likely
>> be available in Flink 1.9 (without UDFs and later with UDF support as
>> well). Therefore, Stephan has started a discussion [7] to deprecate the
>> current Python API in Flink 1.9. This has gotten a lot of positive feedback
>> and the only open question as of writing is whether to only deprecate it or
>> to remove it directly.
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-8-1-td29154.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-for-Apache-Flink-1-9-0-td28701.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-41-Unify-Keyed-State-Snapshot-Binary-Format-for-Savepoints-td29197.html
>> [4] https://issues.apache.org/jira/browse/FLINK-12619
>> [5]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-43-Savepoint-Connector-td29232.html
>> [6]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
>> [7]
>> htt

StreamingFileSink with hdfs less than 2.7

2019-06-17 Thread Rinat
Hi mates, I decided to enable persisting the state of our Flink jobs that write 
data into HDFS, but ran into some trouble with that.

I’m trying to use StreamingFileSink with Cloudera Hadoop, whose version is 
2.6.5, and it doesn’t contain the truncate method.

So, the job fails immediately on start, when it tries to initialize the 
HadoopRecoverableWriter, because that only works with a Hadoop FS version greater 
than or equal to 2.7.

Do you have any plans to support recovery for Hadoop file systems that don’t 
contain the truncate method, or how can I work around this limitation?

If no workaround exists, then the following behaviour would be good enough:

get the path to the file that should be restored
get the valid-length from the state
create a temporary directory and stream the restoring file into it 
until the valid-length is reached
replace the restoring file with the file from the tmp directory
move the file to its final state

what do you think about it ?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Loading state from sink/understanding recovery options

2019-06-17 Thread Eduardo Winpenny Tejedor
Hi Congxian,

I don't use Flink at the moment, I am trying to evaluate its suitability
for my company's purposes by re-writing one of our apps with Flink. We have
apps with similar business logic but different code, even though they do
essentially the same thing. I am new to streaming paradigms and
concepts so any guidance is appreciated.

These apps consume a Kafka stream of "delta" messages, update the total sum
of the property relevant to them and then send "update" messages onto
another Kafka topic. Furthermore, they also produce an hourly "snapshot"
(i.e. the value of the property at exactly 09:00, 10:00, 11:00...). On a
restart they fully read the output topic, at which point they'll be in the
same state they were before the shutdown, and they then continue reading
from the "delta" topic - this is how we guarantee exactly-once processing.
Please point out if this a "code smell" in the streaming with Flink
paradigm.

Going back to my question, great to hear there's a jira for this! I hope
you can see why it's an attractive idea to avoid the latency incurred by
the checkpointing mechanics, given we're already publishing the meaningful
state of the app.

In the meantime I guess I'll have to use a backend with checkpointing, any
guidelines as to what state backend to use? Any other option I should
consider?

Thanks,
Eduardo

On Fri, 14 Jun 2019, 03:11 Congxian Qiu,  wrote:

> Hi, Eduardo
> Currently, we can't load state from the outside (there is an ongoing
> jira [1] to do this). In other words, if you disable checkpointing and use
> the Kafka/database as your state storage, you have to do the deduplication
> work yourself.
>
> Just curious, which state backend do you use, and how about the latency?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> Best,
> Congxian
>
>
> Eduardo Winpenny Tejedor  于2019年6月13日周四
> 下午11:31写道:
>
>> Is it possible someone could comment on this question in either direction
>> please?
>>
>> Thanks,
>> Eduardo
>>
>> On Sat, 8 Jun 2019, 14:10 Eduardo Winpenny Tejedor, <
>> eduardo.winpe...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In generic terms, if a keyed operator outputs its state into a sink, and
>>> the state of that operator after a restart can be derived from the sink's
>>> contents, can we do just that, as opposed to using checkpointing mechanics?
>>> Would that reduce latency (even if only theoretically)?
>>>
>>> An example: A word counting app with a Kafka source and a Kafka sink
>>> (compacted topic). The input is an unbounded stream of single words and the
>>> output is  a  tuple stream that goes into a Kafka
>>> compacted topic.
>>>
>>> AFAIK Flink can guarantee exactly-once semantics by updating the keyed
>>> state of its counting operator on _every_ event - after a restart it can
>>> then resume from the last checkpoint. But in this case, given the sink
>>> contains exactly the same relevant information as a checkpoint (namely the
>>>  tuples), could we load the state of an operator from our
>>> sink and avoid the latency added by our state backend? If so, how can this
>>> be achieved?
>>>
>>> If we replaced the Kafka sink with a database sink, could we on startup
>>> know which keys a Flink task has been allocated, perform a _single_ query
>>> to the database to load the key_counts and load those into the operator?
>>> How can this be achieved? Instead of a single query you may want to do a
>>> batched query, as long as you're not querying the database once per key.
>>>
>>> Thanks,
>>> Eduardo
>>>
>>