Re: How do I ensure binary comparisons are being used?

2017-01-02 Thread ljwagerfield
Any insights on this?

Thanks,
Lawrence





Re: Set Parallelism and keyBy

2017-01-02 Thread Jamie Grier
Dominik,

This should work just as you expect.  Maybe the output of print() is just
misleading you.  The print() operation will still have a parallelism of two,
but the flatMap() will have a parallelism of 16, and all data elements with
the same key will get routed to the same host.

Any sequence of keyBy().flatMap() will always properly partition the data
across the instances of the flatMap() operator by key.
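For illustration, here is a minimal Java sketch of that pattern (Event and
ExpensiveOperation are the placeholder names from your mail; the key
selector is an assumption):

// Parallelism is set on the flatMap operator itself, never on keyBy().
// The keyed partitioning still applies: all elements with the same
// eventType go to the same flatMap subtask.
env.setParallelism(2);
DataStream<Event> input = ...; // from somewhere, as in the original snippet
input
    .keyBy("eventType")
    .flatMap(new ExpensiveOperation()).setParallelism(16)
    .print(); // print() keeps the default parallelism of 2, so its output
              // prefixes reflect the print subtasks, not the flatMap subtasks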

-Jamie


On Mon, Dec 26, 2016 at 10:52 AM, Dominik Bruhn  wrote:

> Hey,
> I have a Flink job with a default parallelism of 2. I want to key the
> stream and then apply a flatMap on the keyed stream. The flatMap
> operation is quite costly, so I want a much higher parallelism here
> (let's say 16). Additionally, it is important that the flatMap operation
> for a given key is always executed in the same process or in the same task.
>
> I have the following code:
>
> 
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
> 
>
> This works fine, and the "ExpensiveOperation" is executed always on the
> same tasks for the same keys.
>
> Now I tried two things:
>
> 1.
> 
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()
> 
> This fails with an exception because I can't set the parallelism on the
> keyBy operator.
>
> 2.
> -
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()
> -
> While this executes, it breaks the assignment of keys to tasks: the
> "ExpensiveOperation" is no longer always executed on the same nodes for
> the same keys (visible by the prefixes in the print() output).
>
> What am I doing wrong? Is my only option to set the parallelism of the
> whole Flink job to 16?
>
> Thanks, have nice holidays,
> Dominik
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Flink streaming questions

2017-01-02 Thread Jamie Grier
Hi Henri,

#1 - This is by design.  Event time advances with the slowest input
source.  If an input source generates no data, this is indistinguishable
from a slow source.  Kafka topics where some partitions receive no data
are a problem in this regard -- but there isn't a simple solution.  If
possible I would address it at the source.
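One workaround some users apply is sketched below in Java; this is an idea,
not an official Flink API, and the Event type, timeout, and out-of-orderness
bound are assumptions.  A periodic watermark assigner falls back to the wall
clock when a source subtask has been idle for a while, so downstream windows
can still close.  Note that this trades event-time correctness for progress.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private static final long MAX_OUT_OF_ORDERNESS = 10_000; // 10 s, an assumption
    private static final long IDLE_TIMEOUT = 60_000;         // 1 min, an assumption

    private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        lastEventSeenAt = System.currentTimeMillis();
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (System.currentTimeMillis() - lastEventSeenAt > IDLE_TIMEOUT) {
            // No data for a while: treat the subtask as idle and let event
            // time advance with the wall clock so downstream windows close.
            return new Watermark(System.currentTimeMillis() - MAX_OUT_OF_ORDERNESS);
        }
        return new Watermark(maxTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}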

#2 - If it's possible to run these init functions just once when you submit
the job, you can run them in the constructor of your FoldFunction.  The init
will then happen exactly once (on the client), and the constructed
FoldFunction is serialized and distributed around the cluster.  If this
doesn't work because you need something truly dynamic, you can also
accomplish this with a simple local flag in your function, e.g.:

// O = input type, T = accumulator type
class MyFoldFunction extends FoldFunction[O, T] {
  private var initialized = false

  override def fold(accumulator: T, value: O): T = {
    if (!initialized) {
      doInitStuff()      // runs once per task instance, on the first element
      initialized = true
    }
    doNormalStuff()
  }
}


#3 - One way to do this is, as you've said, to attach the profile
information to the event, using a mapper, before it enters the window
operations.
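A hypothetical sketch of such a mapper in Java (ProfileClient, Profile, and
EnrichedEvent are made-up names; the real lookup and caching logic depends
on your profile store):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ProfileEnricher extends RichMapFunction<Event, EnrichedEvent> {

    private transient ProfileClient client;

    @Override
    public void open(Configuration parameters) {
        // Runs once per parallel instance; a good place to open connections.
        client = new ProfileClient();
    }

    @Override
    public EnrichedEvent map(Event event) {
        // Look up (or cache) the user profile and attach it to the event
        // before the session window operator sees it.
        Profile profile = client.lookup(event.getUserId());
        return new EnrichedEvent(event, profile);
    }

    @Override
    public void close() {
        client.shutdown();
    }
}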


On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen 
wrote:

> Hi,
>
> I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT,
> and what I would like to accomplish is a stream that reads data from
> multiple Kafka topics, identifies user sessions, uses an external user
> profile to enrich the data, evaluates a script to produce session
> aggregates, and then creates updated profiles from the session aggregates.
> I am working with high-volume data and user sessions may be long, so using
> a generic window apply might not work. Below is a simplification of the
> stream.
>
> stream = createKafkaStreams(...);
> env.setParallelism(4);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> stream
> .keyBy(2)
> .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
> .fold(new SessionData(), new SessionFold(), new ProfilerApply())
> .print();
>
> The questions:
>
> 1. Initially, when I used event-time windowing, I could not get any of my
> windows to close. The reason seemed to be that I had 6 partitions in my
> test Kafka setup and only 4 of them generated traffic. If I used a
> parallelism above 4, then no windows were closed. Is this by design or a
> defect? We use flink-connector-kafka-0.10 because earlier versions did not
> commit the offsets correctly.
>
> 2. Rich fold functions are not supported. However, I would like to execute
> a piece of custom script in the fold function that requires an
> initialisation step. I would have used the open and close lifecycle methods
> of rich functions, but they are not available for fold. What would be the
> preferred way to run some initialisation routines (and close them
> gracefully) when using fold?
>
> 3. Kind of related to the above: I would also like to fetch a user profile
> from an external source at the beginning of the session. What would be a
> best practice for that kind of operation? If I were using the generic
> window apply, I could fetch it at the beginning of the apply method. I was
> thinking of introducing a mapper that fetches this profile periodically and
> caches it in Flink state. However, with this setup I would not be able to
> tie it to the user sessions identified by the windows.
>
> 4. I may also have an additional requirement of writing out each event
> enriched with the current session and profile data. I could basically do
> this again with a generic window function and write out each event with the
> collector when iterating, but would there be a better pattern to use? Maybe
> sharing state between functions or something.
>
> Br,
> Henri H
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Programmatically get live values of accumulators

2017-01-02 Thread Jamie Grier
Hi Gwenhael,

I think what you actually want is to use the Apache Flink metrics
interface.  See the following:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html

Sending metrics to StatsD is supported out-of-the-box.
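As a starting point, this is roughly what the metrics interface looks like
from user code (a minimal sketch; the class and counter names are
arbitrary).  A configured reporter, such as the StatsD one, then ships the
values out periodically:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Register a counter with the metric system for this operator.
        counter = getRuntimeContext()
            .getMetricGroup()
            .counter("eventCounter");
    }

    @Override
    public String map(String value) {
        counter.inc();
        return value;
    }
}

The reporter itself is configured in flink-conf.yaml; the exact keys are
listed on the documentation page linked above.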

-Jamie


On Mon, Jan 2, 2017 at 1:34 AM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Hi, and best wishes for the year to come :)
>
>
>
> I'd like to be able to programmatically get the (live) values of
> accumulators in order to send them using a StatsD (or other) client in
> the JobManager of a YARN-deployed application. I say "live" because I'd
> like to use this in streaming (24/7) applications and send live stats; I
> cannot wait for the application to end.
>
>
>
> I've seen that there is a JSON API (though I'd prefer not to have my app
> poll itself).
>
> I've also seen some code on GitHub (test files) where this is done using
> the underlying Akka framework. I don't mind doing it the same way and
> creating an actor to receive notification messages, but I don't know
> whether that is the best way; there probably is a better one.
>
>
>
> Thanks in advance,
>
>
>
> Gwenhaël PASQUIERS
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Possible issue with EventTimeSessionWindows where a gap is specified for considering items in the same session

2017-01-02 Thread Jamie Grier
If there is never a gap between elements larger than the session gap, then
the window never ending is actually the correct behavior.  So, if this is
the case with some data stream, I would not suggest using session windows
at all -- or I would use a smaller session gap.

Another alternative would be to use session windows along with a
user-defined trigger that fires periodically whether the session has ended
or not.  This fits the case where the session window logic normally works
well, but you sometimes want to force an evaluation because a "natural"
session runs too long.
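For illustration, here is a rough Java sketch of such a trigger.  This is
not production code: the class name, the timeout, and the choice of FIRE
(vs. FIRE_AND_PURGE) are assumptions, and the sketch does not track or
clean up its processing-time timers.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SessionWithTimeoutTrigger extends Trigger<Object, TimeWindow> {

    private final long timeoutMillis;

    public SessionWithTimeoutTrigger(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        // Normal end-of-session firing at the window's max timestamp ...
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // ... plus a wall-clock timeout that forces periodic evaluation.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // The session has been open for timeoutMillis of wall-clock time:
        // force an evaluation even though the session has not ended.
        return TriggerResult.FIRE;
    }

    @Override
    public boolean canMerge() {
        return true; // required because session windows merge
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be attached via .window(EventTimeSessionWindows.withGap(...))
.trigger(new SessionWithTimeoutTrigger(timeout)) in place of the default
trigger.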

-Jamie

On Mon, Jan 2, 2017 at 3:11 AM, Sujit Sakre 
wrote:

> Hi,
>
> We are using Flink version 1.1.4.
>
>
> There is possibly an issue with EventTimeSessionWindows where a gap is
> specified for considering items to be in the same session. The logic here
> is: if two adjacent items have a difference in event timestamps of more
> than the gap, then the items are considered to be in separate sessions.
> The issue is, what happens if the gap between streaming records is *never*
> (or for a very long time) less than the session gap? This could leave the
> window open indefinitely.
>
> Is this a bug? How do we deal with this, to process windows in finite time?
>
> Could you please suggest?
>
> Thanks.
>
>
> *Sujit Sakre*
>
> Senior Technical Architect



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


1.1.4 IntelliJ Problem

2017-01-02 Thread Stephan Epping
Hi,

I am getting this error when running my tests with 1.1.4 inside the IntelliJ IDE.

java.lang.NoSuchMethodError: org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(Lorg/apache/flink/configuration/Configuration;Lakka/actor/ActorSystem;Lscala/Option;Lscala/Option;Ljava/lang/Class;Ljava/lang/Class;)Lscala/Tuple2;
    at org.apache.flink.test.util.ForkableFlinkMiniCluster.startJobManager(ForkableFlinkMiniCluster.scala:103)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:292)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:286)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:286)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:277)
    at org.apache.flink.test.util.ForkableFlinkMiniCluster.start(ForkableFlinkMiniCluster.scala:255)
    at org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:152)
    at org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:126)
    at org.flinkspector.datastream.DataStreamTestEnvironment.createTestEnvironment(DataStreamTestEnvironment.java:72)

Any ideas?

best,
Stephan



Re: Events are assigned to wrong window

2017-01-02 Thread Nico
Hi Aljoscha,

thank you for having a look. Actually, there is not much code that operates
on the timestamps:

stream
  .keyBy("id")
  .map(...)
  .filter(...)
  .map(...)
  .keyBy("areaID")
  .map(new KeyExtractor())
  .keyBy("f1.areaID","f0.sinterval")
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .apply(new TrafficInformation());

The map functions only enrich the data and don't change anything related to
the timestamp.

The apply function is:

@Override
public void apply(
        Tuple key,
        TimeWindow timeWindow,
        Iterable<Tuple2<Key, Car>> cars,   // type parameters were stripped by
        Collector<Tuple2<Key, Car>> out)   // the archive; "Key" is a placeholder
        throws Exception {

    System.out.println("Start: " + timeWindow.getStart());
    System.out.println("End: " + timeWindow.getEnd());

    for (Tuple2<Key, Car> t : cars) {
        System.out.println(t.f1);
    }
}

System.out.println(t.f1) prints all information about a car, in which the
timestamp is embedded. The timestamps are extracted with this class:

public class TimestampGenerator extends BoundedOutOfOrdernessTimestampExtractor<Car> {

    public TimestampGenerator(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Car car) {
        return car.getTimestamp();
    }
}


Example output is presented in the previous post... it looks like the
timestamp is rounded... I am confused :-/

Best,
Nico

2016-12-23 19:41 GMT+01:00 Aljoscha Krettek :

> Hi,
> could you please share code (and example data) for producing this output.
> I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico  wrote:
>
>> Hi @all,
>>
>> I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
>> While doing so, I found a strange behavior (at least to me) in the
>> assignment of events.
>>
>> The first element of a new window actually always belongs to the old
>> window. I thought the events were late, but then they would be dropped
>> instead of assigned to the new window. Even with an allowedLateness of 10s
>> the behavior remains the same.
>>
>> I used timeWindow.getStart() and getEnd() to get the boundaries of the
>> window.
>>
>> Can someone explain this?
>>
>> Best,
>> Nico
>>
>>
>> TimeWindows with Elements:
>>
>> Start: 148233294 - End: 148233296
>> timestamp=1482332952907
>>
>> Start: 148233296 - End: 148233298
>> timestamp=1482332958929
>> timestamp=1482332963995
>> timestamp=1482332969027
>> timestamp=1482332974039
>>
>> Start: 148233298 - End: 148233300
>> timestamp=1482332979059
>> timestamp=1482332984072
>> timestamp=1482332989081
>> timestamp=1482332994089
>>
>> Start: 148233300 - End: 148233302
>> timestamp=1482332999113
>> timestamp=1482333004123
>> timestamp=1482333009132
>> timestamp=1482333014144
>>
>


Possible issue with EventTimeSessionWindows where a gap is specified for considering items in the same session

2017-01-02 Thread Sujit Sakre
Hi,

We are using Flink version 1.1.4.


There is possibly an issue with EventTimeSessionWindows where a gap is
specified for considering items to be in the same session. The logic here
is: if two adjacent items have a difference in event timestamps of more
than the gap, then the items are considered to be in separate sessions.
The issue is, what happens if the gap between streaming records is *never*
(or for a very long time) less than the session gap? This could leave the
window open indefinitely.

Is this a bug? How do we deal with this, to process windows in finite time?

Could you please suggest?

Thanks.


*Sujit Sakre*

Senior Technical Architect


Programmatically get live values of accumulators

2017-01-02 Thread Gwenhael Pasquiers
Hi, and best wishes for the year to come :)

I'd like to be able to programmatically get the (live) values of accumulators
in order to send them using a StatsD (or other) client in the JobManager of a
YARN-deployed application. I say "live" because I'd like to use this in
streaming (24/7) applications and send live stats; I cannot wait for the
application to end.

I've seen that there is a JSON API (though I'd prefer not to have my app poll
itself).
I've also seen some code on GitHub (test files) where this is done using the
underlying Akka framework. I don't mind doing it the same way and creating an
actor to receive notification messages, but I don't know whether that is the
best way; there probably is a better one.

Thanks in advance,

Gwenhaël PASQUIERS


Flink streaming questions

2017-01-02 Thread Henri Heiskanen
Hi,

I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT,
and what I would like to accomplish is a stream that reads data from
multiple Kafka topics, identifies user sessions, uses an external user
profile to enrich the data, evaluates a script to produce session
aggregates, and then creates updated profiles from the session aggregates.
I am working with high-volume data and user sessions may be long, so using
a generic window apply might not work. Below is a simplification of the
stream.

stream = createKafkaStreams(...);
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
stream
.keyBy(2)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.fold(new SessionData(), new SessionFold(), new ProfilerApply())
.print();

The questions:

1. Initially, when I used event-time windowing, I could not get any of my
windows to close. The reason seemed to be that I had 6 partitions in my
test Kafka setup and only 4 of them generated traffic. If I used a
parallelism above 4, then no windows were closed. Is this by design or a
defect? We use flink-connector-kafka-0.10 because earlier versions did not
commit the offsets correctly.

2. Rich fold functions are not supported. However, I would like to execute a
piece of custom script in the fold function that requires an initialisation
step. I would have used the open and close lifecycle methods of rich
functions, but they are not available for fold. What would be the preferred
way to run some initialisation routines (and close them gracefully) when
using fold?

3. Kind of related to the above: I would also like to fetch a user profile
from an external source at the beginning of the session. What would be a
best practice for that kind of operation? If I were using the generic window
apply, I could fetch it at the beginning of the apply method. I was thinking
of introducing a mapper that fetches this profile periodically and caches it
in Flink state. However, with this setup I would not be able to tie it to
the user sessions identified by the windows.

4. I may also have an additional requirement of writing out each event
enriched with the current session and profile data. I could basically do
this again with a generic window function and write out each event with the
collector when iterating, but would there be a better pattern to use? Maybe
sharing state between functions or something.

Br,
Henri H