Streaming Exception

2017-03-09 Thread Govindarajan Srinivasaraghavan
Hi All,

I see the below error after running my streaming job for a while and when
the load increases. After a while the task manager becomes completely dead
and the job keeps on restarting.

Also, when I checked for back pressure in the UI, it kept saying "sampling in
progress" and no results were displayed. Is there an API which can provide the
back pressure details?

2017-03-10 01:40:58,793 WARN
 org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error
while emitting latency marker.
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:426)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
at
org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:152)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:117)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:708)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:690)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:423)
... 10 more
Caused by: java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:114)
... 14 more
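
[Editor's note] The web UI triggers back pressure sampling on demand, which is why it can sit in "sampling in progress" for a while; the same stats are also exposed through the JobManager's monitoring REST API. Below is a minimal sketch of querying that endpoint directly. The host/port and the job/vertex ids are placeholders, and the endpoint path is an assumption based on the documented /jobs/:jobid/vertices/:vertexid/backpressure resource, not something confirmed in this thread.

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class BackPressureQuery {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>";        // placeholder
        String vertexId = "<vertex-id>";  // placeholder
        URL url = new URL("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            // The response is JSON; while sampling is still running the stats may be
            // empty, so the request may need to be repeated after a few seconds.
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```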


questions on custom state with flink window

2017-03-09 Thread saiprasad mishra
Hi All
I have few questions on understanding state with flink

1) Is it advisable to create custom state within a RichWindowFunction?
I am able to create it, but

2) If I create state in the window function, does the state remain forever?
I want the state to stay forever.

3) Also, one thing I am noticing is that the custom ValueState is not queryable
from the query client; it results in the following exception:

org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation

4) Is it possible to list all the state stores for a job id? I did not see
it in the client API.

Regards
Sai
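
[Editor's note] On question 3, below is a minimal sketch of how keyed ValueState is usually made reachable for queryable state in Flink 1.2; the function, state name and types are invented for illustration. UnknownKvStateLocation typically means no state was registered under the queried name, e.g. because setQueryable() (or KeyedStream#asQueryableState) was never called, or the client queries a different name than the job registers.

```
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountingFlatMap extends RichFlatMapFunction<String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("myCount", Long.class);
        // register the state under a name the QueryableStateClient can look up
        descriptor.setQueryable("myCount");
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        out.collect(value);
    }
}
```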


Flink - Writing Test Case for the Datastream

2017-03-09 Thread MAHESH KUMAR
Hi Team,

I am trying to write test cases to check whether the job is getting
executed as desired. I am using the Flink test util. I am trying to do an
end-to-end test where Flink reads from a Kafka queue, does some
processing and then writes the output to another topic of the Kafka queue.
My objective is to read the message from the output topic and check if it
has the same message as expected.

I have got Zookeeper and Kafka configured for the test. When I start the
Flink job, it never terminates since its source is a Kafka source. Is
there a way to run a job for a specific interval of time, or how do I go
about testing this scenario? Is there any documentation/example for running
test cases such as these?

My code currently looks something like this:

class StreamingMultipleTest extends StreamingMultipleProgramsTestBase {

  @Before def initialize() = {
    // Start Kafka, Zookeeper
    // Call the run method of the Flink class - FlinkClass.run()
    // (this class contains the env.execute())

    // My code does not execute any further since the previous call never returns.
  }

  @Test def Test1() = {
    // Check if the output topic of the Kafka queue is as expected - assert statement
  }

  @After def closeServices() = {
    // Stop Zookeeper and Kafka
  }
}


Thanks and Regards,
Mahesh

-- 

Mahesh Kumar Ravindranathan
Data Streaming Engineer
Oracle Marketing Cloud - Social Platform
Contact No:+1(720)492-4445
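
[Editor's note] One common workaround for the blocking env.execute() call, sketched below in Java: run the job on a background thread so the @Test method stays free to produce to the input topic and poll the output topic. FlinkClass.run() is the entry point from the question; everything else (class names, the Kafka assertions) is a placeholder.

```
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class StreamingJobIT {

    private Thread jobThread;

    @Before
    public void startJob() {
        // start embedded Zookeeper and Kafka here, then launch the (never returning)
        // streaming job on its own thread so the test thread is not blocked
        jobThread = new Thread(() -> {
            try {
                FlinkClass.run();  // the class from the question; calls env.execute()
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        jobThread.setDaemon(true);
        jobThread.start();
    }

    @Test
    public void outputTopicContainsExpectedMessage() throws Exception {
        // produce a test record to the input topic, then poll the output topic with a
        // timeout (e.g. with a plain KafkaConsumer) and assert on the consumed message
    }

    @After
    public void stopServices() {
        // stop Kafka and Zookeeper; the daemon job thread dies with the JVM.
        // A cleaner alternative is a DeserializationSchema whose isEndOfStream()
        // eventually returns true, which lets the job finish on its own.
        if (jobThread != null) {
            jobThread.interrupt();
        }
    }
}
```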


Re: TTL for State Entries / FLINK-3089

2017-03-09 Thread Johannes Schulte
Hey Aljoscha,

thank you for your reply. The amount and quality of responses on this list
are really great to see and a good way to learn.

I will try this and see how this works out.

Cheers,

Johannes

On Thu, Mar 9, 2017 at 3:55 PM, Aljoscha Krettek 
wrote:

> Hi Johannes,
> I think what you can do is not register a timer for every event but for
> every key, with a certain granularity. When that timer fires you check
> what you want to clean up for that key and maybe register another timer
> for the future. This way, the size of your timer state is bounded by
> your key cardinality and I think people have used Flink with
> timers/windows with key cardinalities of several 100 millions.
>
> Best,
> Aljoscha
>
> On Wed, Mar 8, 2017, at 14:37, Ufuk Celebi wrote:
> > Looping in Aljoscha and Kostas who are the experts on this. :-)
> >
> > On Mon, Mar 6, 2017 at 6:06 PM, Johannes Schulte
> >  wrote:
> > > Hi,
> > >
> > > I am trying to achieve a stream-to-stream join with big windows and am
> > > searching for a way to clean up state of old keys. I am already using a
> > > RichCoProcessFunction
> > >
> > > I found there is already an existing ticket
> > >
> > > https://issues.apache.org/jira/browse/FLINK-3089
> > >
> > > but I have doubts that a registration of a timer for every incoming
> event is
> > > feasible as the timers seem to reside in an in-memory queue.
> > >
> > > The task is somewhat similar to the following blog post:
> > > http://devblog.mediamath.com/real-time-streaming-
> attribution-using-apache-flink
> > >
> > > Is the implementation of a custom window operator a necessity for achieving
> > > such functionality?
> > >
> > > Thanks a lot,
> > >
> > > Johannes
> > >
> > >
>


Re: window function not working when control stream broadcast

2017-03-09 Thread Sam Huang
Hi Aljoscha,

Here's the code:

private static class DataFilterFunImpl extends
RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
private JSONParser parser;
private Map<String, Map<String, ControlJsonConfig>>
whiteListMap = new HashMap<>();

@Override
// tuple5(domain, device_type, type, key, count_or_sum)
public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6>
collector) throws Exception {
String type = dataTuple.f2;
String[] keyValue =
dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
String key = keyValue[0];
switch (type) {
case RawEventExtractor.Constants.VALUE_COUNT: {
if (whiteListMap.containsKey(key)) {
ControlJsonConfig ruleConfig =
whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
if (ruleConfig != null) {
String value = keyValue.length > 1 ?
keyValue[1] : "";
String bucket = ruleConfig.getBucketName(value);
if (bucket != null) {

dataTuple.setField(String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP,
key, bucket), 3);
collector.collect(dataTuple);
}
} else {
collector.collect(dataTuple);
}
}
break;
}
case RawEventExtractor.Constants.VALUE_SUM: {
if (whiteListMap.containsKey(key) &&
whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM))
{
collector.collect(dataTuple);
}
break;
}
default: collector.collect(dataTuple);
}
}


@Override
public void flatMap2(String jsonStr, Collector<KVTuple6>
collector) throws Exception {
//Map<String, Map<String, ControlJsonConfig>> whiteListMap
//        = whiteListMapState.value();
try {
if (parser == null) {
 parser = new JSONParser();
}
JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
Tuple2<String, Map<String, ControlJsonConfig>> config
= RawEventExtractor.getKeyConfig(jsonConfig);
if (config.f1 == null) {
whiteListMap.remove(config.f0);
} else {
whiteListMap.put(config.f0, config.f1);
}
} catch (Exception e) {}
}
}


FYI, if I call setParallelism() on both the control stream and the data stream,
the window function works. Is it necessary to do so for the broadcast() function?


On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek 
wrote:

> Hi Sam,
> could you please also send the code for the DataFilterFunImpl and your
> timestamps/watermark assigner. That could help in figuring out the problem.
>
> Best,
> Aljoscha
>
>
> On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:
>
> Hi Timo,
>
> The window function sinks the data into InfluxDB, and it's not triggered.
> If I comment the ".timeWindow", and print results after the reduce
> function, it works
> Code for window function is here:
>
> private static class WindowFunImpl implements
>         WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>                       Collector<Point> collector) throws Exception {
>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>         System.out.println("window: " + kvTypeTuple);
>         // Doesn't work here if use broadcast
>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>                 .tag(TAG_KEY, kvTypeTuple.f3)
>                 .addField(FIELD, kvTypeTuple.f4);
>
>         collector.collect(builder.build());
>     }
> }
>
>
> On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther  wrote:
>
> Hi Sam,
>
> could you explain the behavior a bit more? How does the window function
> behave? Is it not triggered or what is the content? What is the result if
> you don't use a window function?
>
> Timo
>
>
> On 08/03/17 at 02:59, Sam Huang wrote:
>
> btw, the reduce function works well, I've printed out the data, and they
> are
> all correct. So are the timestamps and watermarks. And if I remove
> ".broadcast()", the data is successfully sinked.
>
> Any help?
>
>
>
> --
> View this message in context: http://apache-flink-user-maili
> ng-list-archive.2336050.n4.nabble.com/window-function-not-
> 
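
[Editor's note] The thread does not reach a conclusion, but one possible cause worth ruling out (an assumption on my part, not something confirmed here): if the broadcast control stream never emits watermarks, the connected CoFlatMap's combined watermark cannot advance, so downstream event-time windows never fire. A sketch of taking the control stream out of the watermark calculation, assuming the control stream carries Strings:

```
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class ControlStreamWatermarks implements AssignerWithPeriodicWatermarks<String> {

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // control messages carry no event time of their own
        return Long.MIN_VALUE;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // never hold back the event-time clock of the connected data stream
        return new Watermark(Long.MAX_VALUE);
    }
}

// usage (sketch):
// controlSource.assignTimestampsAndWatermarks(new ControlStreamWatermarks())
//              .broadcast();
```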

Flink Standalone Service

2017-03-09 Thread Daniel Skates
Hi all,

Is there an init.d or similar service script for Flink on Red Hat (or CentOS)
7?  Mostly I just want to make sure that when my server restarts, Flink
starts up again, but being able to check the status would also be awesome.

Cheers,

Daniel
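
[Editor's note] As far as I know, no service script ships with Flink, but a minimal systemd unit along the following lines is a common way to get start-on-boot plus a coarse status on RHEL/CentOS 7. It assumes Flink is unpacked under /opt/flink, runs as a dedicated flink user, and uses the plain standalone start-cluster.sh/stop-cluster.sh scripts; it starts and stops the daemons but does not supervise their health.

```
# /etc/systemd/system/flink.service
[Unit]
Description=Apache Flink standalone cluster
After=network.target

[Service]
Type=oneshot
RemainAfterExit=yes
User=flink
ExecStart=/opt/flink/bin/start-cluster.sh
ExecStop=/opt/flink/bin/stop-cluster.sh

[Install]
WantedBy=multi-user.target
```

After a systemctl daemon-reload, systemctl enable flink makes the cluster start on reboot and systemctl status flink gives at least a started/stopped view.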


Re: Issues with Event Time and Kafka

2017-03-09 Thread ext.eformichella
Thanks for the suggestion, we can definitely try that out.

My one concern there is that events technically can lag for days or even
months in some cases, but we only care about including the events that lag
for 30 minutes or so, and would like the further lagging events to be
ignored - I just want to make sure that doesn't require special handling.

I also just want to make sure I'm understanding the maximum lateness
watermark correctly. Suppose a watermark gets generated, and then an
element with an older timestamp is found. My understanding was that that
element should be ignored, but from our results it looks like the late
element actually overwrites the aggregate of the on-time elements. Is this
expected behavior?

Thank you for your help!
-Ethan

On Tue, Mar 7, 2017 at 6:01 PM, Dawid Wysakowicz [via Apache Flink User
Mailing List archive.]  wrote:

> Hi Ethan,
>
> I believe then it is because the Watermark and Timestamps in your
> implementation are uncorrelated. What Watermark really is a marker that
> says there will be no elements with timestamp smaller than the value of
> this watermark. For more info on the concept see [1]
> 
> .
>
> In your case as you say that events can "lag" for 30 minutes, you should
> try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly
> for a case like yours.
>
> Regards,
> Dawid
>
> 2017-03-07 22:33 GMT+01:00 ext.eformichella <[hidden email]
> >:
>
>> Hi Dawid, I'm working with Max on the project
>> Our code for the TimestampAndWatermarkAssigner is:
>> ```
>> class TimestampAndWatermarkAssigner(val maxLateness: Long) extends
>> AssignerWithPeriodicWatermarks[Row] {
>>
>>   override def extractTimestamp(element: Row, previousElementTimestamp:
>> Long): Long = {
>> element.minTime
>>   }
>>
>>   override def getCurrentWatermark(): Watermark = {
>> new Watermark(System.currentTimeMillis() - maxLateness)
>>   }
>> }
>> ```
>>
>> Where Row is a class representing the incoming JSON object coming from
>> Kafka, which includes the timestamp
>>
>> Thanks,
>> -Ethan
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Issues-with-Event-
>> Time-and-Kafka-tp12061p12090.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Issues-with-Event-Time-and-Kafka-tp12061p12092.html
> To unsubscribe from Issues with Event Time and Kafka, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issues-with-Event-Time-and-Kafka-tp12061p12139.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
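
[Editor's note] A sketch (in Java) of the extractor Dawid recommends, assuming the Row class and its minTime millisecond timestamp from the Scala snippet above. The watermark then trails the largest timestamp seen so far by 30 minutes, instead of trailing the wall clock, so timestamps and watermarks stay correlated; whether events arriving later than that are dropped or still merged into a window result then depends on the window's allowedLateness setting.

```
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RowTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Row> {

    public RowTimestampExtractor() {
        // events arriving more than 30 minutes behind the highest seen timestamp are late
        super(Time.minutes(30));
    }

    @Override
    public long extractTimestamp(Row element) {
        // Row/minTime are the class and field from the Scala snippet above
        // (from Java this may be element.minTime() depending on how Row is defined)
        return element.minTime;
    }
}
```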

Re: Flink streaming - call external API "after" sink

2017-03-09 Thread Aljoscha Krettek
Hi,

this is the second time that something like this is being requested or
proposed. This was the first time: [1].


+Seth, who might have an opinion on this.



I'm starting to think that we might need to generalise this pattern.
Right now, the SinkFunction interface is this:
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Function for standard sink behaviour. This function is called
     * for every record.
     */
    void invoke(IN value) throws Exception;
}



The interface for FlatMapFunction is this:

public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the
     * input data set and transforms it into zero, one, or more elements.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}



The only difference is naming and the fact that FlatMapFunction can emit
elements. All SinkFunction implementations could be implemented as a
FlatMapFunction, so we might not even need a special
SinkFunction in the end and all sinks can become "sinks that can also
forward data".
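
[Editor's note] As a small illustration of the "sinks that can also forward data" idea, this is roughly what a sink written against the existing FlatMapFunction interface could look like; the write() call is just a stand-in for whatever external system is involved.

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class ForwardingSink<T> implements FlatMapFunction<T, T> {

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        write(value);        // side effect: behave like a sink
        out.collect(value);  // and still forward the element downstream
    }

    private void write(T value) {
        // placeholder for the actual external write (Cassandra, Kafka, file, ...)
    }
}
```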


For an example of a system that does it like this you can look at Apache
Beam, where there are no special Sink Functions. Everything is just
DoFns (basically a very powerful FlatMapFunction) stringed together. For
example there is a file "sink" that consists of three DoFns: one does
some initialisation and sends forward write handles, the second DoFn
does the writing and forwards handles to the written data, the third one
finalises by renaming the written files to the final location.


Best,

Aljoscha



[1] 
https://lists.apache.org/thread.html/bfa811892f8bd5d87d47a4597b60ab2f4aee0a8e7d6379b3d6d9d7b3@%3Cdev.flink.apache.org%3E




On Thu, Mar 9, 2017, at 16:56, Tarandeep Singh wrote:

> Hi,

> 

> I am using flink-1.2 streaming API to process clickstream and compute
> some results per cookie. The computed results are stored in Cassandra
> using flink-cassandra connector. After a result is stored in
> cassandra, I want to notify an external system (using their API or via
> Kafka) that result is available (for one cookie).
> 

> Can this be done (with/without modifying sink source code)?

> 

> What if I create a JointSink that internally uses cassandra sink and
> kafka sink and writes to both places? I am not worried about same
> record written multiple times as the computed result and the external
> system consumption is idempotent.
> 

> Thank you,

> Tarandeep




Flink streaming - call external API "after" sink

2017-03-09 Thread Tarandeep Singh
Hi,

I am using flink-1.2 streaming API to process clickstream and compute some
results per cookie. The computed results are stored in Cassandra using
flink-cassandra connector. After a result is stored in cassandra, I want to
notify an external system (using their API or via Kafka) that result is
available (for one cookie).

Can this be done (with/without modifying sink source code)?

What if I create a JointSink that internally uses cassandra sink and kafka
sink and writes to both places? I am not worried about same record written
multiple times as the computed result and the external system consumption
is idempotent.

Thank you,
Tarandeep
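
[Editor's note] A rough sketch of the "JointSink" idea from the question, assuming both wrapped sinks are plain SinkFunctions. The Cassandra connector's sink is a rich sink, so its open()/close() and checkpoint hooks would also need to be forwarded; composing it this way is therefore not a drop-in solution, just an outline of the shape.

```
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class JointSink<T> implements SinkFunction<T> {

    private final SinkFunction<T> first;
    private final SinkFunction<T> second;

    public JointSink(SinkFunction<T> first, SinkFunction<T> second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void invoke(T value) throws Exception {
        // duplicates on retry are acceptable because, as noted in the question,
        // both the stored result and the downstream consumer are idempotent
        first.invoke(value);   // e.g. write the result to Cassandra
        second.invoke(value);  // e.g. publish a "result available" notification
    }
}
```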


Re: Appropriate State to use to buffer events in ProcessFunction

2017-03-09 Thread Yassine MARZOUGUI
Hi Timo,

I thought about the ListState but quickly discarded it as it keeps the
insertion order and not the event order. After a second thought I think I will
reconsider it since my events are only occasionally out-of-order. I didn't know
that the Flink CEP operators 'next' and 'within' can handle event time, so I
think I will give it a try! Thank you!

Best,
Yassine

2017-03-08 9:55 GMT+01:00 Timo Walther :

> Hi Yassine,
>
> have you thought about using a ListState? As far as I know, it keeps at
> least the insertion order. You could sort it once your trigger event has
> arrived.
> If you use a RocksDB as state backend, 100+ GB of state should not be a
> problem. Have you thought about using Flink's CEP library? It might fit to
> your needs without implementing a custom process function.
>
> I hope that helps.
>
> Timo
>
>
> On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
>
> Hi all,
>>
>> I want to label events in a stream based on a condition on some future
>> events.
>> For example my stream contains events of type A and B, and I would
>> like to assign a label 1 to an event E of type A if an event of type B
>> happens within a duration x of E. I am using event time and my events can
>> be out of order.
>> For this I'm using ProcessFunction which looks suitable for my use case.
>> In order to handle out of order events, I'm keeping events of type A in a
>> state and once an event of type B is received, I fire an event time timer
>> in which I loop through events of type A in the state having a timestamp <
>> timer.timestamp, label them and remove them from the state.
>> Currently the state is simply a value state containing a
>> TreeMap. I'm keeping events sorted in order to
>> effectively get events older than the timer timestamp.
>> I wonder If that's the appropriate data structure to use in the value
>> state to buffer events and be able to handle out of orderness, or if there
>> is a more effective implementation, especially that the state may grow to
>> reach ~100 GB sometimes?
>>
>> Any insight is appreciated.
>>
>> Thanks,
>> Yassine
>>
>>
>>
>>
>
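
[Editor's note] A condensed sketch of the ListState variant discussed above, assuming Flink 1.2's keyed ListState API; the nested Event class is an invented stand-in for the real event types. Type-A events are buffered in whatever order they arrive; a type-B event registers an event-time timer, and onTimer the buffered events older than the timer are sorted by their own timestamps before being labelled, which tolerates the occasional out-of-order arrival.

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class LabelingFunction extends ProcessFunction<LabelingFunction.Event, LabelingFunction.Event> {

    // minimal stand-in for the real event class from the thread
    public static class Event {
        public String type;      // "A" or "B"
        public long timestamp;   // event time in ms
        public int label;        // set to 1 when a B event arrives in time
    }

    private transient ListState<Event> bufferedA;

    @Override
    public void open(Configuration parameters) {
        bufferedA = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-a", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        if ("A".equals(event.type)) {
            bufferedA.add(event);  // stored in arrival order, not event-time order
        } else {
            // a B event: label all buffered A events older than its timestamp
            ctx.timerService().registerEventTimeTimer(event.timestamp);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> matched = new ArrayList<>();
        List<Event> remaining = new ArrayList<>();
        Iterable<Event> buffered = bufferedA.get();
        if (buffered != null) {
            for (Event e : buffered) {
                if (e.timestamp < timestamp) {
                    matched.add(e);
                } else {
                    remaining.add(e);
                }
            }
        }
        // sorting only now is enough to handle the occasional out-of-order insertion
        matched.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
        for (Event e : matched) {
            e.label = 1;
            out.collect(e);
        }
        bufferedA.clear();
        for (Event e : remaining) {
            bufferedA.add(e);  // keep the not-yet-labelled A events buffered
        }
    }
}
```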


Re: TTL for State Entries / FLINK-3089

2017-03-09 Thread Aljoscha Krettek
Hi Johannes,
I think what you can do is not register a timer for every event but for
every key, with a certain granularity. When that timer fires you check
what you want to clean up for that key and maybe register another timer
for the future. This way, the size of your timer state is bounded by
your key cardinality and I think people have used Flink with
timers/windows with key cardinalities of several 100 millions.

Best,
Aljoscha

On Wed, Mar 8, 2017, at 14:37, Ufuk Celebi wrote:
> Looping in Aljoscha and Kostas who are the experts on this. :-)
> 
> On Mon, Mar 6, 2017 at 6:06 PM, Johannes Schulte
>  wrote:
> > Hi,
> >
> > I am trying to achieve a stream-to-stream join with big windows and am
> > searching for a way to clean up state of old keys. I am already using a
> > RichCoProcessFunction
> >
> > I found there is already an existing ticket
> >
> > https://issues.apache.org/jira/browse/FLINK-3089
> >
> > but I have doubts that a registration of a timer for every incoming event is
> > feasible as the timers seem to reside in an in-memory queue.
> >
> > The task is somewhat similar to the following blog post:
> > http://devblog.mediamath.com/real-time-streaming-attribution-using-apache-flink
> >
> > Is the implementation of a custom window operator a necessity for achieving
> > such functionality?
> >
> > Thanks a lot,
> >
> > Johannes
> >
> >
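
[Editor's note] A sketch of the per-key cleanup timer described above, written against a plain ProcessFunction with an invented class name and a processing-time TTL (the thread does not fix either choice); the same pattern applies inside the RichCoProcessFunction from the original question, where the real join state would be cleared alongside lastSeen. Rounding to a granularity keeps the live timer count bounded by the key cardinality rather than the event count.

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class IdleKeyCleanup<T> extends ProcessFunction<T, T> {

    private static final long TTL = 6 * 60 * 60 * 1000L;      // drop keys idle for 6 hours
    private static final long GRANULARITY = 15 * 60 * 1000L;  // re-check every 15 minutes

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", Long.class));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (lastSeen.value() == null) {
            // first element for this key (or first after a cleanup): schedule one timer,
            // aligned to the granularity, instead of one timer per incoming event
            long nextCheck = (now / GRANULARITY + 1) * GRANULARITY;
            ctx.timerService().registerProcessingTimeTimer(nextCheck);
        }
        lastSeen.update(now);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        Long seen = lastSeen.value();
        if (seen == null || timestamp - seen >= TTL) {
            lastSeen.clear();  // key idle long enough: drop its state (and any join buffers)
        } else {
            // still active: check again one granularity interval later
            ctx.timerService().registerProcessingTimeTimer(timestamp + GRANULARITY);
        }
    }
}
```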


Re: window function not working when control stream broadcast

2017-03-09 Thread Aljoscha Krettek
Hi Sam,

could you please also send the code for the DataFilterFunImpl and
your timestamps/watermark assigner. That could help in figuring out
the problem.


Best,

Aljoscha





On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:

> Hi Timo,

> 

> The window function sinks the data into InfluxDB, and it's not
> triggered.
> If I comment the ".timeWindow", and print results after the reduce
> function, it works
> Code for window function is here:

> 

> private static class WindowFunImpl implements
>         WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>
>     @Override
>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>                       Collector<Point> collector) throws Exception {
>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>         System.out.println("window: " + kvTypeTuple);
>         // Doesn't work here if use broadcast
>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>                 .tag(TAG_KEY, kvTypeTuple.f3)
>                 .addField(FIELD, kvTypeTuple.f4);
>
>         collector.collect(builder.build());
>     }
> }
> 

> 

> On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther
>  wrote:
>> Hi Sam,

>> 

>>  could you explain the behavior a bit more? How does the window
>>  function behave? Is it not triggered or what is the content? What is
>>  the result if you don't use a window function?
>> 

>>  Timo

>> 

>> 

>>  On 08/03/17 at 02:59, Sam Huang wrote:

>> 

>>> btw, the reduce function works well, I've printed out the data, and
>>> they are
>>>  all correct. So are the timestamps and watermarks. And if I remove
>>>  ".broadcast()", the data is successfully sinked.

>>> 

>>>  Any help?

>>> 

>>> 

>>> 

>>>  --

>>>  View this message in context:
>>>  
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
>>>  Sent from the Apache Flink User Mailing List archive. mailing list
>>>  archive at Nabble.com.
>> 




Re: Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10

2017-03-09 Thread Aljoscha Krettek
Great you could figure it out! And thanks for letting us know.

On Wed, Mar 8, 2017, at 03:03, Sam Huang wrote:
> So sorry I forgot to reply. I've solved the problem: it turns out I didn't
> input data which generates a watermark greater than my first window's end time,
> so no window was triggered.
> 
> 
> 
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Event-time-tumbling-window-doesn-t-fire-Flink-1-2-0-Kafka-0-8-2-10-tp11976p12095.html
> Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.


Re: Remove Accumulators at runtime

2017-03-09 Thread Ufuk Celebi
I see, this is not possible with accumulators. You could wrap all
counts in a single metric and update that one. Check out Flink's
metrics:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html


On Wed, Mar 8, 2017 at 5:04 PM, PedroMrChaves  wrote:
> Hi,
>
> I'm building a system that maintains a set of rules that can be dynamically
> added/removed. I wanted to count every element that matched each rule in an
> accumulator (I have several parallel instances). If a rule is removed, so
> should its accumulator be.
>
>
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Remove-Accumulators-at-runtime-tp12106p12119.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.
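
[Editor's note] A sketch of moving the per-rule counts into the metric system. It registers one Counter per rule under a "rules" group rather than literally wrapping everything in a single metric as Ufuk suggests; when a rule is removed, its counter simply stops being updated. The input shape (one (ruleId, payload) pair per match) is an assumption for illustration.

```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class RuleMatchCounter extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private transient Map<String, Counter> counters;

    @Override
    public void open(Configuration parameters) {
        counters = new HashMap<>();
    }

    @Override
    public void flatMap(Tuple2<String, String> match, Collector<Tuple2<String, String>> out) {
        // lazily create one counter per rule id, grouped under "rules"
        counters.computeIfAbsent(match.f0, ruleId ->
                getRuntimeContext().getMetricGroup().addGroup("rules").counter(ruleId))
                .inc();
        out.collect(match);
    }
}
```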


Re: ProcessFunction example

2017-03-09 Thread Kostas Kloudas
Hi Philippe,

You are right! 
Thanks for reporting it!
We will fix it asap.

Kostas

> On Mar 9, 2017, at 8:38 AM, Philippe Caparroy  
> wrote:
> 
> I think there is an error in the code snippet describing the ProcessFunction
> timeout example:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>  
> 
> 
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified) {
>         // emit the state
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> If, as stated in the example, the CountWithTimeoutFunction should emit a
> key/count if no further update occurred during the minute elapsed since the
> last update, the test should be:
>
> if (timestamp == result.lastModified + 60000) {
>     // emit the state on timeout
>     out.collect(new Tuple2<String, Long>(result.key, result.count));
> }
> 
> As stated in the javadoc of the ProcessFunction: the timestamp arg of the
> onTimer method is the timestamp of the firing timer.
> 
> 
> 
> 
> 
>