.keyBy() on ConnectedStream

2017-01-26 Thread Matt
Hi all,

What's the purpose of .keyBy() on ConnectedStream? How does it affect
.map() and .flatMap()?

I'm not finding a way to group stream elements based on a key, something
like a Window on a normal Stream, but for a ConnectedStream.
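
For context, roughly what I'm trying to do (a made-up sketch with hypothetical
Order/Payment types; I'm not sure this is the right approach):

// Hypothetical sketch: key both inputs of a ConnectedStream by the same field,
// hoping the co-function then sees both streams grouped per key and can share
// keyed state between them.
DataStream<Order> orders = ...;     // made-up types
DataStream<Payment> payments = ...;

orders.connect(payments)
    .keyBy("orderId", "orderId")    // key expression for the first and second input
    .flatMap(new RichCoFlatMapFunction<Order, Payment, String>() {

        private transient ValueState<Order> pendingOrder;

        @Override
        public void open(Configuration parameters) {
            pendingOrder = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingOrder", Order.class, null));
        }

        @Override
        public void flatMap1(Order order, Collector<String> out) throws Exception {
            pendingOrder.update(order);            // remember the order for this key
        }

        @Override
        public void flatMap2(Payment payment, Collector<String> out) throws Exception {
            Order order = pendingOrder.value();    // state of the same key, fed by flatMap1
            if (order != null) {
                out.collect("matched " + order + " with " + payment);
            }
        }
    });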

Regards,
Matt


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-26 Thread Shannon Carey
Haha, I see. Thanks.




On 1/26/17, 1:48 PM, "Chen Qin"  wrote:

>We worked around S3 and had a beer with our Hadoop engineers...
>
>
>


Proper ways to write iterative DataSets with dependencies

2017-01-26 Thread Li Peng
Hi there, I just started investigating Flink and I'm curious if I'm
approaching my issue in the right way.

My current use case is modeling a series of transformations. I start with
some transformations, each of which, when done, can yield another
transformation, a result to output to some sink, or a Join operation that
extracts data from some other data set and combines it with existing data
(producing a transformation that should be processed like any other
transform). The transformations and results are easy to deal with, but the
joins are more troubling.

Here is my current solution that I got to work:

//initialSolution contains all external data to join on.
initialSolution.iterateDelta(initialWorkset, 1, Array("id")) {
  (solution: DataSet[Work[String]], workset: DataSet[Work[String]]) => {
    //handle joins separately
    val joined = handleJoins(solution, workset.filter(isJoin(_)))

    //transformations are handled separately as well
    val transformed = handleTransformations(workset.filter(isTransform(_)))

    val nextWorkSet = transformed.filter(isWork(_)).union(joined)
    val solutionUpdate = transformed.filter(isResult(_))
    (solutionUpdate, nextWorkSet)
  }
}

My questions are:

1. Is this the right way to use Flink? Based on the documentation
(correct me if I'm wrong) it seems that in the iterative case the
external data (to be used in the join) should be in the solution
DataSet, so if this use case has multiple external data sources to join
on, they are all collected in the initial solution DataSet. Would
having all of this different data in the solution have bad
repercussions for partitioning/performance?
2. Doing the joins as part of the iteration seems a bit wrong to me (I
might just be thinking about the issue in the wrong way). I
alternatively tried to model this approach as a series of DataStreams,
where the code is pretty much the same as above, but where the
iteration occurs on stream T, which splits off to two streams J and R,
where R is just the result sink, and J has the logic that joins
incoming data, and after the join sends the result back to stream T.
But I didn't see a good way to say "send result of J back to T, and
run all the standard iterative logic on that" using the data stream
API. I could manually create some endpoints for these streams to hit
and thus achieve this behavior, but is there an easy way I'm missing
that can achieve this via the flink api?

Thanks,
Li


Re: Issues while restarting a job on HA cluster

2017-01-26 Thread ani.desh1512
Hi Robert,
Thanks for the answer. 
My code does actually contain both MapR Streams and MapR-DB jars. Here are
the steps I followed based on your suggestion:
1. I copied only the mapr-streams-*.jar and maprdb*.jar.
2. Then I tried to run my jar, but I got a java.lang.NoClassDefFoundError for
some maprfs class.
3. I added maprfs*.jar to lib and tried submitting my jar again.
4. This time I got a java.lang.NoClassDefFoundError for some hadoopfs class.
5. At this point I just created a symlink in the lib folder pointing to the
MapR lib folder, which effectively means that ALL the MapR-related jars get
deployed into the system classloader.
6. This did the trick and I was able to get my job running.
Also, I have not yet encountered the error I mentioned earlier after
cancelling and resubmitting the job.

My only question is: Is this the expected behavior and the normal solution? Do
we really need to add ALL the jars? I could pick out which jars to copy using
the dependency tree, but doing that for every job feels cumbersome.
  





Dummy DataStream

2017-01-26 Thread Duck
I have a project where I am reading from a single DataStream from Kafka, then
sending to a variable number of handlers based on the content of the received
data; after that I want to join them all. Since I do not know how many different
streams this will create, I cannot have a single "base" to perform a Join
operation on. So my question is: can I create a "dummy / empty"
DataStream to use as a join basis?

Example (a rough sketch follows below):
1) DataStream all = ..
2) Create a List myList;
3) Then I split the "all" DataStream based on content, and add each stream to
"myList"
4) I now parse each of the different streams
5) I now want to join my list of streams, "myList", into a DataStream
all_joined_again;
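
Roughly, as an untested sketch (the "Event" type, "knownTypes" and "HandlerFor"
are made-up placeholders):

// Rough, untested sketch of steps 1-5 above.
DataStream<Event> all = env.addSource(kafkaConsumer);                // 1)

List<DataStream<Event>> myList = new ArrayList<>();                  // 2)

SplitStream<Event> split = all.split(new OutputSelector<Event>() {   // 3) split by content
    @Override
    public Iterable<String> select(Event value) {
        return Collections.singletonList(value.getType());
    }
});

for (String type : knownTypes) {                                     // 4) parse each stream
    myList.add(split.select(type).map(new HandlerFor(type)));
}

DataStream<Event> allJoinedAgain = myList.get(0);                    // 5) here is where I'd like
for (int i = 1; i < myList.size(); i++) {                            //    an empty "base" stream
    allJoinedAgain = allJoinedAgain.union(myList.get(i));            //    to start from
}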


/Duck

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-26 Thread Chen Qin
We worked around S3 and had a beer with our Hadoop engineers...





Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2017-01-26 Thread Geoffrey Mon
Hello Chesnay,

Thanks for the advice. I've begun adding multiple jobs per Python plan file
here: https://issues.apache.org/jira/browse/FLINK-5183 and
https://github.com/GEOFBOT/flink/tree/FLINK-5183

The functionality of the patch works. I am able to run multiple jobs per
file successfully, but the process doesn't exit once the jobs are done.
The main issue I am encountering is that, although I added a check in
PythonPlanBinder to keep looking for more jobs from the Python process
until that process exits, there is a race condition: the Python process
usually exits after Java checks whether it is still running. Therefore,
unless you use a debugger to pause the Java process until the Python
process exits, or Python happens to exit fast enough, the Java process
thinks the Python process is still alive and ends up waiting indefinitely
for more Python jobs.

One other minor comment about my own patch: I used global variables to
keep track of the number of execution environments and to differentiate
between different environments. Is there a better way to do this?

Thanks!

Cheers,
Geoffrey

On Wed, Nov 23, 2016 at 5:41 AM, Chesnay Schepler 
wrote:

Hello,

implementing collect() in python is not that trivial and the gain is
questionable. There is an inherent size limit (think 10mb), and it is
a bit at odds with the deployment model of the Python API.

Something easier would be to execute each iteration of the for-loop as a
separate job and save the result in a file.
Note that right now the Python API can't execute multiple jobs from the
same file; we would need some modifications
in the PythonPlanBinder to allow this.

Regards,
Chesnay


On 20.11.2016 23:54, Geoffrey Mon wrote:

Hello,

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then make the new "S" directly from these values. This however would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon  wrote:

Hi Ufuk,

The master instance of the cluster was also an m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage. When running my code on my local cluster, performance was
probably bottlenecked.

The job does use a for loop to run the core operations for a specific
number of times, specified as a command line parameter. If it helps, here
is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260 is the core for loop)
Java: https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120 is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey


On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:

The Python API is in alpha state currently, so we would have to check if it
is related specifically to that. Looping in Chesnay who worked on that.

The JVM GC error happens on the client side as that's where the optimizer
runs. How much memory does the client submitting the job have?

How do you compose the job? Do you have nested loops, e.g. for() { ... bulk
iteration Flink program }?

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon ( 
geof...@gmail.com) wrote:
> Hello all,
>
> I have a pretty complicated plan file using the Flink Python API running on
> an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> dictionary learning algorithm and has to run a sequence of operations many
> times; each sequence involves bulk 

Re: Events are assigned to wrong window

2017-01-26 Thread Nico
Hi,

can anyone help me with this problem? I don't get it. Forget the examples
below; I've created a copy/paste example to reproduce the problem of
incorrect results when using key-value state and the window operator.


public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                new Tuple2<>("1", 1485446260994L),
                new Tuple2<>("1", 1485446266012L),
                new Tuple2<>("1", 1485446271031L),
                new Tuple2<>("1", 1485446276040L),
                new Tuple2<>("1", 1485446281045L),
                new Tuple2<>("1", 1485446286049L),
                new Tuple2<>("1", 1485446291062L),
                new Tuple2<>("1", 1485446296066L),
                new Tuple2<>("1", 1485446302019L)
        );

        stream
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                        return stringLongTuple2.f1;
                    }
                })
            .keyBy("f0")
            .map(new MapTest())
            .keyBy("f0")
            .window(TumblingEventTimeWindows.of(Time.seconds(20)))
            .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow timeWindow,
                        Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector)
                        throws Exception {

                    Set<Long> set = new HashSet<>();
                    for (Tuple2<String, Long> t : iterable) {
                        set.add(t.f1);
                    }

                    StringBuilder sb = new StringBuilder();

                    sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                    sb.append("Set " + set.toString());
                    System.out.println(sb.toString());
                }
            })
            .print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    private static class MapTest extends
            RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Tuple2<String, Long>> state;

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2)
                throws Exception {

            Tuple2<String, Long> t = state.value();

            state.update(stringLongTuple2);

            if (t == null) return stringLongTuple2;

            return t;
        }

        @Override
        public void open(Configuration parameters) throws Exception {

            ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                    "lastEvent",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    null
            );

            state = getRuntimeContext().getState(vsd);
        }
    }
}


Output:

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]

Best,
Nico

BTW ... I am using Flink 1.1.3.


2017-01-16 12:18 GMT+01:00 Nico :

> Hi Aljoscha,
>
> I was able to identify the root cause of the problem. It is my first map
> function using the ValueState. But first, assignTimestampsAndWatermarks()
> is called after the Kafka connector is created:
>
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09 =
>   new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
>
> // extract the timestamps with a max delay of 2s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new
> TimestampGenerator(Time.seconds(0)));
>
> In the map function I try to calculate the direction between two GPS data 
> points. For this, I store the last event in ValueState. The function looks 
> like this:
>
> private static class BearingMap extends RichMapFunction<Car, Car> {
>
>    private transient ValueState<Car> state;
>    private final double maxdiff = 12; // in seconds
>
>    @Override
>    public Car map(Car destination) throws Exception {
>
>       Car origin = state.value();
>       double olat, olon, dlat, dlon;
>
>       /*
>        * If the state is empty, don't compute a direction; just store the
>        * event in the state.
>        */
>       if (origin == null) {
>          state.update(destination);
>          // return the Car unchanged
>          return destination;
>       }
>
>       double diff = origin.getTimestamp() - destination.getTimestamp();
>
>       System.out.println("Difference: " + diff);
>
>       if (Math.abs(diff) <= maxdiff * 1000) {
>
>          /*
>           * For late events that still fall within the allowed delay
>           */
>          if (diff > 0) {
>             Car tmp = destination;
>             destination = origin;
>             origin = tmp;
>          }
>
>          /*
>           * Car tmp is always the origin
>           */
>
>          double bearing = Helper.calculateBearing(
>             origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());
>
>          // update the state
>          state.update(destination);
>
>          origin.setDirection(bearing);
>          return origin;
>
>       }
>
>       // For events that are too late, keep the current state and return it without
> 

start-cluster.sh issue

2017-01-26 Thread Lior Amar
Hi,

I am new here.
My name is Lior and I am working at Parallel Machines.
I was recently assigned to work on running/using/improving Flink :-)

I am using the FLINK_CONF_DIR environment variable to pass the config location
to start-cluster.sh (e.g. env FLINK_CONF_DIR=/tmp/parallelmachine/lior/flink/conf).
The directory contains the generated config for the setup and is available on
multiple machines.

The issue is that when start-cluster.sh runs taskmanager.sh (via ssh),
FLINK_CONF_DIR is not passed through the ssh invocation.
Note that taskmanager.sh does make use of this environment variable (line 96).

Is this a bug,
or expected behavior?

Should I open a Jira?

Regards
--lior


Re: User configuration

2017-01-26 Thread Dmitry Golubets
Yes, thank you Robert!

Best regards,
Dmitry

On Thu, Jan 26, 2017 at 4:55 PM, Robert Metzger  wrote:

> Hi,
> Is this what you are looking for? https://ci.apache.org/
> projects/flink/flink-docs-release-1.2/monitoring/best_
> practices.html#parsing-command-line-arguments-and-
> passing-them-around-in-your-flink-application
>
> On Thu, Jan 26, 2017 at 5:38 PM, Dmitry Golubets 
> wrote:
>
>> Hi,
>>
>> Is there a place for user defined configuration settings?
>> How to read them?
>>
>> Best regards,
>> Dmitry
>>
>
>


Re: User configuration

2017-01-26 Thread Robert Metzger
Hi,
Is this what you are looking for?
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application
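
In short, something along these lines (an untested sketch; the properties-file
path and key names are made up):

// Untested sketch: read user-defined settings with ParameterTool and make them
// available to all rich functions via the global job parameters.
ParameterTool params = ParameterTool.fromPropertiesFile("/path/to/app.properties");
// or: ParameterTool params = ParameterTool.fromArgs(args);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);

// later, inside any RichFunction:
ParameterTool p = (ParameterTool) getRuntimeContext()
        .getExecutionConfig().getGlobalJobParameters();
String endpoint = p.get("my.endpoint", "http://localhost:8080");   // made-up key and default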

On Thu, Jan 26, 2017 at 5:38 PM, Dmitry Golubets 
wrote:

> Hi,
>
> Is there a place for user defined configuration settings?
> How to read them?
>
> Best regards,
> Dmitry
>


User configuration

2017-01-26 Thread Dmitry Golubets
Hi,

Is there a place for user defined configuration settings?
How to read them?

Best regards,
Dmitry


Re: Flink dependencies shading

2017-01-26 Thread Dmitry Golubets
Hi Robert,

I ended up overriding the httpclient version number in Flink's main pom file
and recompiling it.

Thanks


Best regards,
Dmitry

On Thu, Jan 26, 2017 at 4:12 PM, Robert Metzger  wrote:

> Hi Dmitry,
>
> I think this issue is new.
> Where is the AWS SDK dependency coming from? Maybe you can resolve the
> issue on your side for now.
> I've filed a JIRA for this issue: https://issues.apache.
> org/jira/browse/FLINK-5661
>
>
>
> On Wed, Jan 25, 2017 at 8:24 PM, Dmitry Golubets 
> wrote:
>
>> I've built the latest Flink from source and it seems that the httpclient
>> dependency from flink-mesos is not shaded. It causes trouble with the latest
>> AWS SDK.
>> Am I building it wrong, or is this a known problem?
>>
>> Best regards,
>> Dmitry
>>
>
>


Re: Flink dependencies shading

2017-01-26 Thread Robert Metzger
Hi Dmitry,

I think this issue is new.
Where is the AWS SDK dependency coming from? Maybe you can resolve the
issue on your side for now.
I've filed a JIRA for this issue:
https://issues.apache.org/jira/browse/FLINK-5661



On Wed, Jan 25, 2017 at 8:24 PM, Dmitry Golubets 
wrote:

> I've built the latest Flink from source and it seems that the httpclient
> dependency from flink-mesos is not shaded. It causes trouble with the latest
> AWS SDK.
> Am I building it wrong, or is this a known problem?
>
> Best regards,
> Dmitry
>


Re: Debugging, logging and measuring operator subtask performance

2017-01-26 Thread Robert Metzger
Hi Dominik,

You could measure the throughput at each task in your job to see if one
operator is causing the slowdown (for example using Flink's metrics system).
Maybe the backpressure view already helps you find the task that causes the
issue.
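
For example, a simple per-operator counter, as an untested sketch (names are
made up):

// Untested sketch: count records per operator with Flink's metrics system,
// e.g. to compare the throughput of the different tasks in the job.
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter recordsProcessed;

    @Override
    public void open(Configuration parameters) {
        // registered under a made-up name; shows up in the web UI / metric reporters
        recordsProcessed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
    }

    @Override
    public String map(String value) {
        recordsProcessed.inc();
        return value;   // pass-through; the real mapping logic would go here
    }
}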

Did you check whether there are enough resources available for the processing
you intend to do? How saturated are the CPUs, disks, and the network?

Regards,
Robert



On Wed, Jan 25, 2017 at 7:51 PM, Dominik Safaric 
wrote:

> Hi,
>
> As I am experiencing certain performance degradations in a streaming job,
> I want to determine the root cause of it by measuring subtask performance
> in terms of resource utilisation - e.g. CPU utilisation of the thread.
>
> Is this somehow possible? Does Flink log scheduled and executed threads?
> What approach would you recommend?
>
> Thanks in advance,
> Dominik


Re: Issues while restarting a job on HA cluster

2017-01-26 Thread Robert Metzger
Hi Ani,

This error is independent of cancel vs. stop. It's an issue of how the
MapR classes are loaded by the classloaders.

Do your user jars contain any MapR code (either MapR Streams or MapR-DB)?

If so, I would recommend putting these MapR libraries into the "lib/"
folder of Flink. They'll then be deployed into the system classloader of
the Flink JVMs.

Regards,
Robert


On Wed, Jan 25, 2017 at 5:10 PM, ani.desh1512 
wrote:

> 1. We have an HA cluster of 2 masters and 3 slaves. We run a jar through the
> flink CLI. Then we cancel that running job. Then we make some changes to the
> source code of the jar, repackage it, deploy it again and run it again through
> the CLI. The following error occurs:
>
>
> / java.lang.LinkageError: loader constraint violation: when resolving
> method
> "com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(J
> JLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/By
> teBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I"
> the class loader (instance of
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader)
> of
> the current class, com/mapr/fs/Inode, and the class loader (instance of
> sun/misc/Launcher$ExtClassLoader) for the method's defining class,
> com/mapr/fs/jni/MapRTableTools, have different Class objects for the type
> com/mapr/fs/jni/MapRUpdateAndGet used in the signature
> at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
> at
> com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
> at
> com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTabl
> eImpl.java:1736)
> at
> com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
> at
> com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
> at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
> at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
> at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processE
> lement(StreamSink.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
> ngChainingOutput.collect(OperatorChain.java:373)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
> ngChainingOutput.collect(OperatorChain.java:358)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOpera
> tor$CountingOutput.collect(AbstractStreamOperator.java:346)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOpera
> tor$CountingOutput.collect(AbstractStreamOperator.java:329)
> at
> org.apache.flink.streaming.api.operators.StreamSource$NonTim
> estampContext.collect(StreamSource.java:161)
> at
> org.apache.flink.streaming.connectors.kafka.internals.Abstra
> ctFetcher.emitRecord(AbstractFetcher.java:225)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09
> Fetcher.run(Kafka09Fetcher.java:253)
> at java.lang.Thread.run(Thread.java:745)
>
> 01/24/2017 19:15:50 Job execution switched to status FAILING.
> java.lang.LinkageError: loader constraint violation: when resolving method
> "com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(J
> JLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/By
> teBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I"
> the class loader (instance of
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader)
> of
> the current class, com/mapr/fs/Inode, and the class loader (instance of
> sun/misc/Launcher$ExtClassLoader) for the method's defining class,
> com/mapr/fs/jni/MapRTableTools, have different Class objects for the type
> com/mapr/fs/jni/MapRUpdateAndGet used in the signature
> at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
> at
> com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
> at
> com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTabl
> eImpl.java:1736)
> at
> com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
> at
> com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
> at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
> at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
> at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processE
> lement(StreamSink.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
> ngChainingOutput.collect(OperatorChain.java:373)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
> ngChainingOutput.collect(OperatorChain.java:358)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOpera
> tor$CountingOutput.collect(AbstractStreamOperator.java:346)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOpera
> 

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

2017-01-26 Thread Robert Metzger
Hi,
I would guess that the watermark generation does not work as expected.
I would recommend logging the extracted timestamps and the watermarks to
understand how event time is progressing, and when watermarks that trigger a
window computation are generated.
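
For example, you could wrap your existing assigner in a logging version,
roughly like this (untested sketch):

// Untested sketch: a wrapper that logs every extracted timestamp and every
// watermark produced by an existing AssignerWithPeriodicWatermarks.
public static class LoggingAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final AssignerWithPeriodicWatermarks<T> delegate;

    public LoggingAssigner(AssignerWithPeriodicWatermarks<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = delegate.extractTimestamp(element, previousElementTimestamp);
        System.out.println("extracted timestamp " + timestamp + " from " + element);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        Watermark watermark = delegate.getCurrentWatermark();
        System.out.println("current watermark: "
                + (watermark == null ? "null" : watermark.getTimestamp()));
        return watermark;
    }
}

// usage (sketch):
// stream1.assignTimestampsAndWatermarks(new LoggingAssigner<>(new BoundedOutOfOrdernessGenerator2()));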

On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre 
wrote:

> Hi Aljoscha,
>
> Thanks.
>
> Yes, we are using Event Time.
> Yes, the Flink program is kept running in the IDE (i.e. Eclipse) and not
> closed after the first batch of events and while adding the second batch.
> Yes, we do have a custom timestamp/watermark assigner, implemented as
> BoundedOutOfOrdernessGenerator2.
>
> Are we using the properties for Kafka correctly?
> We are using Flink 1.1.1 and Flink Kafka connector:
> flink-connector-kafka-0.9_2.11
>
> More about the behavior:
> I have noticed that sometimes, even after the first write to the Kafka
> queue and with the Flink program running, it does not process the
> queue immediately; we need to restart. This is quite random.
>
> Following is the rough outline of our code.
>
> public class SlidingWindow2{
>
> public static void main(String[] args) throws Exception {
>
> // set up the execution environment
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> // configure the Kafka consumer
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
> kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
> kafkaProps.setProperty("group.id", "demo");
> // always read the Kafka topic from the start
> kafkaProps.setProperty("auto.offset.reset" ,"earliest");
>
> FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer = new FlinkKafkaConsumer09<>(
>     "test",    // kafka topic name
>     new dataSchema(),
>     kafkaProps);
> DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
> DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
>     stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>
> keyedStream.keyBy(4)
> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
> .apply(new CustomSlidingWindowFunction());
>
> env.execute("Sliding Event Time Window Processing");
>
>}
> }
>
>
> public static class CustomSlidingWindowFunction implements
>     WindowFunction<Tuple5<String, String, Float, Float, String>,
>         Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>
> @Override
> public void apply(Tuple key, TimeWindow window,
>         Iterable<Tuple5<String, String, Float, Float, String>> input,
>         Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
>
>
> }
>
>
> Implemented the custom periodic watermark as below:
>
> public static class BoundedOutOfOrdernessGenerator2 implements
>         AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
>
>     private static final long serialVersionUID = 1L;
>     private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
>             long previousElementTimestamp) {
>         //System.out.println("inside extractTimestamp");
>         Date parseDate = null;
>         SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM- HH:mm:ss");
>         try {
>             parseDate = dateFormat.parse(element.f0);
>         } catch (ParseException e) {
>             e.printStackTrace();
>         }
>         long timestamp = parseDate.getTime();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         // return the watermark as twice the current highest timestamp minus the
>         // out-of-orderness bound; this is because it was not covering the lateness
>         // sufficiently; now it does. In future this may be a multiple of 3 or more
>         // if necessary to cover the gap in records received.
>         return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
>     }
> }
>
>
>
>
>
> *Sujit Sakre*
>
>
>
> On 24 January 2017 at 22:34, Aljoscha Krettek  wrote:
>
>> Hi,
>> a bit more information would be useful. Are you using event-time? Is the
>> Flink program kept running after adding the first batch of events and when
>> adding the second batch or is it to invocations of your Flink program? Do
>> you have a custom timestamp/watermark assigner?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre 
>> wrote:
>>
>>> Hi,
>>>
>>> We are using a sliding window function to process data read from Kafka
>>> Stream. We are using FlinkKafkaConsumer09 to read the data. The window
>>> function and sink are running correctly.
>>>
>>> To test the program, we are generating a stream of data from command

Re: State Descriptors / Queryable State Question

2017-01-26 Thread Fabian Hueske
Hi Joe,

working on a KeyedStream means that the records are partitioned by that
key, i.e., all records with the same key are processed by the same thread.
Therefore, only one thread accesses the state for a particular key.
Other tasks do not have read or write access to the state of other tasks.
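
For illustration, a minimal (untested) sketch of a per-key counter kept in
keyed state:

// Untested sketch: a per-key counter in ValueState. Each parallel task only
// ever sees the keys assigned to it, so no synchronization is needed.
public class PerKeyCounter extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class, 0L));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        long newCount = count.value() + 1;   // state is scoped to the current key (value.f0)
        count.update(newCount);
        out.collect(new Tuple2<>(value.f0, newCount));
    }
}

// usage (sketch): input.keyBy(0).flatMap(new PerKeyCounter());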

Best, Fabian

2017-01-26 14:10 GMT+01:00 Joe Olson :

> If I have a keyed stream going in to a N node Flink stream processor, and
> I had a job that was keeping a count using a ValueStateDescriptor (per
> key), would that descriptor be synchronized among all the nodes?
>
> i.e. Are the state descriptors interfaces (ValueStateDescriptor,
> ListStateDescriptor) thread-safe? If I expose that descriptor via the
> RunTimeContext, will I get a consistent value back from each of the nodes?
>
> Thanks!
>
>


Re: CEP and KeyedStreams doubt

2017-01-26 Thread Kostas Kloudas
Hi Oriol,

The number of keys is related to the number of data-structures (NFAs) Flink is 
going to create and keep.
Given this, it may make sense to try to reduce your key-space (or your 
keyedStreams). Other than that, Flink
has no issue handling large numbers of keys.

Now, for the issue you mentioned, we hope to get it fixed soon but there is no 
concrete horizon yet.

Hope this helps!

Let us know if you have any issues,
Kostas

> On Jan 26, 2017, at 1:04 PM, Oriol  wrote:
> 
> Hello everyone, 
>  
> I'm using the CEP library for event stream processing. 
>  
> I'm splitting the dataStream into different KeyedStreams using keyBy(). In 
> the KeyBy, I'm using a tuple of two elements, which means I may have several 
> millions of KeyedStreams, as I need to monitor all our customer's users. 
>  
> Is this the preferred way to use Flink, or should I find a way to reduce the 
> number of KeyedStreams, for example having one per customer instead of one 
> per customer's user? (And find a way later to process each user by itself).
>  
> Also, is the bug reported in https://issues.apache.org/jira/browse/FLINK-5174 
>  related to keys of the 
> KeyedStreams? I'm not sure what kind of keys it is related to. If so, is it 
> going to be addressed soon?
> 
> Thanks,
> Oriol.



Re: Rate-limit processing

2017-01-26 Thread Robert Metzger
Hi Florian,

you can rate-limit the Kafka consumer by implementing a custom
DeserializationSchema that sleeps a bit from time to time (or at each
deserialization step).
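
A rough, untested sketch of what I mean (the exact package of
DeserializationSchema differs between Flink versions):

// Untested sketch: a DeserializationSchema that sleeps on every record, which
// throttles how fast the Kafka source can emit elements.
public class ThrottledStringSchema implements DeserializationSchema<String> {

    private final long sleepMillis;

    public ThrottledStringSchema(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        try {
            Thread.sleep(sleepMillis);   // crude per-record rate limit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

// usage (sketch): new FlinkKafkaConsumer09<>("topic", new ThrottledStringSchema(10), kafkaProps);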

On Tue, Jan 24, 2017 at 1:16 PM, Florian König 
wrote:

> Hi Till,
>
> thank you for the very helpful hints. You are right, I already see
> backpressure. In my case, that’s ok because it throttles the Kafka source.
> Speaking of which: You mentioned putting the rate limiting mechanism into
> the source. How can I do this with a Kafka source? Just extend the
> Producer, or is there a better mechanism to hook into the connector?
>
> Cheers,
> Florian
>
>
> > Am 20.01.2017 um 16:58 schrieb Till Rohrmann :
> >
> > Hi Florian,
> >
> > any blocking of the user code thread is in general a not so good idea
> because the checkpointing happens under the very same lock which also
> guards the user code invocation. Thus any checkpoint barrier arriving at
> the operator has only the chance to trigger the checkpointing once the
> blocking is over. Even worse, if the blocking happens in a downstream
> operator (not a source), then this blocking could cause backpressure. Since
> the checkpoint barriers flow with the events and are processed in order,
> the backpressure will then also influence the checkpointing time.
> >
> > So if you want to limit the rate, you should do it a the sources without
> blocking the source thread. You could for example count how many elements
> you've emitted in the past second and if it exceeds your maximum, then you
> don't emit the next element to downstream operators until some time has
> passed (this might end up in a busy loop but it allows the checkpointing to
> claim the lock).
> >
> > Cheers,
> > Till
> >
> > On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
> > Hi,
> >
> > You might find this similar thread from the mailing list archive helpful
> : http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/throttled-stream-td6138.html.
> >
> > Best,
> > Yassine
> >
> > 2017-01-20 10:53 GMT+01:00 Florian König :
> > Hi,
> >
> > I need to limit the rate of processing in a Flink stream application.
> Specifically, the number of items processed in a .map() operation has to
> stay under a certain maximum per second.
> >
> > At the moment, I have another .map() operation before the actual
> processing, which just sleeps for a certain time (e.g., 250ms for a limit
> of 4 requests / sec) and returns the item unchanged:
> >
> > …
> >
> > public T map(final T value) throws Exception {
> > Thread.sleep(delay);
> > return value;
> > }
> >
> > …
> >
> > This works as expected, but is a rather crude approach. Checkpointing
> the job takes a very long time: minutes for a state of a few kB, which for
> other jobs is done in a few milliseconds. I assume that letting the whole
> thread sleep for most of the time interferes with the checkpointing - not
> good!
> >
> > Would using a different synchronization mechanism (e.g.,
> https://google.github.io/guava/releases/19.0/api/docs/
> index.html?com/google/common/util/concurrent/RateLimiter.html) help to
> make checkpointing work better?
> >
> > Or, preferably, is there a mechanism inside Flink that I can use to
> accomplish the desired rate limiting? I haven’t found anything in the docs.
> >
> > Cheers,
> > Florian
> >
> >
>
>
>


Re: Improving Flink Performance

2017-01-26 Thread Stephan Ewen
@jonas Flink's Fork-Join Pool drives only the actors, which are doing
coordination. Unless your job is permanently failing/recovering, they don't
do much.


On Thu, Jan 26, 2017 at 2:56 PM, Robert Metzger  wrote:

> Hi Jonas,
>
> The good news is that your job is completely parallelizable. So if you are
> running it on a cluster, you can scale it at least to the number of Kafka
> partitions you have (actually even further, because the Kafka consumers are
> not the issue).
>
> I don't think that the scala (=akka) worker threads are really the thing
> that slows everything down. These threads should usually idle.
> I just tried it with Visualvm (I don't own a Jprofiler license :) ) and
> you can nicely see what's eating up CPU resources in my job:
> http://i.imgur.com/nqXeHdi.png
>
>
>
>
> On Thu, Jan 26, 2017 at 1:23 PM, Jonas  wrote:
>
>> JProfiler
>>
>>
>>
>>
>
>


Re: Improving Flink Performance

2017-01-26 Thread Robert Metzger
Hi Jonas,

The good news is that your job is completely parallelizable. So if you are
running it on a cluster, you can scale it at least to the number of Kafka
partitions you have (actually even further, because the Kafka consumers are
not the issue).

I don't think that the scala (=akka) worker threads are really the thing
that slows everything down. These threads should usually idle.
I just tried it with Visualvm (I don't own a Jprofiler license :) ) and you
can nicely see what's eating up CPU resources in my job:
http://i.imgur.com/nqXeHdi.png




On Thu, Jan 26, 2017 at 1:23 PM, Jonas  wrote:

> JProfiler
>
>
>
>


State Descriptors / Queryable State Question

2017-01-26 Thread Joe Olson
If I have a keyed stream going in to a N node Flink stream processor, and I had 
a job that was keeping a count using a ValueStateDescriptor (per key), would 
that descriptor be synchronized among all the nodes? 

i.e. Are the state descriptors interfaces (ValueStateDescriptor, 
ListStateDescriptor) thread-safe? If I expose that descriptor via the
RunTimeContext, will I get a consistent value back from each of the nodes? 

Thanks! 



Re: Improving Flink Performance

2017-01-26 Thread Jonas
JProfiler





CEP and KeyedStreams doubt

2017-01-26 Thread Oriol
Hello everyone,

I'm using the CEP library for event stream processing.

I'm splitting the dataStream into different KeyedStreams using keyBy(). In
the KeyBy, I'm using a tuple of two elements, which means I may have
several millions of KeyedStreams, as I need to monitor all our customer's
users.

Is this the preferred way to use Flink, or should I find a way to reduce
the number of KeyedStreams, for example having one per customer instead of
one per customer's user? (And find a way later to process each user by
itself).

Also, is the bug reported in
https://issues.apache.org/jira/browse/FLINK-5174 related to keys of the
KeyedStreams? I'm not sure what kind of keys it is related to. If so, is it
going to be addressed soon?

Thanks,
Oriol.


Re: Improving Flink Performance

2017-01-26 Thread dromitlabs
Offtopic: What profiler is it that you're using?

> On Jan 25, 2017, at 18:11, Jonas  wrote:
> 
> Images:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
> and
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
> 
> 
> 