Implications on incremental checkpoint duration based on RocksDBStateBackend, Number of Slots per TM

2020-06-17 Thread Sameer W
Hi,

The number of RocksDB databases that Flink creates is equal to the number of
operator states multiplied by the number of slots.

Assuming a parallelism of 100 for a job executed on 100 TMs with 1 slot per
TM, as opposed to 10 TMs with 10 slots per TM, I have noticed that the former
configuration is more efficient for incremental checkpointing. In both cases
the total number of RocksDB databases is the same, except that in the latter
case 10 times as many are created in each TM as in the former case.

Reading the link below, it says: "Flink uses this to figure out the state changes. To do
this, Flink triggers a flush in RocksDB, forcing all memtables into
sstables on disk, and hard-linked in a local temporary directory. *This
process is synchronous to the processing pipeline*, and Flink performs all
further steps asynchronously and does not block processing."

What does "Synchronous to the processing pipeline" mean? Does it mean that
flushing to DB happens synchronously (serially) for all RocksDB databases
in one TM? Is the flushing single threaded per TM
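For context, the setup I am describing is along these lines (a minimal
sketch, not my actual job; the checkpoint interval and checkpoint URI are
placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
// the second constructor argument enables incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));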

Thanks,
Sameer



Re: Processing Message after emitting to Sink

2020-04-23 Thread Sameer W
One idea that comes to mind is to replace ProcessFunction1 with a
CoProcessFunction [1]. The processElement1() function can send to the
side output, and process and keep the business result as state without
emitting it. Then, as Arvid mentioned, processElement2() can listen on the
side output (emitted by processElement1()) and, when it receives it, emit
the result from state and clear the state.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html
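A very rough sketch of that idea, assuming a keyed connected stream and
using String placeholders for the real event, ack and result types:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class HoldUntilAcked extends CoProcessFunction<String, String, String> {

    // side output that travels towards RabbitMQ (or whatever feeds stream 2 back)
    static final OutputTag<String> TO_QUEUE = new OutputTag<String>("to-queue") {};

    private transient ValueState<String> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-result", String.class));
    }

    @Override
    public void processElement1(String evt, Context ctx, Collector<String> out) throws Exception {
        // do the business processing but keep the result in state, do not emit yet
        pending.update("processed:" + evt);
        ctx.output(TO_QUEUE, evt);
    }

    @Override
    public void processElement2(String ack, Context ctx, Collector<String> out) throws Exception {
        String result = pending.value();
        if (result != null) {        // the ack has come back: emit and clear
            out.collect(result);
            pending.clear();
        }
    }
}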

On Thu, Apr 23, 2020 at 7:20 AM Arvid Heise  wrote:

> Hi Kristoff,
>
> I see a few ways, none of which are perfect.
>
> The easiest way would be to not use a sink. Instead of outputting into a
> side-output, you could tag that element and have a successive asyncIO place
> that in RabbitMQ. If that asyncIO is ordered, then you can be sure that all
> following events are only processed after the element has been added. Of
> course, the downside is that you have to manually implement the
> communication with RabbitMQ and lose what Flink already has. This is what
> you already sketched out.
>
> A more complicated approach would be to implement a custom operator with
> input selection to replace processFunction2 [1]. Let's call it op2. You
> would add the feedback from the sink implicitly, by also consuming from
> that MQ queue on op2. Then, processFunction1 would also emit some flag
> event on the main output together with the side output. Op2 would block the
> input on receiving that flag until it has read the appropriate entry from
> the MQ. However, this approach is really complex to implement and input
> selection is somewhat best-effort. So before going that route,
> I'd do a small POC to see if it fits your needs.
>
> The best solution, of course, would be to revise your overall
> architecture. It's quite smelly in a stream processing job that you need to
> halt execution at some point. If you give some more details, I could try to
> help.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/operators/InputSelectable.html
>
> On Wed, Apr 15, 2020 at 5:36 PM KristoffSC 
> wrote:
>
>> My point was that, as far as I know, sinks are "terminating" operators
>> that end the stream, like .collect in the Java 8 Stream API. They don't
>> emit elements further, and I cannot chain them in a way like:
>>
>> source - process - sink - process - sink
>>
>> A sink function produces a DataStreamSink, which is used for emitting
>> elements from a streaming topology.
>> It is not a SingleOutputStreamOperator or DataStream that I can use as
>> input for the next operator.
>>
>>
>>
>>


Re: Best way to link static data to event data?

2019-09-27 Thread Sameer W
Connected streams are one option, but may be overkill in your scenario if
your CSV does not refresh. If your CSV is small enough (in number of
records), you could parse it, load it into a (serializable) object, and
pass it to the constructor of the operator where you will be streaming the
data.

If the CSV can be made available via a shared network folder (or S3 in case
of AWS) you could also read it in the open function (if you use Rich
versions of the operator).
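For the second option, a minimal sketch of reading the file in the open()
method of a rich operator (the path and the areaCode,city layout are
assumptions for illustration):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class EnrichWithCsv extends RichMapFunction<String, String> {

    private transient Map<String, String> areaCodeToCity;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the small CSV once per parallel instance; assumed format: areaCode,city
        areaCodeToCity = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get("/shared/areacodes.csv"))) {
            String[] parts = line.split(",");
            areaCodeToCity.put(parts[0], parts[1]);
        }
    }

    @Override
    public String map(String phoneNumber) {
        String areaCode = phoneNumber.substring(0, 3); // naive area-code extraction
        return phoneNumber + "," + areaCodeToCity.getOrDefault(areaCode, "unknown");
    }
}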

The real problem, I guess, is how frequently the CSV updates. If you want
the updates to propagate in near real time (or on a schedule), option 1
(parse in the driver and send it via the constructor) does not work. Also,
with the second option you are responsible for refreshing the file read from
the shared folder.

In that case use connected streams, where the stream reading in the file
(the other stream reads the events) periodically re-reads the file and
sends it down the stream. The refresh interval is your tolerance for stale
data in the CSV.

On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:

> I don't think I need state for this...
>
> I need to load a CSV, I'm guessing as a table, and then filter my events,
> parse the number, transform the event into geolocation data and sink that
> to the downstream data source.
>
> So I'm guessing I need a CSV source and my Kafka source, and somehow join
> those and transform the event...
>
> On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
>
>> Hi,
>>
>> You should look at the broadcast state pattern in the Flink docs.
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Fri, Sep 27, 2019 at 2:42 PM John Smith 
>> wrote:
>>
>>> Using 1.8
>>>
>>> I have a list of phone area codes, cities and their geo locations in a CSV
>>> file. And my events from Kafka contain phone numbers.
>>>
>>> I want to parse the phone number, get its area code, and then associate
>>> the phone number with a city and geo location, and also count how many
>>> numbers are in that city/geo location.
>>>
>>


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-24 Thread Sameer W
Global windows are fine for this use case. I have used the same strategy.
You just define custom evictors and triggers and you are all good. Windows
are managed per key, so as long as events are evicted from the window, that
counts towards reclaiming memory for the key+window combination. Plus, there
is just one window per key with global windows.
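The shape I mean is roughly the sketch below. CountTrigger/CountEvictor
only stand in for your custom Trigger and Evictor, and Event, getId() and
MyWindowFunction are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

DataStream<Event> events = ...; // hypothetical input stream

events.keyBy(e -> e.getId())
      .window(GlobalWindows.create())
      // the trigger decides when the per-key "window" fires; the evictor drops
      // elements afterwards so the key+window state does not grow forever
      .trigger(CountTrigger.of(100))
      .evictor(CountEvictor.of(100))
      .apply(new MyWindowFunction());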

On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:

> Hi Rong:
>
> Thanks for your answer.
>
> From what I understand the dynamic gap session windows are also created
> when the event is encountered.  I need to be able to change the window end
> time at a later time based on what other events are in that window.  One
> way to do this is to use GlobalWindows but then these are never deleted.
>
> Regarding the CEP option - I believe that CEP patterns cannot be changed
> dynamically once they've been compiled, which limits its usage.
>
> Please feel free to correct me.
>
> Thanks for your help and pointers.
>
> On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong 
> wrote:
>
>
> Hi Mans,
>
> I am not sure what you meant by "dynamically change the end-time of a
> window". If you are referring to dynamically determining the firing time of
> the window, then it fits the description of a session window [1].
> If you want to handle the window end time dynamically, one way I can
> think of is the dynamic-gap session window [1] approach, with which you
> can specify the end time of a window based on input elements, provided that
> you are maintaining a session window.
> Another way to look at it is through the Flink-CEP library [2].
>
> Thanks,
> Rong
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
>
> On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:
>
> Hi:
>
> I am working on a project and need to change the end time of the window
> dynamically. I want to find out whether the end time of the window is used
> internally (for sorting windows, etc.), other than for handling watermarks,
> in a way that would cause problems if the end time were changed at run time
> after the window has been created, even if no new event has arrived for that
> window.
>
> I don't want to use GlobalWindow since from my understanding it never gets
> destroyed.
>
> If there is any alternate way of dealing with this, please let me know.
>
> Thanks
>
> Mans
>
>


JAXB Classloading errors when using PMML Library (AWS EMR Flink 1.4.2)

2018-09-03 Thread Sameer W
Hi,

I am using the PMML dependency below to execute ML models at prediction time
within a Flink map operator:



<dependency>
    <groupId>org.jpmml</groupId>
    <artifactId>pmml-evaluator</artifactId>
    <version>1.4.3</version>
    <exclusions>
        <exclusion>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.glassfish.jaxb</groupId>
            <artifactId>jaxb-runtime</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

The environment is EMR, OpenJDK 1.8 and Flink 1.4.2. My programs run fine in
my Eclipse development environment. However, when we deploy on the cluster we
get classloading exceptions, primarily because the PMML classes are loaded via
the Flink classloader while the JAXB classes are loaded by the boot
classloader. The problem also seems to be that the version of the JAXB classes
referenced within the PMML library is different from the ones loaded by the
OpenJDK.

For example, I keep getting this type of error. I have also listed another
error after this one, which is linked to not being able to use reflection and
the Unsafe library to set private fields within the PMML class instance using
the JAXB Unmarshaller:
java.lang.LinkageError: loader constraint violation: when resolving
interface method
"javax.xml.bind.Unmarshaller.unmarshal(Ljavax/xml/transform/Source;)Ljava/lang/Object;"
the class loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
of the current class,
com/comcast/mlarche/featurecreationflows/xreerrors/MyJAXBUtil, and the
class loader (instance of ) for the method's defining class,
javax/xml/bind/Unmarshaller, have different Class objects for the type
javax/xml/transform/Source used in the signature
at
com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.unmarshal(MyJAXBUtil.java:52)
at
com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.unmarshalPMML(MyJAXBUtil.java:38)
at
com.comcast.mlarche.featurecreationflows.xreerrors.PMMLModelExecution.getMiningModelEvaluator(PMMLModelExecution.java:67)
at
com.comcast.mlarche.featurecreationflows.xreerrors.PMMLModelExecution.predict(PMMLModelExecution.java:126)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorModelsPredictionServiceService.predict(XreErrorModelsPredictionServiceService.java:61)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorModelsPredictionServiceService.predictSystemRefresh(XreErrorModelsPredictionServiceService.java:44)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:46)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:17)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:65)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:20)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:357)
at
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Finally, when I started using the InputStream-based constructor (so as not to
use the SAXSource class), below is the last error I got when I had gone as
deep within the library as possible without using any util classes.

java.lang.RuntimeException: java.lang.IllegalArgumentException: Can not set
org.xml.sax.Locator field org.dmg.pmml.PMMLObject.locator to
org.xml.sax.helpers.LocatorImpl

Do Flink metrics survive a shutdown?

2018-05-09 Thread Sameer W
I want to use the Flink metrics API to store user-defined metrics (counters).
I obtain the metric group in the open() function of a RichMapFunction and
increment the counters which are created within that metric group.
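For reference, this is roughly what I am doing (a minimal sketch; the
counter name is arbitrary):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // user-defined counter registered on the operator's metric group
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}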

If the job restarts on failure, will the counters get restored from state?

Thanks,
Sameer


Re: Feasability Question: Distributed FlinkCEP

2016-10-20 Thread Sameer W
Could you not do separate followedBy patterns and then perform a join on the
resulting alert streams?

Pattern p1 = followedBy(/*1st*/)
Pattern p2 = followedBy(/*2nd*/)
DataStream alertStream1 = CEP.pattern(keyedDs, p1)
DataStream alertStream2 = CEP.pattern(keyedDs, p2)

Then just join the two alert streams using a keyBy (some common key in the
Alert events) on event time, and only emit the ones with alerts from both
sides if AND'ing, or from either side if OR'ing. Or apply another CEP
operation on the two alert streams after keying by something common in the
alert events. Or you could just union the two streams and apply CEP on the
resulting stream.
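A rough sketch of the shape (keyedDs, Event, Alert and the select functions
are placeholders; CEP.pattern(...) returns a PatternStream and select(...)
turns it into a DataStream of alerts):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;

Pattern<Event, ?> p1 = Pattern.<Event>begin("a").followedBy("b");
Pattern<Event, ?> p2 = Pattern.<Event>begin("c").followedBy("d");

DataStream<Alert> alerts1 = CEP.pattern(keyedDs, p1).select(new AlertSelect1());
DataStream<Alert> alerts2 = CEP.pattern(keyedDs, p2).select(new AlertSelect2());

// simplest composition: union the two alert streams and either join them by a
// common key or run another CEP pattern over the union to get AND / OR semantics
DataStream<Alert> combined = alerts1.union(alerts2);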

The pattern you mentioned seems possible only if each pattern works on
separate keys but you still want to decide whether two separate keys produced
an alert.

Sameer

On Thu, Oct 20, 2016 at 6:27 AM, Lucas Konstantin Bärenfänger <
lucas.baerenfaen...@stud.tu-darmstadt.de> wrote:

> Hi all,
>
> here's what I want to do: Consider a query such as *(A and B) followed_by
> (C or D)*. (Pseudo code, obviously.) I want to create a graph of
> independent processing nodes, each running an independent instance of
> FlinkCEP. I want each of them to represent an operator of the query above,
> like so:
>
>
>          followed_by           <-- Processing node 1
>           /        \
>         and         or         <-- Processing nodes 2 and 3
>        /   \       /   \
>       A     B     C     D
>
> The three nodes would have to process the following (sub-)queries,
> respectively. (Pseudo code, again.)
>
>
> *Node1: {{node2ResultStream}} followed_by {{node3ResultStream}}*
>
> *Node2: A and B*
>
>
> *Node3: C or D*
> Long story short: I want to execute the query in a distributed fashion. Is
> that currently possible using FlinkCEP?
>
> Thank you very much in advance!
>
> Best
> Lucas
>


Re: What is the best way to load/add patterns dynamically (at runtime) with Flink?

2016-10-11 Thread Sameer W
This is one of my challenges too:

1. The JavaScript rules are only applicable inside one operator (next,
followedBy, notFollowedBy), and they can apply only to the event in that
operator. I make it a little more dynamic by creating a rules HashMap and
adding rules with the names "first", "next", "followedBy1" (the same names I
use for the pattern operators). This way the rules attached to a particular
operator can easily be changed via a connected stream.

I think the feature where other events in the pattern are accessible is
being added. Currently you can only look inside one event in the "where"
clause attached to a CEP pattern operator. For example, if I check two
consecutive credit card events for a user to calculate the straight-line
distance between them and divide it by the difference in time, I cannot do
that unless I fire the pattern for every pair and check this condition in the
PatternStream's select operator, where all the events are accessible.

2. The second problem I have is that I cannot change the rules applied to
the pattern stream. For example, if I have next.followedBy and I want to add
another followedBy, it is a compile-time change. The JavaScript engine helps
me with the first issue, but this one just needs a recompile, unless you have
another Flink pipeline deployed which can check against that pattern as
well. I guess at this point you will need to take a savepoint, stop your
pipeline, redeploy the new pipeline (with a new pattern configuration) and
start again.

I would like to know if there is a cleaner solution but the above is my
fallback.

Sameer



On Tue, Oct 11, 2016 at 5:51 PM, <lg...@yahoo.com> wrote:

> Hi Sameer,
>
> I just replied to the earlier post, but I will copy it here:
>
> We also have the same requirement - we want to allow the user to change
> the matching patterns and have them take effect immediately. I'm wondering
> whether the proposed trigger DSL takes us one step closer (I don't think it
> solves the problem), or whether we have to dynamically generate Flink job
> JAR files when the matching rules/patterns are changed and submit them to
> Flink.
>
> I had thought about using a similar approach, but it is quite restrictive
> because you cannot use the power of the Flink CEP engine with this approach.
> For example, I want to be able to use the followedBy, next, notFollowedBy (in
> future) operators to detect the patterns, and these matching patterns need
> to be user-configurable/dynamic/hot-deployable. The simple attribute-based
> rules/patterns that you specified can be made dynamic as you mentioned, but
> the rules/patterns that use not just the current event's attributes but also
> past events (e.g. followedBy) are much harder to make dynamic without
> some help from Flink, which implements the CEP operators.
>
> - LF
>
>
>
>
> --
> *From:* Sameer W <sam...@axiomine.com>
> *To:* "user@flink.apache.org" <user@flink.apache.org>
> *Sent:* Tuesday, October 11, 2016 2:23 PM
> *Subject:* Re: What is the best way to load/add patterns dynamically (at
> runtime) with Flink?
>
> I have used a JavaScript engine in my CEP to evaluate my patterns. Each
> event is a list of named attributes (HashMap-like), and each event is
> attached to a list of rules expressed as JavaScript code (see the example
> below with one rule, but I can match as many rules). The rules are
> distributed over a connected stream, which allows them to change over time.
> This is how I do it to keep my rules dynamic. If someone has a better way I
> would love to hear it as well.
>
>
> private transient ScriptEngineManager factory = new ScriptEngineManager();
> private transient ScriptEngine engine = factory.getEngineByName("
> JavaScript");
> /*Inside open*/
> factory = new ScriptEngineManager();
>
> /*Close open*/
>
> /*Inside my operator*/
> engine = factory.getEngineByName("JavaScript");
> engine.put("evt", value.f1); //value.f1 contains a JSON version of my
> HashMap of attributes
> engine.eval(value.f2.rule); //value.f2.rule contains the rule which is
> evaluated by the JavaScript Engine
> /*
> Sample JavaScript contained in the call engine.eval(value.f2.rule); is
> shown below (note the "evt" variable in the JavaScript and the preceding
> line engine.put("evt", value.f1)):
>
> var evt = JSON.parse(evt);
> var result = evt.temperature > 50 && evt.pressure < 900;
> */
> boolean ret = (boolean)engine.get("result");
>
> if(ret) /*Rule is Matched*/
>
>
>
> > On Oct 11, 2016, at 5:01 PM, PedroMrChaves <pedro.mr.cha...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > I am new to Apache Flink and am trying to build a CEP using Flink's API.

Re: What is the best way to load/add patterns dynamically (at runtime) with Flink?

2016-10-11 Thread Sameer W
I have used a JavaScript engine in my CEP to evaluate my patterns. Each
event is a list of named attributes (HashMap-like), and each event is
attached to a list of rules expressed as JavaScript code (see the example
below with one rule, but I can match as many rules). The rules are
distributed over a connected stream, which allows them to change over time.
This is how I do it to keep my rules dynamic. If someone has a better way I
would love to hear it as well.


private transient ScriptEngineManager factory = new ScriptEngineManager();
private transient ScriptEngine engine =
factory.getEngineByName("JavaScript");
/*Inside open*/
factory = new ScriptEngineManager();

/*Close open*/

/*Inside my operator*/
engine = factory.getEngineByName("JavaScript");
engine.put("evt", value.f1); //value.f1 contains a JSON version of my
HashMap of attributes
engine.eval(value.f2.rule); //value.f2.rule contains the rule which is
evaluated by the JavaScript Engine
/*
Sample JavaScript contained in the call engine.eval(value.f2.rule); is
shown below (note the "evt" variable in the JavaScript and the preceding
line engine.put("evt", value.f1)):

var evt = JSON.parse(evt);
var result = evt.temperature > 50 && evt.pressure < 900;
*/
boolean ret = (boolean)engine.get("result");

if(ret) /*Rule is Matched*/



> On Oct 11, 2016, at 5:01 PM, PedroMrChaves 
wrote:
>
> Hello,
>
> I am new to Apache Flink and am trying to build a CEP using Flink's API.
> One of the requirements is the ability to add/change patterns at runtime for
> anomaly detection (maintaining the system's availability). Any ideas of how
> I could do that?
>
> For instance, if I have a stream of security events (accesses,
> authentications, etc.) and a pattern for detecting anomalies, I would like to
> be able to change that pattern's parameters; for instance, instead of
> detecting the occurrence of events A->B->C I would like to change the
> condition on B to B' in order to have a new rule. Moreover, I would like to
> be able to create new patterns dynamically as new use cases arise.
>
> Best Regards,
> Pedro Chaves
>
>
>
>


Re: Listening to timed-out patterns in Flink CEP

2016-10-11 Thread Sameer W
Assuming an element arrives with a timestamp earlier than the last emitted
watermark, would it just be dropped, because the PatternStream does not have
a max-allowed-lateness method? In that case it appears that CEP cannot handle
late events out of the box yet.

If we do want to support late events, can we chain a
keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
again before handing it to the CEP operator? This way we may have the
patterns fired multiple times, but it allows an event to be late and out of
order. It looks like it will work, but is there a less convoluted way?

Thanks,
Sameer

On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com>
wrote:

> But then no element later than the last emitted watermark must be issued
> by the sources. If that is the case, then this solution should work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>
>> Hi,
>>
>> If you know that the events are arriving in order and a consistent lag,
>> why not just increment the watermark time every time the
>> getCurrentWatermark() method is invoked based on the autoWatermarkInterval
>> (or less to be conservative).
>>
>> You can check if the watermark has changed since the arrival of the last
>> event and if not increment it in the getCurrentWatermark() method.
>> Otherwise the watermark will never increase until an element arrive and if
>> the stream partition stalls for some reason the whole pipeline freezes.
>>
>> Sameer
>>
>>
>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> the problem is still that there is no corresponding watermark saying
>>> that 4 seconds have now passed. With your code, watermarks will be
>>> periodically emitted but the same watermark will be emitted until a new
>>> element arrives which will reset the watermark. Thus, the system can never
>>> know until this watermark is seen whether there will be an earlier event or
>>> not. I fear that this is a fundamental problem with stream processing.
>>>
>>> You're right that the negation operator won't solve the problem. It will
>>> indeed suffer from the same problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>
>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> "not" operator) does not address this because again, how would the "not
>>>> match" be triggered if no event at all occurs?
>>>>
>>>> Good question.
>>>>
>>>> I'm not sure whether the following will work:
>>>>
>>>> This could be done by creating a CEP matching pattern that uses both of
>>>> "notNext" (or "notFollowedBy") and "within" constructs. Something like 
>>>> this:
>>>>
>>>> Pattern<Event, ?> pattern = Pattern.begin("first")
>>>> .notNext("second")
>>>> .within(Time.seconds(3));
>>>>
>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>
>>>> Note: I have requested these negation patterns to be implemented in
>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink..
>>>>
>>>>
>>>> - LF
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *From:* David Koch <ogd...@googlemail.com>
>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Hello,
>>>>
>>>> Thank you for the explanation as well as the link to the other post.
>>>> Interesting to learn about some of the open JIRAs.
>>>>
>>>> Indeed, I was not using event time, but processing time. However, even
>>>> when using event time I only get notified of timeouts upon subsequent
>>>> events.
>>>>
>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>> read   from a socket, wrap this in a custom "event" with
>>>> timestamp, key the resultant stream by  and attempt to detect 
>>>> instances no further than 3 seconds apart using CEP.

Re: CEP and slightly out of order elements

2016-10-11 Thread Sameer W
Thanks Till - This is helpful to know.

Sameer

On Tue, Oct 11, 2016 at 12:20 PM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Hi Sameer,
>
> the CEP operator will take care of ordering the elements.
>
> Internally what happens is that the elements are buffered before being
> applied to the state machine. The operator only applies the elements after
> it has seen a watermark which is greater than the timestamps of the
> elements being applied to the NFA. Since the elements are kept in a
> priority queue wrt the timestamp they will be in order.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 1:51 PM, Sameer W <sam...@axiomine.com> wrote:
>
>> Hi,
>>
>> If using CEP with event-time I have events which can be slightly out of
>> order and I want to sort them by timestamp within their time-windows before
>> applying CEP-
>>
>> For example, if using 5 second windows and I use the following
>>
>> ds2 = ds.keyBy.window(TumblingWindow(10 seconds).apply(/*Sort by
>> Timestamp*/);
>>
>> Next assign watermarks again on ds2 (because elements in ds2 will all
>> have the same timestamp of WINDOW_END_TIME-1ms)
>> ds2.assignTimestampsAndWatermarks()
>>
>> Finally apply CEP on ds2 with a WITHIN window of 5 seconds (shorter
>> timestamp than the one I used earlier).
>>
>> The reasoning is, if I am using the next() operator in CEP, the events
>> should be in the order of their timestamps.
>>
>> Is this the right way to handle this problem? I have heard people say
>> that assigning watermarks twice can lead to wrong results. But don't I need
>> to assign timestamps once more in this scenario.
>>
>> Thanks,
>> Sameer
>>
>>
>>
>>
>>
>


Re: Listening to timed-out patterns in Flink CEP

2016-10-11 Thread Sameer W
Hi,

If you know that the events are arriving in order and with a consistent lag,
why not just increment the watermark time every time the getCurrentWatermark()
method is invoked, based on the autoWatermarkInterval (or less, to be
conservative)?

You can check whether the watermark has changed since the arrival of the last
event and, if not, increment it in the getCurrentWatermark() method.
Otherwise the watermark will never increase until an element arrives, and if
the stream partition stalls for some reason the whole pipeline freezes.
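A minimal sketch of that idea (Event and getTimestamp() are placeholders,
and the lag and increment values are arbitrary):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAdvancingAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private static final long MAX_LAG_MS = 2_000;      // assumed consistent lag
    private static final long IDLE_ADVANCE_MS = 1_000; // nudge when no new data

    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, element.getTimestamp());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return new Watermark(lastWatermark); // nothing seen yet
        }
        long candidate = maxTimestampSeen - MAX_LAG_MS;
        if (candidate > lastWatermark) {
            lastWatermark = candidate;           // normal case: follow the data
        } else {
            lastWatermark += IDLE_ADVANCE_MS;    // idle: push the watermark forward anyway
        }
        return new Watermark(lastWatermark);
    }
}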

Sameer


On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann 
wrote:

> Hi David,
>
> the problem is still that there is no corresponding watermark saying that
> 4 seconds have now passed. With your code, watermarks will be periodically
> emitted but the same watermark will be emitted until a new element arrives
> which will reset the watermark. Thus, the system can never know until this
> watermark is seen whether there will be an earlier event or not. I fear
> that this is a fundamental problem with stream processing.
>
> You're right that the negation operator won't solve the problem. It will
> indeed suffer from the same problem.
>
> Cheers,
> Till
>
> On Sun, Oct 9, 2016 at 7:37 PM,  wrote:
>
>> >>FLINK-3320  (CEP
>> "not" operator) does not address this because again, how would the "not
>> match" be triggered if no event at all occurs?
>>
>> Good question.
>>
>> I'm not sure whether the following will work:
>>
>> This could be done by creating a CEP matching pattern that uses both of
>> "notNext" (or "notFollowedBy") and "within" constructs. Something like this:
>>
>> Pattern pattern = Pattern.begin("first")
>> .notNext("second")
>> .within(Time.seconds(3));
>>
>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>
>> Note: I have requested these negation patterns to be implemented in Flink
>> CEP, but notNext/notFollowedBy are not yet implemented in Flink..
>>
>>
>> - LF
>>
>>
>>
>>
>> --
>> *From:* David Koch 
>> *To:* user@flink.apache.org; lg...@yahoo.com
>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>
>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>
>> Hello,
>>
>> Thank you for the explanation as well as the link to the other post.
>> Interesting to learn about some of the open JIRAs.
>>
>> Indeed, I was not using event time, but processing time. However, even
>> when using event time I only get notified of timeouts upon subsequent
>> events.
>>
>> The link  contains an example where I read
>>   from a socket, wrap this in a custom "event" with timestamp,
>> key the resultant stream by  and attempt to detect  instances no
>> further than 3 seconds apart using CEP.
>>
>> Apart from the fact that results are only printed when I close the socket
>> (normal?) I don't observe any change in behaviour
>>
>> So event-time/watermarks or not: SOME event has to occur for the timeout
>> to be triggered.
>>
>> FLINK-3320  (CEP "not"
>> operator) does not address this because again, how would the "not match" be
>> triggered if no event at all occurs?
>>
>> On Sat, Oct 8, 2016 at 12:50 AM,  wrote:
>>
>> The following is a better link:
>>
>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>> 
>>
>>
>> - LF
>>
>>
>>
>>
>> --
>> *From:* "lg...@yahoo.com" 
>> *To:* "user@flink.apache.org" 
>> *Sent:* Friday, October 7, 2016 3:36 PM
>>
>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>
>> Isn't the upcoming CEP negation (absence of an event) feature solve this
>> issue?
>>
>> See this discussion thread:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>> 
>>
>>
>>
>> //  Atul
>>
>>
>> --
>> *From:* Till Rohrmann 
>> *To:* user@flink.apache.org
>> *Sent:* Friday, October 7, 2016 12:58 AM
>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>
>> Hi David,
>>
>> in case of event time, the timeout will be detected when the first
>> watermark exceeding the timeout value is received. Thus, it depends a
>> little bit how you generate watermarks (e.g. periodically, watermark per
>> event).
>>
>> In case of processing time, the time is only updated whenever a new
>> element arrives. Thus, if you have 

CEP and slightly out of order elements

2016-10-11 Thread Sameer W
Hi,

If using CEP with event time, I have events which can be slightly out of
order, and I want to sort them by timestamp within their time windows before
applying CEP.

For example, if using 5 second windows and I use the following

ds2 = ds.keyBy.window(TumblingWindow(10 seconds).apply(/*Sort by
Timestamp*/);

Next assign watermarks again on ds2 (because elements in ds2 will all have
the same timestamp of WINDOW_END_TIME-1ms)
ds2.assignTimestampsAndWatermarks()

Finally, apply CEP on ds2 with a WITHIN window of 5 seconds (a shorter span
than the one I used earlier).

The reasoning is that, if I am using the next() operator in CEP, the events
should be in the order of their timestamps.

Is this the right way to handle this problem? I have heard people say that
assigning watermarks twice can lead to wrong results. But don't I need to
assign timestamps once more in this scenario?
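For reference, the "/*Sort by Timestamp*/" step above could look roughly
like this sketch (Event and getTimestamp() are placeholders):

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortByTimestamp implements WindowFunction<Event, Event, String, TimeWindow> {

    @Override
    public void apply(String key, TimeWindow window, Iterable<Event> input,
                      Collector<Event> out) {
        List<Event> buffer = new ArrayList<>();
        input.forEach(buffer::add);
        // re-emit the buffered elements in timestamp order
        buffer.sort(Comparator.comparingLong(Event::getTimestamp));
        buffer.forEach(out::collect);
    }
}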

Thanks,
Sameer


Side Inputs vs. Connected Streams

2016-10-03 Thread Sameer W
Hi,

I read the Side Inputs design document. How does it compare to using
ConnectedStreams with respect to handling the ordering of streams
transparently?

One of the challenges I have with ConnectedStreams is that I need to buffer
the main input if the rules stream has not arrived yet. Does this
automatically go away with Side Inputs? Will the call

    String sideValue = getRuntimeContext().getSideInput(filterString);

block if the side input is not available yet? And is the reverse also true?

Alternatively, if my rules are not large in number and I want to broadcast
them to all nodes, is the below equivalent to using Side Inputs, where side
inputs are broadcast to all nodes, and does it ensure that the side input is
evaluated before the main input?

DataStream ds4 = ds3.connect(dsSide.broadcast());

Will the above ensure that dsSide is always available before ds3 elements
arrive on the connected stream? Am I correct in assuming that dsSide changes
will continue to be broadcast to ds3 (with no ordering guarantees between
ds3 and dsSide, of course)?
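For context, this is roughly the buffering I do by hand today with
ConnectedStreams (a sketch; Event, Rule and the apply logic are placeholders,
and the in-memory buffer here is not checkpointed):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayDeque;
import java.util.Deque;

public class BufferUntilRulesArrive implements CoFlatMapFunction<Event, Rule, String> {

    private Rule currentRule;                          // latest broadcast rule, if any
    private final Deque<Event> buffered = new ArrayDeque<>();

    @Override
    public void flatMap1(Event value, Collector<String> out) {
        if (currentRule == null) {
            buffered.add(value);                       // rules not here yet: hold the event
        } else {
            out.collect(apply(currentRule, value));
        }
    }

    @Override
    public void flatMap2(Rule rule, Collector<String> out) {
        currentRule = rule;
        while (!buffered.isEmpty()) {                  // drain anything that was waiting
            out.collect(apply(currentRule, buffered.poll()));
        }
    }

    private String apply(Rule rule, Event evt) {
        return rule.getName() + ":" + evt.toString();  // placeholder business logic
    }
}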


Thanks,
Sameer


Re: Accessing state in connected streams

2016-08-27 Thread Sameer W
Ok, sorry about that :-). I misunderstood, as I am not familiar with Scala
code. Just curious though, how are you passing two MapFunctions to the
flatMap function on the connected stream? The interface of ConnectedStreams
requires just one CoMap function:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer

On Sat, Aug 27, 2016 at 6:13 PM, aris kol  wrote:

> Let's say I have two types sharing the same trait
>
> trait Event {
> def id: Id
> }
>
> case class EventA(id: Id, info: InfoA) extends Event
> case class EventB(id: Id, info: InfoB) extends Event
>
> Each of these events gets pushed to a Kafka topic and gets consumed by a
> stream in Flink.
>
> Let's say I have two streams
>
> Events of type A create state:
>
> val typeAStream = env.addSource(...)
> .flatMap(someUnmarshallerForA)
> .keyBy(_.id)
> .mapWithState(...)
>
> val typeBStream = env.addSource(...)
> .flatMap(someUnmarshallerForB)
> .keyBy(_.id)
>
> I want now to process the events in typeBStream using the information
> stored in the State of typeAStream.
>
> One approach would be to use the same stream for the two topics and then
> pattern match, but Event subclasses may grow in numbers and
> may have different loads, thus I may want to keep things separate.
>
> Would something along the lines of:
>
> typeAStream.connect(typeBStream).
> flatMap(
> new IdentityFlatMapFunction(),
> new SomeRichFlatMapFunctionForEventB[EventB, O] with
> StateFulFuntion[EventB, O, G[EventA]] { ... }
> )
>
> work?
>
> I tried this approach and I ended up in a NPE because the state object was
> not initialized (meaning it was not there).
>
>
> Thanks,
> Aris
>
>


Re: Accessing state in connected streams

2016-08-27 Thread Sameer W
There is no guarantee about the order in which the elements of each stream
arrive in a connected stream. You have to check whether the elements have
arrived from stream A before using that information to process elements from
stream B. Otherwise you have to buffer elements from stream B and check
whether there are unprocessed elements from stream B when elements arrive
from stream A. You might need to do that for elements from both streams,
depending on how you use them.

You will get an NPE if you assume events have arrived from A but they might
be lagging behind.

On Sat, Aug 27, 2016 at 6:13 PM, aris kol  wrote:

> Let's say I have two types sharing the same trait
>
> trait Event {
> def id: Id
> }
>
> case class EventA(id: Id, info: InfoA) extends Event
> case class EventB(id: Id, info: InfoB) extends Event
>
> Each of these events gets pushed to a Kafka topic and gets consumed by a
> stream in Flink.
>
> Let's say I have two streams
>
> Events of type A create state:
>
> val typeAStream = env.addSource(...)
> .flatMap(someUnmarshallerForA)
> .keyBy(_.id)
> .mapWithState(...)
>
> val typeBStream = env.addSource(...)
> .flatMap(someUnmarshallerForB)
> .keyBy(_.id)
>
> I want now to process the events in typeBStream using the information
> stored in the State of typeAStream.
>
> One approach would be to use the same stream for the two topics and then
> pattern match, but Event subclasses may grow in numbers and
> may have different loads, thus I may want to keep things separate.
>
> Would something along the lines of:
>
> typeAStream.connect(typeBStream).
> flatMap(
> new IdentityFlatMapFunction(),
> new SomeRichFlatMapFunctionForEventB[EventB, O] with
> StateFulFuntion[EventB, O, G[EventA]] { ... }
> )
>
> work?
>
> I tried this approach and I ended up in a NPE because the state object was
> not initialized (meaning it was not there).
>
>
> Thanks,
> Aris
>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Perfect - This explains it very clearly. Thank you very much!

Sameer

On Tue, Aug 23, 2016 at 9:31 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Slight misunderstanding here. The one thread per Kafka broker happens
> *after* the assignment of Kafka partitions to the source instances. So,
> with a total of 10 partitions and 10 source instances, each source instance
> will first be assigned 1 partition. Then, each source instance will create
> 1 thread for every individual broker that holds partitions that the source
> instance is assigned. The per-broker threading model of the Kafka consumer
> has nothing to do with the initial assignment of partitions to source
> instances.
>
> Another example to explain this more clearly:
> Say you have 2 Kafka brokers, and each hold 5 partitions, and have source
> parallelism 5. Each source instance will still have 2 partitions. If the
> 2 partitions belong to the same broker, the source instance will have only
> 1 consuming threads; otherwise if the 2 partitions belong to different
> brokers, the source instance will have 2 consuming threads.
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Gordon,
>
> I tried the following with Kafka - 1 Broker but a topic has 10 partitions.
> I have a parallelism of 10 defined for the job. I see all my 10
> source->Mapper->assignTimestamps receiving and sending data. If there is
> only one source instance per broker how does that happen?
>
> Thanks,
> Sameer
>
> On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
> wrote:
>
>> Hi!
>>
>> Kinesis shards should be ideally evenly assigned to the source instances.
>> So, with your example of source parallelism of 10 and 20 shards, each
>> source instance will have 2 shards and will have 2 threads consuming them
>> (therefore, not in round robin).
>>
>> For the Kafka consumer, in the source instances there will be one
>> consuming thread per broker, instead of partition. So, if a source instance
>> is assigned partitions that happen to be on the same broker, the source
>> instance will only create 1 thread to consume all of them.
>>
>> You are correct that currently the Kafka consumer does not handle
>> repartitioning transparently like the Kinesis connector, but we’re working
>> on this :)
>>
>> Regards,
>> Gordon
>>
>> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>>
>> Hi,
>>
>> The documentation says that there will be one thread per shard. If I my
>> streaming job runs with a parallelism of 10 and there are 20 shards, are
>> more threads going to be launched within  a task slot running a source
>> function to consume the additional shards or will one source function
>> instance consume 2 shards in round robin.
>>
>> Is it any different for Kafka? Based on the documentation my
>> understanding is that if there are 10 source function instances and 20
>> partitions, each one will read 2 partitions.
>>
>> Also if partitions are added to Kafka are they handled by the existing
>> streaming job or does it need to be restarted? It appears as though Kinesis
>> handles it via the consumer constantly checking for more shards.
>>
>> Thanks,
>> Sameer
>>
>>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Gordon,

I tried the following with Kafka: 1 broker, but a topic with 10 partitions.
I have a parallelism of 10 defined for the job. I see all 10 of my
source->Mapper->assignTimestamps instances receiving and sending data. If
there is only one source instance per broker, how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If I my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within  a task slot running a source
> function to consume the additional shards or will one source function
> instance consume 2 shards in round robin.
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Sameer W
Thanks - Is there also a default behavior for non-Kinesis streams? If I set
the time characteristic to event time but do not assign timestamps or
generate watermarks by invoking the assignTimestampsAndWatermarks function,
does that default to using ingestion time? Or, in other words, is it as if I
invoked this method on the source stream:

assignTimestampsAndWatermarks(new IngestionTimeExtractor<>())

Sameer

On Tue, Aug 23, 2016 at 7:29 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Hi,
>
> For the Kinesis consumer, when you use Event Time but do not explicitly
> assign timestamps, the Kinesis server-side timestamp (the time which
> Kinesis received the record) is attached to the record as default, not
> Flink’s ingestion time.
>
> Does this answer your question?
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> If you do not explicitly assign timestamps and watermarks when using Event
> Time, does it automatically default to using Ingestion Time?
>
> I was reading the Kinesis integration section and came across the note
> below and which raised the above question. I saw another place where you
> explicitly use Event Time with ingestion time with the following - .
> assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.
>
> Does that line have to called explicitly or is it the default?
>
>
> "If streaming topologies choose to use the event time notion
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html>
>  for
> record timestamps, an *approximate arrival timestamp* will be used by
> default. This timestamp is attached to records by Kinesis once they were
> successfully received and stored by streams. Note that this timestamp is
> typically referred to as a Kinesis server-side timestamp, and there are no
> guarantees about the accuracy or order correctness (i.e., the timestamps
> may not always be ascending)."
>
> Thanks,
> Sameer
>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Thanks Gordon - Appreciate the fast response.

Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If I my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within  a task slot running a source
> function to consume the additional shards or will one source function
> instance consume 2 shards in round robin.
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Threading Model for Kinesis

2016-08-23 Thread Sameer W
Hi,

The documentation says that there will be one thread per shard. If my
streaming job runs with a parallelism of 10 and there are 20 shards, are
more threads going to be launched within a task slot running a source
function to consume the additional shards, or will one source function
instance consume 2 shards in round robin?

Is it any different for Kafka? Based on the documentation my understanding
is that if there are 10 source function instances and 20 partitions, each
one will read 2 partitions.

Also if partitions are added to Kafka are they handled by the existing
streaming job or does it need to be restarted? It appears as though Kinesis
handles it via the consumer constantly checking for more shards.

Thanks,
Sameer


Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Sameer W
Hi,

If you do not explicitly assign timestamps and watermarks when using Event
Time, does it automatically default to using Ingestion Time?

I was reading the Kinesis integration section and came across the note
below, which raised the above question. I saw another place where you
explicitly use event time with ingestion time via the following:
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

Does that line have to be called explicitly, or is it the default?


"If streaming topologies choose to use the event time notion

for
record timestamps, an *approximate arrival timestamp* will be used by
default. This timestamp is attached to records by Kinesis once they were
successfully received and stored by streams. Note that this timestamp is
typically referred to as a Kinesis server-side timestamp, and there are no
guarantees about the accuracy or order correctness (i.e., the timestamps
may not always be ascending)."

Thanks,
Sameer


Re: counting elements in datastream

2016-08-18 Thread Sameer W
Use count windows and keep emitting results, say every 1000 elements, and do
a sum. Or do it without windows, something like this, which has the
disadvantage that it emits a new updated result for each new element (not a
good thing if your volume is high):

https://github.com/sameeraxiomine/flinkinaction/blob/master/flinkinactionjava/src/main/java/com/manning/fia/c02/SimpleStreamingWordCount.java

Or use tumbling time windows on processing time -
 
https://github.com/sameeraxiomine/flinkinaction/blob/master/flinkinactionjava/src/main/java/com/manning/fia/c04/TimeWindowExample.java
.
The advantage over count windows is that you get a count every few
(configured) seconds, which you can then add up on your client side.

Since you do not need a keyBy operation, you would do this directly on the
DataStream instance without doing a keyBy, but that way you get multiple
counts per partition of the stream, which you will need to add up.
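A non-keyed variant of the time-window approach would look roughly like this
(a sketch; the 5-second window size is arbitrary and the input stream is a
placeholder):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<String> elements = ...; // the stream whose elements you want to count

// map every element to ("count", 1) and sum over a processing-time window;
// timeWindowAll runs with parallelism 1, so one total count is emitted per window
DataStream<Tuple2<String, Long>> counts = elements
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) {
                return Tuple2.of("count", 1L);
            }
        })
        .timeWindowAll(Time.seconds(5))
        .sum(1);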





On Thu, Aug 18, 2016 at 5:54 AM, subash basnet  wrote:

> Hello all,
>
> If anyone has an idea, what would be the right way to count the elements
> of the current instance of the datastream? Is it possible?
>
> DataStream> pointsWithGridCoordinates;
>
>
>
> Regards,
> Subash Basnet
>


Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-12 Thread Sameer W
Thanks Max -

I will advance watermarks when no event arrives for a while. But when
using Kafka, is it good practice to assign events to partitions randomly
instead of, say, by device id or by the region id where the devices are
located? What I noticed is that if the devices sending to one of the
partitions stop sending information, the pipeline completely freezes unless
I manually keep moving the watermark.

But the problem with sending events to random partitions is that when the
devices come back online, they send events which are now registered as late
events and the windows fire one element at a time.

Thanks,
Sameer

On Fri, Aug 12, 2016 at 4:41 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Sameer,
>
> If you use Event Time you should make sure to assign Watermarks and
> Timestamps at the source. As you already observed, Flink may get stuck
> otherwise because it waits for Watermarks to progress in time.
>
> There is no timeout for windows. However, you can implement that logic
> in your Watermark generation function.
>
> You're already using
> DataStream#assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks
> assigner)
>
> Your assigner has a `getCurrentWatermark()` method. This is called
> every ExecutionConfig#getAutoWatermarkInterval() milliseconds. You can
> set this via ExecutionConfig#setAutoWatermarkInterval(long
> milliseconds).
>
> In your assigner, simply create a field to keep track of the last time
> you emitted a Watermark. If you haven't emitted a Watermark for some
> time, you can kick off a timeout and emit a Watermark.
>
> Cheers,
> Max
>
> On Thu, Aug 11, 2016 at 1:05 AM, Sameer W <sam...@axiomine.com> wrote:
> > Sorry for replying to my own messages but this is super confusing and
> > logical at the same time to me :-).
> >
> > If I have Kafka Topic with 10 partitions. If I partition by device id
> when I
> > write to the Topic, and use Event Time, my pipeline freezes (if fewer
> than
> > 10 devices are active initially). Because if some partitions are inactive
> > (only a few devices active at a time) they do not send watermarks and my
> > pipeline waits forever for those partitions to send in their watermarks
> even
> > if the keyBy is on the device id whose records are going to come from
> only
> > one partition.
> >
> > When I send records to Kafka randomly (to any partition) the pipeline
> works
> > fine as all partitions (sources connected to them) are sending
> watermarks.
> >
> > This gets even more confusing if I apply watermarks and timestamps
> > downstream after a KeyBy operation which is again followed by another
> keyBy
> > which does not receive events for a key from all the upstream operators.
> > Again nothing fires as Flink expects other map operators (to which the
> > watermark assignment is piped) to send in the watermarks as well.
> >
> > My conclusion: Only produce watermarks at the source function. Is this
> valid
> > or am I missing something? Because only when I do that (and random
> > allocation of events to partitions in Kafka) the whole pipeline works
> > reliably.
> >
> > If there a way to set a timeout - If watermarks from source functions are
> > not received within a certain time interval, fire the time windows.
> >
> > Thanks,
> > Sameer
> >
> >
> >
> >
> > On Wed, Aug 10, 2016 at 3:27 PM, Sameer W <sam...@axiomine.com> wrote:
> >>
> >> And this is happening in my local environment. As soon as I set the
> >> parallelism to 1 it all works fine.
> >>
> >> Sameer
> >>
> >> On Wed, Aug 10, 2016 at 3:11 PM, Sameer W <sam...@axiomine.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am noticing this behavior with Event Time processing-
> >>>
> >>> I have a Kafka topic with 10 partitions. Each Event Source sends data
> to
> >>> any one of the partitions. Say I have only 1 event source active at
> this
> >>> moment, which means only one partition is receiving data.
> >>>
> >>> None of my windows will fire now because the 9 partitions (source
> >>> function instances) are not sending any watermarks and Flink waits
> forever.
> >>>
> >>> I go to topic with 1 partition but leave default parallelism intact.
> Only
> >>> one Mapper instance contributes to the subsequent keyBy operation but
> other
> >>> 7 (assuming 8 of default parallelism) are idle. I assign watermarks
> after
> >>> the map function. Again the same behavior because the 7 other mappers
> are
> >>> not sending watermarks.
> >>>
> >>> How do I handle this? Not all of my partitions are going to be
> receiving
> >>> data at all times using this partitioning strategy. Or I have to use
> random
> >>> partitioning which will also work.
> >>>
> >>> Thanks,
> >>> Sameer
> >>
> >>
> >
>


Re: Does Flink DataStreams using combiners?

2016-08-11 Thread Sameer W
Sorry, I meant streaming cannot use combiners (corrected message repeated below):
---
Streaming cannot use combiners. The aggregations happen on the trigger.

The elements being aggregated are only known after the trigger delivers the
elements to the evaluation function.

Since windows can overlap, and since assignment to a window is not done until
the elements arrive at the sum operator in your case, a combiner cannot know
what to pre-aggregate even if one were available.

On Thu, Aug 11, 2016 at 9:22 PM, Sameer Wadkar  wrote:

> Streaming cannot use windows. The aggregations happen on the trigger.
>
> The elements being aggregated are only known after the trigger delivers
> the elements to the evaluation function.
>
> Since windows can overlap and even assignment to a window is not done
> until the elements arrive at the sum operator in your case, combiner cannot
> know what to pre aggregate even if were available.
>
>
>
> > On Aug 11, 2016, at 8:51 PM, Elias Levy 
> wrote:
> >
> > I am wondering if Flink makes use of combiners to pre-reduce a keyed and
> windowed stream before shuffling the data among workers.
> >
> > I.e. will it use a combiner in something like:
> >
> > stream.flatMap {...}
> >   .assignTimestampsAndWatermarks(...)
> >   .keyBy(...)
> >   .timeWindow(...)
> >   .trigger(...)
> >   .sum("cnt")
> >
> > or will it shuffle the keyed input before the sum reduction?
> >
> > If it does make use of combiners, it would be useful to point this out
> in the documentation, particularly if it only applies to certain types of
> reducers, folds, etc.
>


Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-10 Thread Sameer W
Sorry for replying to my own messages but this is super confusing and
logical at the same time to me :-).

Suppose I have a Kafka topic with 10 partitions. If I partition by device id
when I write to the topic, and use event time, my pipeline freezes (if fewer
than 10 devices are active initially). Because some partitions are inactive
(only a few devices are active at a time), they do not send watermarks, and my
pipeline waits forever for those partitions to send in their watermarks, even
if the keyBy is on the device id whose records are going to come from only one
partition.

When I send records to Kafka randomly (to any partition) the pipeline works
fine as all partitions (sources connected to them) are sending watermarks.

This gets even more confusing if I apply watermarks and timestamps
downstream after a KeyBy operation which is again followed by another keyBy
which does not receive events for a key from all the upstream operators.
Again nothing fires as Flink expects other map operators (to which the
watermark assignment is piped) to send in the watermarks as well.

My conclusion: Only produce watermarks at the source function. Is this
valid or am I missing something? Because only when I do that (and random
allocation of events to partitions in Kafka) the whole pipeline works
reliably.

*Is there a way to set a timeout - if watermarks from source functions are
not received within a certain time interval, fire the time windows?*

Thanks,
Sameer




On Wed, Aug 10, 2016 at 3:27 PM, Sameer W <sam...@axiomine.com> wrote:

> And this is happening in my local environment. As soon as I set the
> parallelism to 1 it all works fine.
>
> Sameer
>
> On Wed, Aug 10, 2016 at 3:11 PM, Sameer W <sam...@axiomine.com> wrote:
>
>> Hi,
>>
>> I am noticing this behavior with Event Time processing-
>>
>> I have a Kafka topic with 10 partitions. Each Event Source sends data to
>> any one of the partitions. Say I have only 1 event source active at this
>> moment, which means only one partition is receiving data.
>>
>> None of my windows will fire now because the 9 partitions (source
>> function instances) are not sending any watermarks and Flink waits forever.
>>
>> I go to topic with 1 partition but leave default parallelism intact. Only
>> one Mapper instance contributes to the subsequent keyBy operation but other
>> 7 (assuming 8 of default parallelism) are idle. I assign watermarks after
>> the map function. Again the same behavior because the 7 other mappers are
>> not sending watermarks.
>>
>> How do I handle this? Not all of my partitions are going to be receiving
>> data at all times using this partitioning strategy. Or I have to use random
>> partitioning which will also work.
>>
>> Thanks,
>> Sameer
>>
>
>

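A rough sketch of the "produce watermarks at the source" conclusion above,
assuming every consumed partition actually carries data and that each record
starts with an ascending epoch-millis timestamp; the topic name, consumer class
version (FlinkKafkaConsumer09 here) and extractor are illustrative. Newer Kafka
connector releases also let you call assignTimestampsAndWatermarks directly on
the consumer itself to get per-partition watermarks, which is the cleaner fix
for partially idle topics.

// Imports come from org.apache.flink.streaming.* and the flink-connector-kafka module.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "demo-group");

DataStream<String> raw = env.addSource(
        new FlinkKafkaConsumer09<>("sensor-topic", new SimpleStringSchema(), props));

// Assign timestamps/watermarks immediately after the source, before any keyBy,
// so that every source subtask contributes a watermark to downstream operators.
DataStream<String> withTimestamps = raw.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<String>() {
            @Override
            public long extractAscendingTimestamp(String record) {
                // assumption: records look like "<epochMillis>,<payload>"
                return Long.parseLong(record.split(",")[0]);
            }
        });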

Re: Flink : CEP processing

2016-08-10 Thread Sameer W
Mans,

I think at this time we need someone who knows the internal implementation
to answer definitively-

My understanding is-

1. Internally CEP is like a map operator with session-like semantics
operating in a pipeline. You could do what it does but you would have to
implement all that. If you need support for negation today that is probably
how you would do it.
2. Ultimately CEP produces a stream which you need to write to some sink.
If your sink supports exactly-once semantics then your pipeline will support
it. So I think snapshotting with CEP would be no different. If you send out
events (alerts) from within your select(PatternSelectFunction) then yes,
you could "send" your alerts multiple times. If instead you wrote to a sink
(with exactly once semantics) which then sent alerts out in the real world
you should not get those multiple alerts. I am sending alerts from within
my PatternSelectFunction. So I am taking the chance of sending alerts twice
which is ok for my use-case.

I am operating under the belief (which seems logical to me) that CEP is
like a stateful map operator at the end of my processing pipeline.
Snapshotting in CEP would work exactly like it would in that case.

Thanks,
Sameer


On Wed, Aug 10, 2016 at 2:42 PM, M Singh <mans2si...@yahoo.com> wrote:

> Thanks for the pointers Sameer.
>
>
> The reason I wanted to find out about snapshotting with CEP is because I
> thought that CEP state might also be snapshotted for recovery. If that is
> the case, then there are events in the CEP might be in two snapshots.
>
> Mans
>
>
> On Tuesday, August 9, 2016 1:15 PM, Sameer W <sam...@axiomine.com> wrote:
>
>
> In one of the earlier thread Till explained this to me (
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/CEP-and-Within-Clause-td8159.html)
>
> 1. Within does not use time windows. It sort of uses session windows where
> the session begins when the first event of the pattern is identified. The
> timer starts when the "first" event in the pattern fires. If the pattern
> completes "within" the designated times (meaning the "next" and "followed
> by" fire as will "within" the time specified) you have a match or else the
> window is removed. I don't know how it is implemented but I doubt it stores
> all the events in memory for the "within" window (there is not need to). It
> will only store the relevant events (first, next, followed by, etc). So
> memory would not be an issue here. If two "first" type events are
> identified I think two "within" sessions are created.
>
> 2. Snapshotting (I don't know much in this area so I cannot answer). Why
> should it be different though? You are using operators and state. It should
> work the same way. But I am not too familiar with that.
>
> 3. The "Within" window is not an issue. Even the window preceding that
> should not be unless you are using WindowFunction (more memory friendly
> alternative is https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions ) by
> themselves and using a really large window
>
> 4. The way I am using it, it is working fine. Some of the limitations I
> have seen are related to this paper not being fully implemented
> (https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf). I
> don't know how to support negation in an event stream but I don't need it
> for now.
>
> Thanks,
> Sameer
>
>
> On Tue, Aug 9, 2016 at 3:45 PM, M Singh <mans2si...@yahoo.com> wrote:
>
> Hi Sameer:
>
> If we use a within window for event series -
>
> 1. Does it interfere with the default time windows ?
> 2. How does it affect snapshotting ?
> 3. If the window is too large are the events stored in a "processor" for
> the window to expire ?
> 4. Are there any other know limitations and best practices of using CEP
> with Flink ?
>
> Thanks again for your help.
>
>
>
> On Tuesday, August 9, 2016 11:29 AM, Sameer Wadkar <sam...@axiomine.com>
> wrote:
>
>
> In that case you need to get them into one stream somehow (keyBy a dummy
> value for example). There is always some logical key to keyBy on when data
> is arriving from multiple sources (ex some portion of the time stamp).
>
> You are looking for patterns within something (events happening around the
> same time but arriving from multiple devices). That something should be the
> key. That's how I am using it.
>
> Sameer
>
> Sent from my iPhone
>
>

Within interval for CEP - Wall Clock based or Event Timestamp based?

2016-08-10 Thread Sameer W
Hi,

I am using EventTime, but when the records get into the CEP PatternStream,
does the WITHIN interval refer to wall-clock time or to the timestamps
embedded in the event stream?

If I provide within(Time.seconds(5)) and in processing time I am getting
events with timestamps spanning a range of 10 seconds (due to upstream emits),
are all those events considered part of the pattern, or do their timestamps
matter?

Thanks,
Sameer


Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-10 Thread Sameer W
And this is happening in my local environment. As soon as I set the
parallelism to 1 it all works fine.

Sameer

On Wed, Aug 10, 2016 at 3:11 PM, Sameer W <sam...@axiomine.com> wrote:

> Hi,
>
> I am noticing this behavior with Event Time processing-
>
> I have a Kafka topic with 10 partitions. Each Event Source sends data to
> any one of the partitions. Say I have only 1 event source active at this
> moment, which means only one partition is receiving data.
>
> None of my windows will fire now because the 9 partitions (source function
> instances) are not sending any watermarks and Flink waits forever.
>
> I go to topic with 1 partition but leave default parallelism intact. Only
> one Mapper instance contributes to the subsequent keyBy operation but other
> 7 (assuming 8 of default parallelism) are idle. I assign watermarks after
> the map function. Again the same behavior because the 7 other mappers are
> not sending watermarks.
>
> How do I handle this? Not all of my partitions are going to be receiving
> data at all times using this partitioning strategy. Or I have to use random
> partitioning which will also work.
>
> Thanks,
> Sameer
>


Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

2016-08-10 Thread Sameer W
Hi,

I am noticing this behavior with Event Time processing-

I have a Kafka topic with 10 partitions. Each Event Source sends data to
any one of the partitions. Say I have only 1 event source active at this
moment, which means only one partition is receiving data.

None of my windows will fire now because the 9 partitions (source function
instances) are not sending any watermarks and Flink waits forever.

I switch to a topic with 1 partition but leave the default parallelism intact.
Only one mapper instance contributes to the subsequent keyBy operation, but the
other 7 (assuming a default parallelism of 8) are idle. I assign watermarks
after the map function. Again the same behavior, because the 7 other mappers
are not sending watermarks.

How do I handle this? Not all of my partitions are going to be receiving data
at all times using this partitioning strategy. Or I have to use random
partitioning, which will also work.

Thanks,
Sameer


Connected Streams - Controlling Order of arrival on the two streams

2016-08-09 Thread Sameer W
Hi,

I am using connected streams to send rules coded as JavaScript functions on
one stream and event data on another stream. They are both keyed by the
device id. The rules are cached in the co-map operation until another rule
arrives to override the existing rule.

Is there a way to ensure that the rules stream arrives before the event
data stream? I am assuming there is no guarantee for this, so I cache the
event data if the rules have not yet arrived, and process and clear the
cache when the rules arrive. The rules are expected to arrive before the
event data. I am only using this method as a precautionary measure in case
the rules arrive late for reasons unrelated to when they were sent.

Is there a way to handle this situation without caching the streams?


Thanks,
Sameer

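A minimal sketch of the caching approach described in this post, with both
streams modeled as Tuple2<deviceId, payload>; all names and the sample data are
made up, and the plain HashMaps are for illustration only - a production job
would keep the current rule and the buffered events in Flink keyed state so
they survive a restore.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RulesBeforeEventsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (deviceId, ruleScript) and (deviceId, eventPayload) - illustrative data
        DataStream<Tuple2<String, String>> rules =
                env.fromElements(Tuple2.of("device-1", "temp > 50"));
        DataStream<Tuple2<String, String>> events =
                env.fromElements(Tuple2.of("device-1", "temp=55"), Tuple2.of("device-1", "temp=40"));

        rules.keyBy(0)
             .connect(events.keyBy(0))
             .flatMap(new CoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                 // Per-subtask caches keyed by device id (NOT checkpointed - see note above).
                 private final Map<String, String> ruleByDevice = new HashMap<>();
                 private final Map<String, List<String>> bufferedEvents = new HashMap<>();

                 @Override
                 public void flatMap1(Tuple2<String, String> rule, Collector<String> out) {
                     ruleByDevice.put(rule.f0, rule.f1);
                     // A rule arrived: drain any events that were cached before it
                     List<String> pending = bufferedEvents.remove(rule.f0);
                     if (pending != null) {
                         for (String payload : pending) {
                             out.collect("evaluate [" + rule.f1 + "] on " + payload);
                         }
                     }
                 }

                 @Override
                 public void flatMap2(Tuple2<String, String> event, Collector<String> out) {
                     String rule = ruleByDevice.get(event.f0);
                     if (rule == null) {
                         // No rule yet for this device: cache the event until one arrives
                         bufferedEvents.computeIfAbsent(event.f0, k -> new ArrayList<>()).add(event.f1);
                     } else {
                         out.collect("evaluate [" + rule + "] on " + event.f1);
                     }
                 }
             })
             .print();

        env.execute("rules-before-events sketch");
    }
}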

Re: Flink : CEP processing

2016-08-09 Thread Sameer W
In one of the earlier thread Till explained this to me (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CEP-and-Within-Clause-td8159.html
)

1. Within does not use time windows. It sort of uses session windows where
the session begins when the first event of the pattern is identified. The
timer starts when the "first" event in the pattern fires. If the pattern
completes "within" the designated time (meaning the "next" and "followed
by" fire as well, within the time specified) you have a match, or else the
window is removed. I don't know how it is implemented, but I doubt it stores
all the events in memory for the "within" window (there is no need to). It
will only store the relevant events (first, next, followed by, etc.). So
memory would not be an issue here. If two "first" type events are
identified, I think two "within" sessions are created.

2. Snapshotting (I don't know much in this area so I cannot answer). Why
should it be different though? You are using operators and state. It should
work the same way. But I am not too familiar with that.

3. The "Within" window is not an issue. Even the window preceding that
should not be unless you are using WindowFunction (more memory friendly
alternative is https://ci.apache.org/projects/flink/flink-docs-
master/apis/streaming/windows.html#window-functions ) by themselves and
using a really large window

4. The way I am using it, it is working fine. Some of the limitations I
have seen are related to this paper not being fully implemented (
https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf). I
don't know how to support negation in an event stream but I don't need it
for now.

Thanks,
Sameer


On Tue, Aug 9, 2016 at 3:45 PM, M Singh <mans2si...@yahoo.com> wrote:

> Hi Sameer:
>
> If we use a within window for event series -
>
> 1. Does it interfere with the default time windows ?
> 2. How does it affect snapshotting ?
> 3. If the window is too large are the events stored in a "processor" for
> the window to expire ?
> 4. Are there any other know limitations and best practices of using CEP
> with Flink ?
>
> Thanks again for your help.
>
>
>
> On Tuesday, August 9, 2016 11:29 AM, Sameer Wadkar <sam...@axiomine.com>
> wrote:
>
>
> In that case you need to get them into one stream somehow (keyBy a dummy
> value for example). There is always some logical key to keyBy on when data
> is arriving from multiple sources (ex some portion of the time stamp).
>
> You are looking for patterns within something (events happening around the
> same time but arriving from multiple devices). That something should be the
> key. That's how I am using it.
>
> Sameer
>
> Sent from my iPhone
>
> On Aug 9, 2016, at 1:40 PM, M Singh <mans2si...@yahoo.com> wrote:
>
> Thanks Sameer.
>
> So does that mean that if the events keys are not same we cannot use the
> CEP pattern match ?  What if events are coming from different sources and
> need to be correlated ?
>
> Mans
>
>
> On Tuesday, August 9, 2016 9:40 AM, Sameer W <sam...@axiomine.com> wrote:
>
>
> Hi,
>
> You will need to use keyBy operation first to get all the events you need
> monitored in a pattern on the same node. Only then can you apply Pattern
> because it depends on the order of the events (first, next, followed by). I
> even had to make sure that the events were correctly sorted by timestamps
> to ensure that the first,next and followed by works correctly.
>
> Sameer
>
> On Tue, Aug 9, 2016 at 12:17 PM, M Singh <mans2si...@yahoo.com> wrote:
>
> Hey Folks:
>
> I have a question about CEP processing in Flink - How does flink
> processing work when we have multiple partitions in which the events used
> in the pattern sequence might be scattered across multiple partitions on
> multiple nodes ?
>
> Thanks for your insight.
>
> Mans
>
>
>
>
>
>
>


Re: Flink : CEP processing

2016-08-09 Thread Sameer W
Hi,

You will need to use the keyBy operation first to get all the events you need
monitored in a pattern on the same node. Only then can you apply Pattern,
because it depends on the order of the events (first, next, followed by). I
even had to make sure that the events were correctly sorted by timestamp
to ensure that first, next and followed by work correctly.

Sameer

On Tue, Aug 9, 2016 at 12:17 PM, M Singh  wrote:

> Hey Folks:
>
> I have a question about CEP processing in Flink - How does flink
> processing work when we have multiple partitions in which the events used
> in the pattern sequence might be scattered across multiple partitions on
> multiple nodes ?
>
> Thanks for your insight.
>
> Mans
>

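To make the keyBy-then-CEP ordering concrete, a rough sketch against the older
(2016-era) CEP API this thread is about - the where()/select() signatures
changed in later Flink releases, and the reading type, thresholds and key below
are illustrative; "readings" is assumed to be a DataStream<Tuple2<String, Double>>
of (deviceId, temperature) with event-time timestamps already assigned.

// Imports: org.apache.flink.cep.{CEP, PatternStream, PatternSelectFunction},
// org.apache.flink.cep.pattern.Pattern, org.apache.flink.api.common.functions.FilterFunction,
// org.apache.flink.streaming.api.windowing.time.Time, java.util.Map.

Pattern<Tuple2<String, Double>, ?> warningPattern =
        Pattern.<Tuple2<String, Double>>begin("first")
                .where(new FilterFunction<Tuple2<String, Double>>() {
                    @Override
                    public boolean filter(Tuple2<String, Double> r) {
                        return r.f1 > 30;  // illustrative threshold
                    }
                })
                .followedBy("second")
                .where(new FilterFunction<Tuple2<String, Double>>() {
                    @Override
                    public boolean filter(Tuple2<String, Double> r) {
                        return r.f1 > 50;
                    }
                })
                .within(Time.minutes(10));

// keyBy first, so all readings of one device are evaluated in order on the same node
PatternStream<Tuple2<String, Double>> matches =
        CEP.pattern(readings.keyBy(0), warningPattern);

DataStream<String> alerts = matches.select(
        new PatternSelectFunction<Tuple2<String, Double>, String>() {
            @Override
            public String select(Map<String, Tuple2<String, Double>> match) {
                return "alert for " + match.get("first").f0 + ": "
                        + match.get("first").f1 + " then " + match.get("second").f1;
            }
        });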

Re: Having a single copy of an object read in a RichMapFunction

2016-08-04 Thread Sameer W
Theodore,

Broadcast variables do that when using the DataSet API -
http://data-artisans.com/how-to-factorize-a-700-gb-matrix-with-apache-flink/

See the following lines in the article-
To support the above presented algorithm efficiently we had to improve
Flink’s broadcasting mechanism since it easily becomes the bottleneck of
the implementation. The enhanced Flink version can share broadcast
variables among multiple tasks running on the same machine. *Sharing avoids
having to keep for each task an individual copy of the broadcasted variable
on the heap. This increases the memory efficiency significantly, especially
if the broadcasted variables can grow up to several GBs of size.*

If you are using the DataStream API, then side inputs (not yet
implemented) would achieve the same as broadcast variables (
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#
). I use keyed connected streams in situations where I need them for one of
my use-cases (propagating rule changes to the data) where I could have used
side inputs.

Sameer




On Thu, Aug 4, 2016 at 8:56 PM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello all,
>
> for a prototype we are looking into we would like to read a big matrix
> from HDFS, and for every element that comes in a stream of vectors do on
> multiplication with the matrix. The matrix should fit in the memory of one
> machine.
>
> We can read in the matrix using a RichMapFunction, but that would mean
> that a copy of the matrix is made for each Task Slot AFAIK, if the
> RichMapFunction is instantiated once per Task Slot.
>
> So I'm wondering how should we try address this problem, is it possible to
> have just one copy of the object in memory per TM?
>
> As a follow-up if we have more than one TM per node, is it possible to
> share memory between them? My guess is that we have to look at some
> external store for that.
>
> Cheers,
> Theo
>

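For the DataSet case, a small self-contained sketch of the broadcast-variable
mechanism referenced above; the 2x2 matrix and all names here are made up (a
real job would read the matrix from HDFS), but withBroadcastSet() and
getBroadcastVariable() are the standard DataSet hooks being described.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class BroadcastMatrixSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // In the real job the matrix would come from HDFS; a tiny 2x2 matrix here.
        DataSet<double[]> matrixRows = env.fromElements(
                new double[]{1.0, 2.0},
                new double[]{3.0, 4.0});

        DataSet<double[]> vectors = env.fromElements(
                new double[]{1.0, 1.0},
                new double[]{0.5, 2.0});

        DataSet<double[]> products = vectors
                .map(new RichMapFunction<double[], double[]>() {
                    private List<double[]> matrix;

                    @Override
                    public void open(Configuration parameters) {
                        // The broadcast data set is shipped to each parallel instance
                        // once and retrieved here in open(), not per map() call.
                        matrix = getRuntimeContext().getBroadcastVariable("matrix");
                    }

                    @Override
                    public double[] map(double[] v) {
                        double[] result = new double[matrix.size()];
                        for (int i = 0; i < matrix.size(); i++) {
                            double dot = 0.0;
                            for (int j = 0; j < v.length; j++) {
                                dot += matrix.get(i)[j] * v[j];
                            }
                            result[i] = dot;
                        }
                        return result;
                    }
                })
                .withBroadcastSet(matrixRows, "matrix");

        products.print();
    }
}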

Re: CEP and Within Clause

2016-08-02 Thread Sameer W
Thank you-  It is very clear now.

Sameer

On Tue, Aug 2, 2016 at 10:29 AM, Till Rohrmann <till.rohrm...@gmail.com>
wrote:

> The CEP operator maintains for each pattern a window length. This means
> that every starting event will set its own timeout value.
>
> So if T=51 arrives in the 11th minute, then it depends whether the second
> T=31 arrived sometime between the 1st and 11th minute. If that's the case,
> then you should also see a second matching.
>
> Cheers,
> Till
>
> On Tue, Aug 2, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:
>
>> Thanks Till,
>>
>> In that case if I have a pattern -
>> First = T > 30
>> Followed By = T > 50
>> Within 10 minutes
>>
>> If I get the following sequence of events within 10 minutes
>> T=31, T=51, T=31, T=51
>>
>> I assume the alert will fire twice now.
>>
>> But what happens if the last T=51 arrives in the 11th minute. If the
>> partially matched pattern is discarded after 10 minutes how will the system
>> detect T=51. Or do you mean that that timer (for the within clause) is
>> reset each time the patter T>30 matches. In that case it would fire!
>>
>> Thanks,
>> Sameer
>>
>> On Tue, Aug 2, 2016 at 10:02 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Sameer,
>>>
>>> the within clause of CEP uses neither tumbling nor sliding windows. It
>>> is more like a session window which is started whenever an element which
>>> matches the starting condition arrives. As long as new events which fulfill
>>> the pattern definition arrive within the length of the window, they will be
>>> added. If the pattern should not be completed within the specified time
>>> interval, the partially matched pattern will be discarded. If you've
>>> specified a timeout handler, then the timeout handler is called with the
>>> partial pattern.
>>>
>>> At the moment, there is no way to re-insert elements in the upstream.
>>> Actually there is also no need for it because the CEP operator will detect
>>> the alert patterns if there are two temperature readings > 150 within 6
>>> seconds.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>>
>>> On Tue, Aug 2, 2016 at 5:12 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> +Till, looping him in directly, he probably missed this because he was
>>>> away for a while.
>>>>
>>>>
>>>>
>>>> On Tue, 26 Jul 2016 at 18:21 Sameer W <sam...@axiomine.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It looks like the WithIn clause of CEP uses Tumbling Windows. I could
>>>>> get it to use Sliding windows by using an upstream pipeline which uses
>>>>> Sliding Windows and produces repeating elements (in each sliding window)
>>>>> and applying a Watermark assigner on the resulting stream with elements
>>>>> duplicated. I wanted to use the "followedBy" pattern where there is a
>>>>> strong need for sliding windows.
>>>>>
>>>>> Is there a plan to add sliding windows to the within clause at some
>>>>> point?
>>>>>
>>>>> The PatternStream class's "select" and "flatSelect" have overloaded
>>>>> versions which take PatternTimeOut variable. Is there a way to insert some
>>>>> of those elements back to the front of the stream. Say I am trying to find
>>>>> a pattern where two temperature readings >150 within 6 second window 
>>>>> should
>>>>> raise an alert. If only one was found, can I insert that one back in the
>>>>> front of the stream on that task node (for that window pane) so that I can
>>>>> find a pattern match in the events occurring in the next 6 seconds. If I
>>>>> can do that, I don't need sliding windows. Else I cannot avoid using them
>>>>> for such scenarios.
>>>>>
>>>>> Thanks,
>>>>> Sameer
>>>>>
>>>>
>>>
>>
>


Re: CEP and Within Clause

2016-08-02 Thread Sameer W
Thanks Till,

In that case if I have a pattern -
First = T > 30
Followed By = T > 50
Within 10 minutes

If I get the following sequence of events within 10 minutes
T=31, T=51, T=31, T=51

I assume the alert will fire twice now.

But what happens if the last T=51 arrives in the 11th minute? If the
partially matched pattern is discarded after 10 minutes, how will the system
detect T=51? Or do you mean that the timer (for the within clause) is
reset each time the pattern T>30 matches? In that case it would fire!

Thanks,
Sameer

On Tue, Aug 2, 2016 at 10:02 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Sameer,
>
> the within clause of CEP uses neither tumbling nor sliding windows. It is
> more like a session window which is started whenever an element which
> matches the starting condition arrives. As long as new events which fulfill
> the pattern definition arrive within the length of the window, they will be
> added. If the pattern should not be completed within the specified time
> interval, the partially matched pattern will be discarded. If you've
> specified a timeout handler, then the timeout handler is called with the
> partial pattern.
>
> At the moment, there is no way to re-insert elements in the upstream.
> Actually there is also no need for it because the CEP operator will detect
> the alert patterns if there are two temperature readings > 150 within 6
> seconds.
>
> Cheers,
> Till
>
>
>
> On Tue, Aug 2, 2016 at 5:12 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> +Till, looping him in directly, he probably missed this because he was
>> away for a while.
>>
>>
>>
>> On Tue, 26 Jul 2016 at 18:21 Sameer W <sam...@axiomine.com> wrote:
>>
>>> Hi,
>>>
>>> It looks like the WithIn clause of CEP uses Tumbling Windows. I could
>>> get it to use Sliding windows by using an upstream pipeline which uses
>>> Sliding Windows and produces repeating elements (in each sliding window)
>>> and applying a Watermark assigner on the resulting stream with elements
>>> duplicated. I wanted to use the "followedBy" pattern where there is a
>>> strong need for sliding windows.
>>>
>>> Is there a plan to add sliding windows to the within clause at some
>>> point?
>>>
>>> The PatternStream class's "select" and "flatSelect" have overloaded
>>> versions which take PatternTimeOut variable. Is there a way to insert some
>>> of those elements back to the front of the stream. Say I am trying to find
>>> a pattern where two temperature readings >150 within 6 second window should
>>> raise an alert. If only one was found, can I insert that one back in the
>>> front of the stream on that task node (for that window pane) so that I can
>>> find a pattern match in the events occurring in the next 6 seconds. If I
>>> can do that, I don't need sliding windows. Else I cannot avoid using them
>>> for such scenarios.
>>>
>>> Thanks,
>>> Sameer
>>>
>>
>


CEP and Within Clause

2016-07-26 Thread Sameer W
Hi,

It looks like the WithIn clause of CEP uses Tumbling Windows. I could get
it to use Sliding windows by using an upstream pipeline which uses Sliding
Windows and produces repeating elements (in each sliding window) and
applying a Watermark assigner on the resulting stream with elements
duplicated. I wanted to use the "followedBy" pattern where there is a
strong need for sliding windows.

Is there a plan to add sliding windows to the within clause at some point?

The PatternStream class's "select" and "flatSelect" have overloaded
versions which take a PatternTimeout variable. Is there a way to insert some
of those elements back at the front of the stream? Say I am trying to find
a pattern where two temperature readings >150 within a 6-second window should
raise an alert. If only one was found, can I insert that one back at the
front of the stream on that task node (for that window pane) so that I can
find a pattern match in the events occurring in the next 6 seconds? If I
can do that, I don't need sliding windows. Otherwise I cannot avoid using them
for such scenarios.

Thanks,
Sameer


Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Sameer W
Thanks Ufuk,

That was very helpful. But that raised a few more questions :-):

1. Calling clear() on the KV state is only possible for snapshots, right? Do
you control that for checkpoints too?

2. Assuming that the user has no control over the checkpoint process
outside of controlling the checkpoint interval, when is RocksDB
cleared of the operator state for checkpoints after they are long past? It
seems like there are only two checkpoints that are really necessary to
maintain: the current one and the previous one for restore. Does Flink
clean up checkpoints on a timer? When it does clean up checkpoints, does it
also clean up the state backend (I am assuming they are different)?

3. The point about pre-aggregating windows was very helpful, as the WindowFunction
is now passed the pre-aggregated state. For windows, are the Reduce and Fold
functions called on each element even before the window is triggered? I
can see how that would work where the pre-compute is done per element but
the actual output is emitted only when the window is fired. But that is
only possible if there are no Evictors defined on the window? Also, how are
the elements fed to the Reduce/Fold function? Is it like MapReduce where,
even if you are using an Iterator, in reality all the values for a key are
not buffered into memory? Which ties back to how RocksDB is used to
store a large window state before it is triggered. If my elements are
accumulating in a window (serving a ReduceFunction), does it spill to disk
(RocksDB?) when a threshold size is reached?

Thanks,
Sameer



On Tue, Jul 26, 2016 at 7:29 AM, Ufuk Celebi <u...@apache.org> wrote:

> On Mon, Jul 25, 2016 at 8:50 PM, Sameer W <sam...@axiomine.com> wrote:
> > The question is, if using really long windows (in hours) if the state of
> the
> > window gets very large over time, would size of the RocksDB get larger?
> > Would replication to HDFS start causing performance bottlenecks? Also
> would
> > this need a constant (at checkpoint interval?), read from RocksDB, add
> more
> > window elements and write to RocksDB.
>
> Yes. The size of the RocksDB instance is directly correlated with the
> number of K/V state pairs you store. You can remove state by calling
> `clear()` on the KvState instance.
>
> All state updates go directly to RocksDB and snapshots copy the DB
> files (semi-async mode, current default) or iterate-and-copy all K/V
> pairs (fully-async mode). No records are deleted automatically after
> snapshots.
>
> Snapshotting large RocksDB instances will cause some slow down, but
> you can trade this cost off by adjusting the checkpointing interval.
> There are plans to do the snapshots in an incremental fashion in order
> to lower the costs for this, but there is no design doc available for
> it at this point.
>
> > Outside of the read costs, is there a risk to having very long windows
> when
> > you know you could collect a lot of elements in them. Instead is it
> safer to
> > perform aggregations on top of aggregations or use your own custom remote
> > store like HBase to persist larger state per record and use windows only
> to
> > store the keys in HBase. I mention HBase because of its support for
> column
> > qualifiers allow elements to be added to the same key in multiple ordered
> > column qualifiers. Reading can also be throttled in batches of column
> > qualifiers allowing for the better memory consumption. Is this approach
> used
> > in practice?
>
> RocksDB works quite well for large stateful jobs. If possible for your
> use case, I would still recommend work with pre-aggregating window
> functions (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions
> )
> or pre-aggregating the data. The I/O costs will correlate with the
> state size, but there is "no risk" in the sense of that it will still
> work as expected.
>
> What you describe with HBase could work, but I'm not aware of someone
> doing this. Furhtermore, depending on your use case, it can cause
> problems in failure scenarios, because you might need to keep HBase
> and Flink state in sync.
>

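A short sketch combining the two suggestions above - RocksDB as the state
backend, plus a pre-aggregating window so the backend only ever holds one value
per key and window instead of every element. The checkpoint URI, window size
and tuple layout are illustrative, and the apply(ReduceFunction, WindowFunction)
overload matches the docs version linked above (later releases expose the same
thing as reduce(reduceFunction, windowFunction)).

// Imports: org.apache.flink.contrib.streaming.state.RocksDBStateBackend (flink-statebackend-rocksdb),
// org.apache.flink.api.common.functions.ReduceFunction,
// org.apache.flink.streaming.api.functions.windowing.WindowFunction,
// org.apache.flink.streaming.api.windowing.windows.TimeWindow,
// org.apache.flink.api.java.tuple.{Tuple, Tuple2}, org.apache.flink.util.Collector.
// ("events" is assumed to be a DataStream<Tuple2<String, Long>> of (key, count),
// and this fragment lives inside a main() declared with "throws Exception".)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));  // illustrative URI
env.enableCheckpointing(60_000);                                            // snapshot every 60s

DataStream<Tuple2<String, Long>> totals = events
        .keyBy(0)
        .timeWindow(Time.hours(6))
        .apply(
                // Runs per element as it arrives, so only one pre-aggregated value
                // per key and window is kept in the state backend.
                new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                },
                // Invoked once when the window fires, with the single pre-aggregated value.
                new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> preAggregated,
                                      Collector<Tuple2<String, Long>> out) {
                        out.collect(preAggregated.iterator().next());
                    }
                });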

Question about Checkpoint Storage (RocksDB)

2016-07-25 Thread Sameer W
Hi,

My understanding about the RocksDB state backend is as follows:

When using a RocksDB state backend, the checkpoints are backed up
locally (to the TaskManager) using the backup feature of RocksDB by taking
snapshots from RocksDB which are consistent read-only views on the RocksDB
database. Each checkpoint is backed up on the task manager node and this
checkpoint is asynchronously backed up to the remote HDFS location. When
each checkpoint is committed, the records are deleted from RocksDB,
allowing the RocksDB data folders to remain small. This in turn allows each
snapshot to be relatively small. If the task node goes away due to failure,
I assume the RocksDB database is restored from the checkpoints in the
remote HDFS. Since each checkpoint's state is relatively small, the
restoration time from HDFS for the RocksDB database on the new task node is
relatively small.

The question is: when using really long windows (in hours), if the state of
the window gets very large over time, would the size of the RocksDB database
get larger? Would replication to HDFS start causing performance bottlenecks?
Also, would this need a constant (at each checkpoint interval?) cycle of read
from RocksDB, add more window elements, and write back to RocksDB?

Outside of the read costs, is there a risk to having very long windows when
you know you could collect a lot of elements in them? Instead, is it safer
to perform aggregations on top of aggregations, or to use your own custom
remote store like HBase to persist larger state per record and use windows
only to store the keys in HBase? I mention HBase because its support for
column qualifiers allows elements to be added to the same key in multiple
ordered column qualifiers. Reading can also be throttled in batches of
column qualifiers, allowing for better memory consumption. Is this
approach used in practice?

Thanks,
Sameer


Re: Processing windows in event time order

2016-07-21 Thread Sameer W
Aljoscha - Thanks, it works exactly as you said. I found out why my windows
were firing twice. I was making the error of adding the
AutoWatermarkInterval to the existing watermark each time the watermark was
sampled from the source just to fire a window if one of the sources was
delayed substantially.

But doesn't this mean that if one of the sources stops sending data (a device
lost internet connectivity temporarily), then such a pipeline would just
freeze, and windows would keep accumulating on the reduce side as the other
sources would keep sending data and their watermarks? Isn't
this a risk for a possible out-of-memory error? Should one always use the
RocksDB alternative to mitigate such risks?

Sameer



On Thu, Jul 21, 2016 at 7:52 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yes, that is to be expected. Stream 2 should only send the watermark once
> the elements with a timestamp lower than the watermark have been sent as
> well.
>
> On Thu, 21 Jul 2016 at 13:10 Sameer W <sam...@axiomine.com> wrote:
>
>> Thanks, Aljoscha,
>>
>> This what I am seeing when I use Ascending timestamps as watermarks-
>>
>> Consider a window if 1-5 seconds
>> Stream 1- Sends Elements A,B
>>
>> Stream 2 (20 seconds later) - Sends Elements C,D
>>
>> I see Window (1-5) fires first with just A,B. After 20 seconds Window
>> (1-5) fires again but this time with only C,D. If I add a delay where I lag
>> the watermarks by 20 seconds, then only one instance of the Window (1-5)
>> fires with elements A,B,C,D.
>>
>> Sameer
>>
>> On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi David,
>>> windows are being processed in order of their end timestamp. So if you
>>> specify an allowed lateness of zero (which will only be possible on Flink
>>> 1.1 or by using a custom trigger) you should be able to sort the elements.
>>> The ordering is only valid within one key, though, since windows for
>>> different keys with the same end timestamp will be processed in an
>>> arbitrary order.
>>>
>>> @Sameer If both sources emit watermarks that are correct for the
>>> elements that they are emitting the Trigger should only fire when both
>>> sources progressed their watermarks sufficiently far. Could you maybe give
>>> a more detailed example of the problem that you described?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar <sam...@axiomine.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> If watermarks arriving from multiple sources, how long does the Event
>>>> Time Trigger wait for the slower source to send its watermarks before
>>>> triggering only from the faster source? I have seen that if one of the
>>>> sources is really slow then the elements of the faster source fires and
>>>> when the elements arrive from the slower source, the same window fires
>>>> again with the new elements only. I can work around this by adding delays
>>>> but does merging watermarks require that both have arrived by the time the
>>>> watermarks progress to the point where a window can be triggered? Is
>>>> applying a delay in the watermark the only way to solve this.
>>>>
>>>> Sameer
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
>>>> vishnu.viswanat...@gmail.com> wrote:
>>>>
>>>> Hi David,
>>>>
>>>> You are right, the events in the window are not sorted according to the
>>>> EventTime hence the processing is not done in an increasing order of
>>>> timestamp.
>>>> As you said, you will have to do the sorting yourself in your window
>>>> function to make sure that you are processing the events in order.
>>>>
>>>> What Flink does is (when EventTime is set and timestamp is assigned),
>>>> it will assign the elements to the Windows based on the EventTime, which
>>>> otherwise (if using ProcessingTime) might have ended up in a different
>>>> Window. (as per the ProcessingTime).
>>>>
>>>> This is as per my limited knowledge, other Flink experts can correct me
>>>> if this is wrong.
>>>>
>>>> Thanks,
>>>> Vishnu
>>>>
>>>> On Wed, Jul 20, 2016 at 9:30 PM, David Desberg <david.desb...@uber.com>
>>>> wrote:
>>>>
>>>>> Hi all

Re: Processing windows in event time order

2016-07-21 Thread Sameer W
Stream 2 does send watermarks, but only after it sees elements C,D. It sends the
watermark (5) 20 seconds after Stream 1 sends it.

From what I understand, Flink merges watermarks from both streams on the
reduce side. But does it wait a certain pre-configured amount of time (for
watermarks from both streams to arrive) before it finally fires for the first
stream?



On Thu, Jul 21, 2016 at 7:52 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yes, that is to be expected. Stream 2 should only send the watermark once
> the elements with a timestamp lower than the watermark have been sent as
> well.
>
> On Thu, 21 Jul 2016 at 13:10 Sameer W <sam...@axiomine.com> wrote:
>
>> Thanks, Aljoscha,
>>
>> This what I am seeing when I use Ascending timestamps as watermarks-
>>
>> Consider a window if 1-5 seconds
>> Stream 1- Sends Elements A,B
>>
>> Stream 2 (20 seconds later) - Sends Elements C,D
>>
>> I see Window (1-5) fires first with just A,B. After 20 seconds Window
>> (1-5) fires again but this time with only C,D. If I add a delay where I lag
>> the watermarks by 20 seconds, then only one instance of the Window (1-5)
>> fires with elements A,B,C,D.
>>
>> Sameer
>>
>> On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi David,
>>> windows are being processed in order of their end timestamp. So if you
>>> specify an allowed lateness of zero (which will only be possible on Flink
>>> 1.1 or by using a custom trigger) you should be able to sort the elements.
>>> The ordering is only valid within one key, though, since windows for
>>> different keys with the same end timestamp will be processed in an
>>> arbitrary order.
>>>
>>> @Sameer If both sources emit watermarks that are correct for the
>>> elements that they are emitting the Trigger should only fire when both
>>> sources progressed their watermarks sufficiently far. Could you maybe give
>>> a more detailed example of the problem that you described?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar <sam...@axiomine.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> If watermarks arriving from multiple sources, how long does the Event
>>>> Time Trigger wait for the slower source to send its watermarks before
>>>> triggering only from the faster source? I have seen that if one of the
>>>> sources is really slow then the elements of the faster source fires and
>>>> when the elements arrive from the slower source, the same window fires
>>>> again with the new elements only. I can work around this by adding delays
>>>> but does merging watermarks require that both have arrived by the time the
>>>> watermarks progress to the point where a window can be triggered? Is
>>>> applying a delay in the watermark the only way to solve this.
>>>>
>>>> Sameer
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
>>>> vishnu.viswanat...@gmail.com> wrote:
>>>>
>>>> Hi David,
>>>>
>>>> You are right, the events in the window are not sorted according to the
>>>> EventTime hence the processing is not done in an increasing order of
>>>> timestamp.
>>>> As you said, you will have to do the sorting yourself in your window
>>>> function to make sure that you are processing the events in order.
>>>>
>>>> What Flink does is (when EventTime is set and timestamp is assigned),
>>>> it will assign the elements to the Windows based on the EventTime, which
>>>> otherwise (if using ProcessingTime) might have ended up in a different
>>>> Window. (as per the ProcessingTime).
>>>>
>>>> This is as per my limited knowledge, other Flink experts can correct me
>>>> if this is wrong.
>>>>
>>>> Thanks,
>>>> Vishnu
>>>>
>>>> On Wed, Jul 20, 2016 at 9:30 PM, David Desberg <david.desb...@uber.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> In Flink, after setting the time characteristic to event time and
>>>>> properly assigning timestamps/watermarks, time-based windows will be
>>>>> created based upon event time. If we need to process events within a 
>>>>> window
>>>>> in event time order, we can sort the windowed values and process as
>>>>> necessary by applying a WindowFunction. However, as I understand it, there
>>>>> is no guarantee that time-based windows will be processed in time order. 
>>>>> Is
>>>>> this correct? Or, if we assume a watermarking system that (for example's
>>>>> sake) does not allow any late events, is there a way within Flink to
>>>>> guarantee that windows will be processed (via an applied WindowFunction) 
>>>>> in
>>>>> strictly increasing time order?
>>>>>
>>>>> If necessary, I can provide a more concrete explanation of what I
>>>>> mean/am looking for.
>>>>>
>>>>> Thanks!
>>>>> David
>>>>
>>>>
>>>>
>>

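The "lag the watermark" workaround mentioned in this thread, as a small sketch:
the assigner tracks the highest timestamp seen and emits a watermark that
trails it by a fixed bound (20 seconds here, purely illustrative), so elements
from a slower source that arrive up to that much later still fall into the
window before it fires. Later Flink releases ship essentially this as
BoundedOutOfOrdernessTimestampExtractor.

// Imports: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks,
// org.apache.flink.streaming.api.watermark.Watermark, org.apache.flink.api.java.tuple.Tuple2.

public class LaggingWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    private static final long MAX_LAG_MS = 20_000L;  // illustrative bound

    private long maxEventTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousTimestamp) {
        maxEventTimestamp = Math.max(maxEventTimestamp, element.f1);  // f1 = event time (assumption)
        return element.f1;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Downstream operators take the minimum of their input watermarks, so a
        // window only fires once both streams' (lagged) watermarks pass its end.
        return new Watermark(maxEventTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTimestamp - MAX_LAG_MS);
    }
}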

Re: Aggregate events in time window

2016-07-19 Thread Sameer W
How about using event-time windows with watermark assignment and bounded
delays? That way you allow more than 5 minutes (the bounded delay) for your
requests and responses to arrive. Do you have a way to assign a timestamp to
the responses based on the request timestamp (does the response contain the
request timestamp in some form)? That way you can add them to the same window.

Sameer

On Tue, Jul 19, 2016 at 12:31 PM, Dominique Rondé <
dominique.ro...@allsecur.de> wrote:

> Hi all,
>
> once again I need a "kick" to the right direction. I have a datastream
> with requests and responses identified by a ReqResp-ID. I would like to calculate
> the (avg, 95%, 99%) time between the request and response and also like to
> count them. I thought of
> ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would
> do the job, but there are some cases where a Request is in the first and the
> Response is in the second window. But if I use an overlapping time window
> (i.e. timeWindowAll(Time.minutes(5),Time.seconds(60))) I have a lot of
> requests appearing more than once in the apply-function.
>
> Do you have any hint for me?
>
> Thanks a lot!
>
> Dominique
>
>

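A rough sketch of the pairing idea from the reply above: key by the
request/response id, use one event-time window, and emit the latency once both
sides are present. The Tuple3 layout, window length and the assumption that the
response is stamped with the request's timestamp (so both land in the same
window) are all illustrative.

// Imports: org.apache.flink.api.java.tuple.{Tuple, Tuple2, Tuple3},
// org.apache.flink.streaming.api.functions.windowing.WindowFunction,
// org.apache.flink.streaming.api.windowing.time.Time,
// org.apache.flink.streaming.api.windowing.windows.TimeWindow, org.apache.flink.util.Collector.
// ("reqResp" is assumed to be a DataStream<Tuple3<String, String, Long>> of
// (reqRespId, "REQ"|"RESP", epochMillis) with event-time watermarks assigned upstream.)

DataStream<Tuple2<String, Long>> latencies = reqResp
        .keyBy(0)
        .timeWindow(Time.minutes(5))
        .apply(new WindowFunction<Tuple3<String, String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Tuple3<String, String, Long>> pair,
                              Collector<Tuple2<String, Long>> out) {
                Long requestTs = null;
                Long responseTs = null;
                for (Tuple3<String, String, Long> e : pair) {
                    if ("REQ".equals(e.f1)) {
                        requestTs = e.f2;
                    } else {
                        responseTs = e.f2;
                    }
                }
                // Emit only if both sides arrived in this window; pairs that would
                // otherwise split across windows are what the bounded delay is for.
                if (requestTs != null && responseTs != null) {
                    out.collect(Tuple2.of((String) key.getField(0), responseTs - requestTs));
                }
            }
        });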

Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Sameer W
Yes, you have to provide the path of your jar. The reason is:
1. When you start in pseudo-cluster mode, the tasks are started in their
own JVMs with their own class loaders.
2. Your client program has access to your custom operator classes, but the
remote JVMs don't. Hence you need to ship the JAR file to these remote
task nodes. The getRemoteExecutionEnvironment() method has an overloaded
version which takes a JAR file. Just provide your local path to it, and it
will ship the JAR when the job starts.

Sameer

On Tue, Jul 19, 2016 at 6:51 AM, Biplob Biswas <revolutioni...@gmail.com>
wrote:

> Hi Sameer,
>
> Thanks for that quick reply, I was using flink streaming so the program
> keeps on running until i close it. But anyway I am ready to try this
> getRemoteExecutionEnvironment(), I checked but it ask me for the jar file,
> which is weird because I am running the program directly.
>
> Does it mean I create a jar package and then run it via eclipse?
>
> If not, could you point me to some resources?
>
> Thanks
> Biplob
>
>
> Sameer W wrote
> > From Eclipse it creates a local environment and runs in the IDE. When the
> > program finishes so does the Flink execution instance. I have never tried
> > accessing the console when the program is running but one the program is
> > finished there is nothing to connect to.
> >
> > If you need to access the dashboard, start Flink in the pseudo-cluster
> > mode
> > and connect to it using the getRemoteExecutionEnvironment(). That will
> > allow you to access the jobs statuses on the dashboard when you finish
> > running your job.
> >
> > Sameer
> >
> > On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 
>
> > revolutionisme@
>
> > 
> > wrote:
> >
> >> Hi,
> >>
> >> I am running my flink program using Eclipse and I can't access the
> >> dashboard
> >> at http://localhost:8081, can someone help me with this?
> >>
> >> I read that I need to check my flink-conf.yaml, but its a maven project
> >> and
> >> I don't have a flink-conf.
> >>
> >> Any help would be really appreciated.
> >>
> >> Thanks a lot
> >> Biplob
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
> >> Sent from the Apache Flink User Mailing List archive. mailing list
> >> archive
> >> at Nabble.com.
> >>
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

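What that looks like in code, as a minimal sketch: the host, port and jar path
are illustrative (6123 is the default JobManager RPC port, and the jar is
whatever your Maven build produces); createRemoteEnvironment(...) is the Java
API call behind the getRemoteExecutionEnvironment() wording used in this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteDashboardSketch {

    public static void main(String[] args) throws Exception {
        // Connect to a locally running Flink cluster (started with bin/start-cluster.sh),
        // whose dashboard is served at http://localhost:8081. The jar is shipped to the
        // cluster so the TaskManagers can load the user classes.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", 6123, "target/my-flink-job-0.1.jar");

        env.fromElements(1, 2, 3).print();

        env.execute("remote dashboard sketch");
    }
}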

Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Sameer W
From Eclipse it creates a local environment and runs in the IDE. When the
program finishes, so does the Flink execution instance. I have never tried
accessing the console while the program is running, but once the program is
finished there is nothing to connect to.

If you need to access the dashboard, start Flink in pseudo-cluster mode
and connect to it using getRemoteExecutionEnvironment(). That will
allow you to access the job statuses on the dashboard when you finish
running your job.

Sameer

On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 
wrote:

> Hi,
>
> I am running my flink program using Eclipse and I can't access the
> dashboard
> at http://localhost:8081, can someone help me with this?
>
> I read that I need to check my flink-conf.yaml, but its a maven project and
> I don't have a flink-conf.
>
> Any help would be really appreciated.
>
> Thanks a lot
> Biplob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>