Re: RabbitMQ source does not stop unless message arrives in queue

2021-05-13 Thread John Morrow
Hi Jose, hey Austin!!

We were recently looking at consuming a fixed number of messages from an RMQ source, processing them and outputting them to an RMQ sink. As a naive first attempt at stopping the job once the target number of messages had been processed, we put a counter state in the process function and threw an exception when the counter reached the target message count.

The job had:

  *   parallelism: 1
  *   checkpointing: 1000 (1 sec)
  *   restartStrategy: noRestart
  *   prefetchCount: 100

Running it with 150 messages in the input queue and 150 also as the target 
number, at the end the queues had:

  *   output queue - 150
  *   input queue - 50

So it looks like it did transfer all the messages, but some unacked ones also got requeued at the source and ended up as duplicates. I know throwing an exception in the Flink job is not the same as triggering a stateful shutdown, but it may be hitting similar unack issues.
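
For reference, the naive counting step looked roughly like this (the class and type names are just illustrative, not our actual job):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: fail the job once a target number of messages has passed through.
// With parallelism 1 a plain field is enough for the experiment; the real job kept the
// counter in state. Relies on the noRestart strategy to bring the job down.
public class StopAfterN extends ProcessFunction<String, String> {

  private final long target;
  private long seen;

  public StopAfterN(long target) {
    this.target = target;
  }

  @Override
  public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    out.collect(value);
    if (++seen >= target) {
      // not a clean stateful shutdown; unacked RMQ messages get redelivered,
      // which is where the duplicates in the input queue come from
      throw new RuntimeException("Reached target message count, failing the job on purpose");
    }
  }
}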

John


From: Austin Cawley-Edwards 
Sent: Thursday 13 May 2021 16:49
To: Jose Vargas ; John Morrow 

Cc: user 
Subject: Re: RabbitMQ source does not stop unless message arrives in queue

Hey Jose,

Thanks for bringing this up – it does indeed sound like a bug. There is ongoing work to update the RMQ source to the new source interface, tracked in FLINK-20628[1], which might address some of these issues (and should, if it does not already). Would you be able to create a JIRA issue for this, or would you like me to?

At my previous company we only consumed one Rabbit queue per application, so we didn't run into this exact issue, but we did see other odd behavior in the RMQ source that could be related. I'm going to cc @John Morrow <johnniemor...@hotmail.com>, who might be able to share what he's seen working with the source, if he's around. I remember some messages not being properly acked during a stateful shutdown via the Ververica Platform's stop-with-savepoint functionality that you mention, though that might be more related to FLINK-20244[2].


Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-20628
[2]: https://issues.apache.org/jira/browse/FLINK-20244

On Thu, May 13, 2021 at 10:23 AM Jose Vargas <jose.var...@fiscalnote.com> wrote:
Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's 
RabbitMQ source has some surprising behavior when a stop-with-savepoint request 
is made.

Expected Behavior:
The stop-with-savepoint request stops the job with a FINISHED state.

Actual Behavior:
The stop-with-savepoint request either times out or hangs indefinitely unless, after the request is made, a message arrives in every queue that the job consumes from.


I know that one possible workaround is to send a sentinel value to each of the queues consumed by the job, which the deserialization schema then checks in its isEndOfStream method. However, this is somewhat cumbersome and complicates the continuous delivery of a Flink job. For example, Ververica Platform will trigger a stop-with-savepoint for the user if one of many possible Flink configurations for a job is changed. The stop-with-savepoint can then hang indefinitely because only some of the RabbitMQ sources will have reached a FINISHED state.
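
For context, a minimal sketch of that sentinel workaround (the sentinel string here is just a placeholder): the RMQ source only finishes once its deserialization schema's isEndOfStream returns true for a consumed message, so every queue has to receive the sentinel.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Illustrative sentinel-based schema; "__STOP__" is an assumed marker value.
public class SentinelStringSchema implements DeserializationSchema<String> {

  private static final String SENTINEL = "__STOP__";

  @Override
  public String deserialize(byte[] message) throws IOException {
    return new String(message, StandardCharsets.UTF_8);
  }

  @Override
  public boolean isEndOfStream(String nextElement) {
    // Returning true tells the source to finish; this has to happen for every
    // queue the job consumes from before the whole job reaches FINISHED.
    return SENTINEL.equals(nextElement);
  }

  @Override
  public TypeInformation<String> getProducedType() {
    return Types.STRING;
  }
}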

I have attached the TaskManager thread dump taken after the stop-with-savepoint request was made. Almost every thread is either sleeping or waiting for locks to be released, and then there are a handful of threads trying to read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom method.

Ideally, once a stop-with-savepoint request is made, the threads trying to read 
data from RabbitMQ would be interrupted so that all RabbitMQ sources would 
reach a FINISHED state.

Regular checkpoints and savepoints complete successfully; it is only with the stop-with-savepoint request that I see this behavior.


Respectfully,


Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com  |  info.cq.com  |  rollcall.com



Re: Stateful Functions + ML model prediction

2020-10-05 Thread John Morrow
Thanks for the response Gordon, and for that Flink Forward presentation - it's been very helpful.

I put in a JIRA ticket for it: https://issues.apache.org/jira/browse/FLINK-19507

I did find this page: 
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html
 and there are source/sink connectors for Pulsar 
(https://github.com/streamnative/pulsar-flink) - I'm guessing that's how I 
should approach using Pulsar as an ingress/egress?

Cheers,
John.


From: Tzu-Li (Gordon) Tai 
Sent: Monday 5 October 2020 03:21
To: John Morrow ; user 
Subject: Re: Stateful Functions + ML model prediction

Hi John,

It is definitely possible to use Apache Pulsar with StateFun. Could you open a 
JIRA ticket for that?
It would be nice to see how much interest we can gather on adding that as a new 
IO module, and consider adding native support for Pulsar in future releases.

If you are already using StateFun and want to start using Pulsar as an ingress/egress with current versions, there's also a way to do that right now.
If that's the case, please let me know and I'll try to provide some guidelines 
on how to achieve that.

Cheers,
Gordon


On Fri, Oct 2, 2020, 1:38 AM John Morrow <johnniemor...@hotmail.com> wrote:
Hi Flink Users,

I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward 
(https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka & 
Kinesis are supported, and looking at 
https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO packages 
for those two: statefun-kafka-io & statefun-kinesis-io


Is it possible to use Apache Pulsar as a Statefun ingress & egress?

Thanks,
John.

____
From: John Morrow <johnniemor...@hotmail.com>
Sent: Wednesday 23 September 2020 11:37
To: Igal Shilman <i...@ververica.com>
Cc: user <user@flink.apache.org>
Subject: Re: Stateful Functions + ML model prediction

Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds 
like a good direction.

Thanks again,
John.


From: Igal Shilman <i...@ververica.com>
Sent: Wednesday 23 September 2020 09:06
To: John Morrow <johnniemor...@hotmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Stateful Functions + ML model prediction

Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes, the remote functions are some logic exposed behind an HTTP endpoint, and 
Flink would forward any message addressed to them via an HTTP request.
The way StateFun works is, for every invocation, StateFun would attach the 
necessary context (any previous state for a key, and the message) to the HTTP 
request.
So practically speaking the same remote function can be contacted by different 
Jobs, as the remote functions are effectively stateless.

 Does this sound like a good use case for stateful functions?

The way I would approach this is, I would consider moving the business rules 
and the enrichment to the remote function.
This would:

a) Eliminate the need for a broadcast stream: you can simply deploy a new version of the remote function container, as remote functions can be independently restarted (without the need to restart the Flink job that contacts them).
b) You can perform the enrichment immediately without going through a RichAsyncFunction, as StateFun by default invokes many remote functions in parallel (but never for the same key).
c) You can contact the remote service that hosts the machine learning model, or 
even load the model in the remote function's process on startup.

So, in kubernetes terms:

1. You would need a set of pods (a deployment) that are able to serve HTTP 
traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job
3. The StateFun job would need to know how to contact these pods, so you would 
also need a kubernetes service (or a LoadBalancer) that
balances the requests from (2) to (1).

If you need to change your business rules, or the enrichment logic you can 
simply roll a new version of (1).


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow <johnniemor...@hotmail.com> wrote:
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/20

Re: Stateful Functions + ML model prediction

2020-10-01 Thread John Morrow
Hi Flink Users,

I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward 
(https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka & 
Kinesis are supported, and looking at 
https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO packages 
for those two: statefun-kafka-io & statefun-kinesis-io


Is it possible to use Apache Pulsar as a Statefun ingress & egress?

Thanks,
John.

____
From: John Morrow 
Sent: Wednesday 23 September 2020 11:37
To: Igal Shilman 
Cc: user 
Subject: Re: Stateful Functions + ML model prediction

Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds 
like a good direction.

Thanks again,
John.


From: Igal Shilman 
Sent: Wednesday 23 September 2020 09:06
To: John Morrow 
Cc: user 
Subject: Re: Stateful Functions + ML model prediction

Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes, the remote functions are some logic exposed behind an HTTP endpoint, and 
Flink would forward any message addressed to them via an HTTP request.
The way StateFun works is, for every invocation, StateFun would attach the 
necessary context (any previous state for a key, and the message) to the HTTP 
request.
So practically speaking the same remote function can be contacted by different 
Jobs, as the remote functions are effectively stateless.

 Does this sound like a good use case for stateful functions?

The way I would approach this is, I would consider moving the business rules 
and the enrichment to the remote function.
This would:

a) Eliminate the need for a broadcast stream: you can simply deploy a new version of the remote function container, as remote functions can be independently restarted (without the need to restart the Flink job that contacts them).
b) You can perform the enrichment immediately without going through a RichAsyncFunction, as StateFun by default invokes many remote functions in parallel (but never for the same key).
c) You can contact the remote service that hosts the machine learning model, or 
even load the model in the remote function's process on startup.

So, in kubernetes terms:

1. You would need a set of pods (a deployment) that are able to serve HTTP 
traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job
3. The StateFun job would need to know how to contact these pods, so you would 
also need a kubernetes service (or a LoadBalancer) that
balances the requests from (2) to (1).

If you need to change your business rules, or the enrichment logic you can 
simply roll a new version of (1).


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow <johnniemor...@hotmail.com> wrote:
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to e

Re: Stateful Functions + ML model prediction

2020-09-23 Thread John Morrow
Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds 
like a good direction.

Thanks again,
John.


From: Igal Shilman 
Sent: Wednesday 23 September 2020 09:06
To: John Morrow 
Cc: user 
Subject: Re: Stateful Functions + ML model prediction

Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes, the remote functions are some logic exposed behind an HTTP endpoint, and 
Flink would forward any message addressed to them via an HTTP request.
The way StateFun works is, for every invocation, StateFun would attach the 
necessary context (any previous state for a key, and the message) to the HTTP 
request.
So practically speaking the same remote function can be contacted by different 
Jobs, as the remote functions are effectively stateless.

 Does this sound like a good use case for stateful functions?

The way I would approach this is, I would consider moving the business rules 
and the enrichment to the remote function.
This would:

a) Eliminate the need for a broadcast stream: you can simply deploy a new version of the remote function container, as remote functions can be independently restarted (without the need to restart the Flink job that contacts them).
b) You can perform the enrichment immediately without going through a RichAsyncFunction, as StateFun by default invokes many remote functions in parallel (but never for the same key).
c) You can contact the remote service that hosts the machine learning model, or 
even load the model in the remote function's process on startup.

So, in kubernetes terms:

1. You would need a set of pods (a deployment) that are able to serve HTTP 
traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job
3. The StateFun job would need to know how to contact these pods, so you would 
also need a kubernetes service (or a LoadBalancer) that
balances the requests from (2) to (1).

If you need to change your business rules, or the enrichment logic you can 
simply roll a new version of (1).


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow <johnniemor...@hotmail.com> wrote:
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to each of 
the prediction models and add the results for each one to the record so it's 
available for the enrichment step. The downside of this is that in the future 
I'm anticipating having more and more models, and not necessarily wanting to 
send each record to every model for prediction. e.g. I might have a business 
rule which says if the author of the text is X then get the sentiment (via the 
sentiment model) and update field Z, so it would be a waste of time doing that 
for all records.

I had a look at stateful functions. There's an example in the 
statefun.io overview wh

Stateful Functions + ML model prediction

2020-09-22 Thread John Morrow
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.
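
For reference, a rough sketch of what that matching step can look like (the Record and Rule shapes here are simplified stand-ins; the real rule class carries a match-predicate and an action, and the action is applied later in the RichAsyncFunction):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Simplified stand-ins, only to make the sketch self-contained.
class Record implements Serializable {
  public String text;
}

class Rule implements Serializable {
  public String name;
  public String keyword;  // stand-in for the real match-predicate
}

public class RuleMatchFunction extends BroadcastProcessFunction<Record, Rule, Record> {

  // Side output carrying the record plus the names of the rules it matched.
  public static final OutputTag<Tuple2<Record, List<String>>> MATCHED =
      new OutputTag<Tuple2<Record, List<String>>>("matched-rules") {};

  static final MapStateDescriptor<String, Rule> RULES =
      new MapStateDescriptor<>("rules", String.class, Rule.class);

  @Override
  public void processElement(Record record, ReadOnlyContext ctx, Collector<Record> out) throws Exception {
    List<String> matched = new ArrayList<>();
    for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(RULES).immutableEntries()) {
      if (record.text != null && record.text.contains(entry.getValue().keyword)) {
        matched.add(entry.getKey());
      }
    }
    if (matched.isEmpty()) {
      out.collect(record);                              // continue down the main pipeline
    } else {
      ctx.output(MATCHED, Tuple2.of(record, matched));  // enriched asynchronously downstream
    }
  }

  @Override
  public void processBroadcastElement(Rule rule, Context ctx, Collector<Record> out) throws Exception {
    ctx.getBroadcastState(RULES).put(rule.name, rule);  // rules can be updated at runtime
  }
}

The matched side output is then read via getSideOutput(RuleMatchFunction.MATCHED), run through the RichAsyncFunction that applies each matched rule's action, and unioned back with the main stream.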

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to each of 
the prediction models and add the results for each one to the record so it's 
available for the enrichment step. The downside of this is that in the future 
I'm anticipating having more and more models, and not necessarily wanting to 
send each record to every model for prediction. e.g. I might have a business 
rule which says if the author of the text is X then get the sentiment (via the 
sentiment model) and update field Z, so it would be a waste of time doing that 
for all records.
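
One rough sketch of what such a model-prediction call could look like as an async operator (the class name, thread pool and stubbed HTTP call are placeholders, not what I've built):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Illustrative only: an async operator that sends the record's text to an externally
// served model (e.g. a seldon-core endpoint) and emits the prediction. The actual
// request is stubbed out; a real version would use whatever HTTP/gRPC client the
// model server expects.
public class SentimentPredictionFunction extends RichAsyncFunction<String, String> {

  private transient ExecutorService executor;

  @Override
  public void open(Configuration parameters) {
    executor = Executors.newFixedThreadPool(4);
  }

  @Override
  public void asyncInvoke(String text, ResultFuture<String> resultFuture) {
    CompletableFuture
        .supplyAsync(() -> callSentimentModel(text), executor)
        .whenComplete((prediction, error) -> {
          if (error != null) {
            resultFuture.completeExceptionally(error);
          } else {
            resultFuture.complete(Collections.singleton(prediction));
          }
        });
  }

  @Override
  public void close() {
    executor.shutdown();
  }

  // Placeholder for the actual request to the model-serving endpoint.
  private String callSentimentModel(String text) {
    return "positive";
  }
}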

I had a look at stateful functions. There's an example in the statefun.io 
overview which shows having a stateful function for doing a fraud model 
prediction based on if an account has had X number of frauds detected in the 
last 30 days, so the key for the state is an account number. In my case, these 
model predictions don't really have any state - they just take input and return 
a prediction, they're more like a stateless lambda function. Also, I was 
wondering if I implemented these as stateful functions would I be able to make 
them available to other Flink jobs within the cluster, as opposed to having 
them as individual RichAsyncFunctions defined within a single Flink job and 
only available to that. The last thing which made stateful functions sound good 
was that at the moment all my business rules happen to be orthogonal, but I can 
imagine in the future where I might want one rule to be based on another one, 
and whereas regular dataflows have to be an acyclic graph stateful functions 
could support that.

So, in summary:

  * Does this sound like a good use case for stateful functions?
  * Are stateful functions available to all Flink jobs within a cluster?


Thanks,
John.




Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Gordon,

That sounds good. My first thought was that if I have to break up the logic I'd 
end up with:

BroadcastFunction1 --> AsyncFunction --> BroadcastFunction2

...with BroadcastFunction1 & BroadcastFunction2 needing the same broadcast state, and that state could change while an item is being processed through the chain. But I could leave a marker to do the call as you suggested, with a placeholder for the result, and that might do the trick.

Thanks again for the suggestion! Below is a pseudo-code example of how I think I'll be able to get it to work, in case it's helpful for anyone else.

Cheers,
John.



Function:

processElement(item) { // BroadcastFunction
  if (broadcastState.checkInventoryLevel) {
    long inventoryLevel = get_the_inventory_level(item.id)   // blocking HTTP call
    if (inventoryLevel < X) {
      ctx.output("reorder-outputTag", item)
    }
  }
  ...
  item.status = "checked";
  collect(item);
}


 ---> broken down into functions A, B & C


FunctionA:

processElement(item) { // BroadcastFunction
  if (broadcastState.checkInventoryLevel) {
    item.inventoryLevel = null               // placeholder, filled in by FunctionB
    collect(Tuple2(item, true))              // marker: inventory lookup needed
  } else {
    collect(Tuple2(item, false))
  }
}


FunctionB:

asyncInvoke(Tuple2(item, needsInventory)) { // AsyncFunction
  if (needsInventory) {
    item.inventoryLevel = get_the_inventory(item.id)   // async HTTP call fills the placeholder
  }
  collect(item);
}


FunctionC:

processElement(item) { // FlatMapFunction
  if (item.inventoryLevel != null && item.inventoryLevel < X) {
    ctx.output("reorder-outputTag", item)
  }
  item.status = "checked";
  collect(item);
}



From: Tzu-Li (Gordon) Tai 
Sent: Tuesday 17 March 2020 10:05
To: user@flink.apache.org 
Subject: Re: RichAsyncFunction + BroadcastProcessFunction

Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate extra external HTTP requests needs to be performed, and have them
consumed by a downstream async IO operator to complete the HTTP request?
That could work depending on what exactly you need to do in your specific
case.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Flink Users,

I have a BroadcastProcessFunction and in the processElement method I sometimes 
need to do some http requests, depending on the broadcast state.

Because I'm doing http requests, I'd prefer the function to be async, like 
RichAsyncFunction.asyncInvoke(), but RichAsyncFunction doesn't support 
broadcast data.

Is there any way to combine the functionality of a RichAsyncFunction + a 
BroadcastProcessFunction?

Thanks!
John.


Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread John Morrow
Thanks Biao!

I tried slowing down the input stream by replacing the env.fromCollection() with a custom SourceFunction (below) which drip-feeds the data more slowly. By the way, in my real scenario the data source for the pipeline will be a RabbitMQ source.


I do get better results, but it seems like a timing issue still exists:

  +-- StreamTest [OK]
  | +-- testPipelineWithProcessingTimeTrigger() 10480 ms [X] expected: <10> but 
was: <6>
  | '-- testPipelineWithCountTrigger() [OK]


I can probably play around with the window size & sleep times below and get my tests to pass, but I'm more concerned about whether there's a potential race condition or coordination step, outside of the MiniCluster test environment, that I should be dealing with.

My pipeline is broken into two parts: pipeA & pipeB. I've done this because the 
2nd part has an async operator so it needs to be at the start of a chain. For a 
non-MiniCluster environment, would it be possible for records to flow through 
pipeA and not reach pipeB, as I'm seeing with the MiniCluster? i.e. is there something I need to do to explicitly connect/sync pipeA & pipeB before calling env.execute(), besides the fact that:

pipeB = AsyncDataStream.unorderedWait(pipeA, ...


Thanks!
John.




public class StreamTest {

  private static class DripFeed extends RichSourceFunction<Integer> {

private volatile boolean isRunning = false;
private final int inputSize;

public DripFeed(int inputSize) {
  this.inputSize = inputSize;
}

@Override
public void open(Configuration parameters) {
  isRunning = true;
}

@Override
    public void run(SourceContext<Integer> ctx) throws Exception {
      List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());
      Iterator<Integer> iterator = listOfNumbers.iterator();
  while (isRunning && iterator.hasNext()) {
try {
  Thread.sleep(100L);
} catch (InterruptedException e) {
  System.out.println();
}
ctx.collect(iterator.next());
  }
  try {
Thread.sleep(1000L);
  } catch (InterruptedException e) {
System.out.println();
  }
}

@Override
public void cancel() {
  isRunning = false;
}

  }

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

MiniClusterWithClientResource miniCluster = new 
MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, 
TimeUnit.DAYS))
.build()
);
miniCluster.before();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

// 1st half of pipeline
    //DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
    DataStream<List<Integer>> pipeA = env.addSource(new StreamTest.DripFeed(inputSize))
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))

...(same as before...)




From: Biao Liu 
Sent: Tuesday 17 December 2019 21:50
To: John Morrow 
Cc: user 
Subject: Re: MiniCluster with ProcessingTimeTrigger

Hi John,

The root cause is that the collection source exits too fast, so the window operator also exits without ever being triggered.

You could verify that by holding the pipeline open for a second before the window is released. For example, insert a map operator between the source and the window operator and block for a second or more in the "close" method of that map operator. You will see that the window then works as expected.
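
For instance, something like this (assuming Integer elements as in your test; the 2-second sleep is arbitrary):

import org.apache.flink.api.common.functions.RichMapFunction;

// Pass-through map whose close() blocks briefly, so the task stays alive long
// enough for the processing-time window behind it to fire before shutdown.
public class HoldOpenMap extends RichMapFunction<Integer, Integer> {

  @Override
  public Integer map(Integer value) {
    return value;
  }

  @Override
  public void close() throws Exception {
    Thread.sleep(2000L);
  }
}

// usage: env.addSource(...).map(new HoldOpenMap()).windowAll(...)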

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow <johnniemor...@hotmail.com> wrote:
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a 
MiniCluster. The 1st task has a WindowAll operator which groups items into 
batches every second, and the 2nd task does an async operation with each batch 
and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:

  *   testPipelineWithCountTrigger - this one works fine 
  *   testPipelineWithProcessingTimeTrigger - this one doesn't give any output 

It seems like a timing issue. If I step through the failing one slowly I can 
see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods 
do get called, and the asyncInvoke method also gets called, but when I run it 
the 2nd test fails as it produces no outp

MiniCluster with ProcessingTimeTrigger

2019-12-17 Thread John Morrow
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a 
MiniCluster. The 1st task has a WindowAll operator which groups items into 
batches every second, and the 2nd task does an async operation with each batch 
and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:

  *   testPipelineWithCountTrigger - this one works fine 
  *   testPipelineWithProcessingTimeTrigger - this one doesn't give any output 

It seems like a timing issue. If I step through the failing one slowly I can 
see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods 
do get called, and the asyncInvoke method also gets called, but when I run it 
the 2nd test fails as it produces no output. I've tried setting the MiniCluster 
timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 * 
window after env.execute but no difference. I'm running this with Flink 1.9.0 
and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).


Any idea how I can get the 2nd test to wait to process the output?


Thanks 

John.






import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertEquals;


public class StreamTest {

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

MiniClusterWithClientResource miniCluster = new 
MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, 
TimeUnit.DAYS))
.build()
);
miniCluster.before();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

// 1st half of pipeline
    DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(trigger)
.process(new Batcher());

// 2nd half of pipeline
    DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1)
        .flatMap((List<Integer> records, Collector<Integer> out) -> records.forEach(out::collect))
        .returns(Types.INT);
pipeB.addSink(new CollectSink());

env.execute();

try {
  Thread.sleep(1000L * 3);
} catch (InterruptedException e) {
  System.out.println();
}
miniCluster.after();

assertEquals(inputSize, CollectSink.values.size());
  }


  public static class Batcher extends ProcessAllWindowFunction<Integer, List<Integer>, TimeWindow> {
    @Override
    public void process(Context context, Iterable<Integer> elements, Collector<List<Integer>> out) throws Exception {
      out.collect(StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()));
    }
  }

  private static class AsyncUDF extends RichAsyncFunction<List<Integer>, List<Integer>> {

    private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
  return CompletableFuture.supplyAsync(() -> {
try {
  Thread.sleep(100);
} catch (InterruptedException e) {