Re: multi tenant workflow execution

2017-01-24 Thread Chen Qin
Hi Fabian,

AsyncFunction and ProcessFunction do help!

I assume the per-event timers I create in my RichProcessFunction implementation will
be part of the key-grouped state and cached in memory during runtime, right? I am
interested in this because we are targeting a large deployment with a million-TPS
event source, and I would like to understand the checkpoint size and speed
implications.

How about checkpointing an iteration stream? Can we achieve at-least-once
semantics in 1.2 for iteration jobs?

Thanks,
Chen

On Tue, Jan 24, 2017 at 2:26 AM, Fabian Hueske  wrote:

> Hi Chen,
>
> if you plan to implement your application on top of the upcoming Flink
> 1.2.0 release, you might find the new AsyncFunction [1] and the
> ProcessFunction [2] helpful.
> AsyncFunction can be used for non-blocking calls to external services and
> maintains the checkpointing semantics.
> ProcessFunction allows you to register and react to timers. This might be easier
> to use than a window for the 24h timeout.
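> A minimal sketch of the timer pattern (the exact 1.2 signatures are in [2] below;
> String in/out and the 24h constant are just illustrative assumptions):
>
> import org.apache.flink.streaming.api.functions.RichProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class TimeoutCheck extends RichProcessFunction<String, String> {
>
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
>         // register an event-time timer 24 hours after this element's timestamp
>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 24 * 60 * 60 * 1000L);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
>         // fires once the watermark passes the registered time, e.g. expire the workflow here
>         out.collect("expired at " + timestamp);
>     }
> }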
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> 2017-01-24 0:41 GMT+01:00 Chen Qin :
>
>> Hi there,
>>
>> I am researching running one Flink job to support customized, event-driven
>> workflow executions. The use case is to run various workflows that listen to a
>> set of Kafka topics and perform various RPC checks; a user travels through
>> multiple stages in a rule execution (workflow execution). E.g.
>>
>> kafka topic : user click stream
>> rpc checks:
>>
>> if user is a member
>> if user has shown interest in signing up
>>
>>
>> workflows:
>>
>>
>> workflow 1: user click -> if user is a member, do A then do B
>> workflow 2: user click -> if user has shown interest in signing up, then do A;
>> otherwise wait 60 minutes and recheck, expiring in 24 hours
>>
>> The goal, as I said, is to run workflow 1 and workflow 2 in one Flink job.
>>
>> My initial thinking is described below.
>>
>> The sources are a series of Kafka topics. All events go through a CoMap that
>> caches the event -> rules mapping and fans out to multiple {rule, user} tuples.
>> Based on the rule definition and the stage the user is in for a given rule, it
>> does a series of async RPC checks and side-outputs to various sinks.
>>
>>    - If a {rule, user} tuple needs to stay in operator state longer
>>    (1 day), there should be a window following the async RPC checks, with a
>>    customized purge trigger that fires for tuples that pass and purges tuples
>>    that either passed the check or expired.
>>    - If a {rule, user} tuple executes to a stage which waits for a Kafka event,
>>    it should be added to the cache and hooked up with the CoMap lookups near the sources.
>>
>>
>>  Does that make sense?
>>
>> Thanks,
>> Chen
>>
>>
>>
>


Re: How to get help on ClassCastException when re-submitting a job

2017-01-24 Thread Giuliano Caliari
Issue reported: 

https://issues.apache.org/jira/browse/FLINK-5633

Sorry for taking so long





Re: REST api: how to upload jar?

2017-01-24 Thread Sachin Goel
Hey Cliff,
You can upload a jar file using an HTTP POST with the file data sent under a
form field named 'jarfile'.
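For example, something like this against the web frontend (the port 8081 and the
jar path are assumptions; the response should echo back the stored file name):

curl -X POST -F "jarfile=@/path/to/my-job.jar" http://localhost:8081/jars/upload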

Can you also please open a jira for fixing the documentation?

- Sachin


On Jan 25, 2017 06:55, "Cliff Resnick"  wrote:

> The 1.2 release documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/rest_api.html)
> states "It is possible to upload, run, and list Flink programs via the REST APIs
> and web frontend". However, there is no documentation about uploading a jar via
> the REST API. Does this mean that upload is only supported via the web frontend?
> I did notice that if I manually upload a jar to the configured upload dir and
> prepend its name with a UUID it does get recognized and I can POST a job start,
> but this is messy and I'd rather use the API if supported.
>
> -Cliff
>


Flink with Yarn on MapR

2017-01-24 Thread ani.desh1512
Hi,
I am trying to set up Flink with YARN on a MapR cluster. I built Flink
(flink-1.3-SNAPSHOT) as follows:
 
mvn clean install -DskipTests -Pvendor-repos
-Dhadoop.version=2.7.0-mapr-1607
 
The build is successful. Then I try to run ./bin/yarn-session.sh -n 4
(without changing any config whatsoever) and get the following error:
 
Error while deploying YARN cluster: Couldn't deploy Yarn cluster
java.lang.RuntimeException: Couldn't deploy Yarn cluster
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:425)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:620)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:476)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:473)
at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native
Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:473)
Caused by: java.lang.NumberFormatException: For input string:
"${nodemanager.resource.cpu-vcores}"
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:569)
at java.lang.Integer.parseInt(Integer.java:615)
at
org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1271)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:315)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:434)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:423)
... 9 more
 
Now, the property that is causing this error, nodemanager.resource.cpu-vcores,
is appropriately set in yarn-site.xml. The cluster has 3 ResourceManagers (2
on standby) and 5 NodeManagers. To be extra safe, I changed the value for
this property in ALL the NodeManagers' yarn-site.xml.
I believe this property defaults to 4, so I am trying to
understand why this error is cropping up.
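For reference, this is the literal form of that entry in yarn-site.xml (the value 4
here is only illustrative):

<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value>
</property>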
The required environment variable is set as follows:
YARN_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/
 
I also tried setting the fs.hdfs.hadoopconf property (to point to the Hadoop
conf directory) in flink-conf.yaml, but I still get the same error.
 
 
Any help with this error will be greatly appreciated
 
 
Thanks in advance,
 
Aniket D





REST api: how to upload jar?

2017-01-24 Thread Cliff Resnick
The 1.2 release documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/rest_api.html)
states "It is possible to upload, run, and list Flink programs via the REST APIs and
web frontend". However, there is no documentation about uploading a jar via the
REST API. Does this mean that upload is only supported via the web frontend? I
did notice that if I manually upload a jar to the configured upload dir and
prepend its name with a UUID it does get recognized and I can POST a job start,
but this is messy and I'd rather use the API if supported.

-Cliff


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Ivan Mushketyk
Hi @Fabian, @Gabor, and @Aljoscha,

Thank you for your help! It works as expected.

Best regards,
Ivan.

On Tue, 24 Jan 2017 at 17:04 Fabian Hueske  wrote:

> Aljoscha, you are right.
> The second mapPartition() needs to have parallelism(1), and so does the
> sortPartition():
>
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).parallelism(1)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> Anyway, as Gabor pointed out, this solution is very inefficient.
>
> 2017-01-24 17:52 GMT+01:00 Aljoscha Krettek :
>
> @Fabian, I think there's a typo in your code, shouldn't it be
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> i.e. the second MapPartition has to be parallelism=1
>
>
> On Tue, 24 Jan 2017 at 11:57 Fabian Hueske  wrote:
>
> You are of course right Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep-copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay :
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske :
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >> return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
> >> data from disk and find the N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>
>
>
>


Re: Improving Flink Performance

2017-01-24 Thread Jonas
The performance hit due to decoding the JSON is expected, and there is not a
lot I can do about that (except for changing the encoding). Alright.

When joining the above stream with another stream I take another performance
hit of ~80%, so that in the end I have only 1k msgs/s remaining. Do you know
how to improve that? Might setting the buffer size / timeout be worth
exploring?
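For reference, the knob I mean is the one below (a sketch, assuming the job's
StreamExecutionEnvironment is called env; larger values, or -1 to flush only when
buffers are full, trade latency for throughput):

env.setBufferTimeout(-1); // default is 100 ms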






Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

2017-01-24 Thread Sujit Sakre
Hi Aljoscha,

Thanks.

Yes, we are using Event Time.
Yes, Flink program is kept running in the IDE, i.e. eclipse and not closed,
after the first batch of events and when adding the second batch.
Yes, we do have a custom timestamp/watermark assigner, implemented as
BoundedOutOfOrdernessGenerator2.


Are we using the properties for Kafka correctly?
We are using Flink 1.1.1 and Flink Kafka connector:
flink-connector-kafka-0.9_2.11

More about the behavior:
I have noticed that, even after the first write to the Kafka queue while the
Flink program is running, sometimes it processes the queue immediately and
sometimes we need to restart. This is quite random.

Following is the rough outline of our code.

public class SlidingWindow2 {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "demo");
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<...> consumer = new FlinkKafkaConsumer09<>(
                "test",           // kafka topic name
                new dataSchema(),
                kafkaProps);
        DataStream<...> stream1 = env.addSource(consumer);
        DataStream<...> keyedStream = stream1.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessGenerator2());

        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");
    }
}


public static class CustomSlidingWindowFunction implements
        WindowFunction<..., Tuple5<...>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<...> input,
            Collector<...> out) throws Exception {

    }
}


// Implemented custom Periodic Watermark as below:
public static class BoundedOutOfOrdernessGenerator2 implements
        AssignerWithPeriodicWatermarks<...> {

    private static final long serialVersionUID = 1L;
    private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple5<...> element, long previousElementTimestamp) {
        //System.out.println("inside extractTimestamp");
        Date parseDate = null;
        SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM- HH:mm:ss");
        try {
            parseDate = dateFormat.parse(element.f0);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        long timestamp = parseDate.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as twice the current highest timestamp minus the
        // out-of-orderness bound; this is because it was not covering the lateness
        // sufficiently before, and now it does. In future this may be a multiple
        // of 3 or more if necessary to cover the gap in records received.
        return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
    }
}





*Sujit Sakre*



On 24 January 2017 at 22:34, Aljoscha Krettek  wrote:

> Hi,
> a bit more information would be useful. Are you using event-time? Is the
> Flink program kept running after adding the first batch of events and when
> adding the second batch, or is it two invocations of your Flink program? Do
> you have a custom timestamp/watermark assigner?
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre 
> wrote:
>
>> Hi,
>>
>> We are using a sliding window function to process data read from Kafka
>> Stream. We are using FlinkKafkaConsumer09 to read the data. The window
>> function and sink are running correctly.
>>
>> To test the program, we are generating a stream of data from command line.
>> This works when we add set of records once. When we add again, it does
>> not work, Flink produces no result, even though the records are added to
>> same Kafka topic from the same command line instance.
>>
>> Please could you suggest what could be wrong.
>>
>> Many thanks.
>>
>>
>> *Sujit Sakre*
>>

Re: Better way to read several stream sources

2017-01-24 Thread Sendoh
Hi Stephan,

Thank you for answering my question.

I tried option 2 and it gives me correct results reading several sources,
while using ParallelSourceFunction gives 4x redundancy (the same as my
number of threads).

Can I ask what would be the reason for the difference? I think I don't
understand SourceFunction and ParallelSourceFunction correctly.

Best,

Sendoh






Re: TestStreamEnvironment: await last flush of processing time-based windows

2017-01-24 Thread Steven Ruppert
Thanks for the clarification. I'm not familiar enough with the
internals of Flink to offer any technical suggestions, but it'd be
nice to have some more documentation around testing Flink and possible
pitfalls like this.

For anybody with the same issue, note that IngestionTime also works,
and is slightly easier to use with unit tests that don't care about
event time. Also, you can copy and modify the FromElementsFunction and
add a Thread.sleep after it emits all the test inputs. If you pause
long enough for the processing time windows downstream, then they will
fire. Obviously not a great solution, but useful if you can't use
ingestion time instead.
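For reference, the ingestion-time variant is just the following (a sketch, assuming
a StreamExecutionEnvironment named env):

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);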

On Tue, Jan 24, 2017 at 9:28 AM, Aljoscha Krettek  wrote:
> Hi,
> I'm afraid there is no way of making this work with the current
> implementation. Especially getting this to work in a distributed setting
> seems hard.
>
> I'm very open for suggestions on this topic, though. :-)
>
> Cheers,
> Aljoscha
>
> On Mon, 23 Jan 2017 at 23:19 Steven Ruppert  wrote:
>>
>> Hi,
>>
>> I'm attempting to unit test Flink with the flink-test-utils support, on
>> Flink 1.1.4. I've got basic flatMap stuff flowing through just fine,
>> but when running any processing time-based windowing functions,
>> `env.execute()` will return before any values are flushed out of the
>> windows.
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>> import
>> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.util.TestStreamEnvironment;
>> import org.junit.Test;
>>
>> import java.util.concurrent.atomic.AtomicBoolean;
>>
>> import static org.junit.Assert.assertTrue;
>>
>> public class TestMinimal {
>>     static AtomicBoolean sinked = new AtomicBoolean(false);
>>
>>     @Test
>>     public void testThing() throws Exception {
>>         StreamExecutionEnvironment env =
>>                 TestStreamEnvironment.getExecutionEnvironment();
>>
>>         env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1))
>>                 .keyBy(0)
>>                 .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>>                 .sum(1)
>>                 .addSink(new SinkFunction<Tuple2<String, Integer>>() {
>>                     @Override
>>                     public void invoke(Tuple2<String, Integer> value)
>>                             throws Exception {
>>                         sinked.set(true);
>>                     }
>>                 });
>>         env.execute();
>>         // presumably once execute returns, all elements have passed
>>         // through all operators.
>>         assertTrue(sinked.get());
>>     }
>> }
>>
>> Is there a way to make this test pass?
>>
>> Using event time windows instead does seem to work, but processing
>> time would be a little more convenient.
>>



Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-24 Thread Shannon Carey
I am running 1.1.4. It does look like there were problems with the connection 
to Zookeeper due to overworking the network. I'm not sure what I can do about 
it (not sure what happens when a JM loses leadership), but ideally a 
cluster-wide failure would not result in losing all the jobs, checkpoints, etc.

-Shannon

From: Stephan Ewen >
Date: Tuesday, January 24, 2017 at 8:07 AM
To: >
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Hi!

I think there were some issues in the HA recovery of 1.1.3 that were fixed in 
1.1.4 and 1.2.0.
What version are you running on?

Stephan


On Sat, Jan 21, 2017 at 4:58 PM, Ufuk Celebi 
> wrote:
Hey Shannon,

the final truth for recovery is in ZooKeeper. Can you check whether
there also references available in ZooKeeper? Do you have the job
manager logs available from after the failure? On recovery, Flink
checks ZooKeeper for entries. These point to files in the storageDir.
It could have happened that these got out of sync, e.g. entries
deleted from ZK but not from the storageDir.

Maybe the loss of the task managers can also be explained by a
connection loss to ZK or something. When a JM looses leadership, the
TMs cancel all tasks and disconnect from the JM. Here again, we would
need to look into the logs.

Sorry for the bad experience lately :-(

– Ufuk


On Sat, Jan 21, 2017 at 4:31 AM, Shannon Carey 
> wrote:
> In fact, I can see all my job jar blobs and some checkpoint & job graph
> files in my configured "recovery.zookeeper.storageDir"… however for some
> reason it didn't get restored when my new Flink cluster started up.
>
>
> From: Shannon Carey >
> Date: Friday, January 20, 2017 at 9:14 PM
> To: "user@flink.apache.org" 
> >
>
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> I recently added some better visibility into the metrics we're gathering
> from Flink. My Flink cluster died again due to the "Not enough free slots
> available to run the job" problem, and this time I can see that the number
> of registered task managers went down from 11 to 7, then waffled and only
> ever got back up to 10 (one short of the requested amount) before dropping
> to 0 just before the cluster died. This would seem to explain why there
> weren't sufficient slots (given that we were probably using them all or
> nearly all)… The metric of "running jobs" went down from 5 to 3 during this
> time period as well. So the problem seems to be loss of taskmanagers due to
> errors (not yet sure what exactly as I have to delve into logs).
>
> The other thing I have to figure out is restoring the jobs… I thought that
> HA would start the jobs back up again if Flink died & I re-launched it, but
> that doesn't appear to be the case.
>
>
> From: Stephan Ewen >
> Date: Thursday, January 5, 2017 at 7:52 AM
> To: >
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> Another thought on the container failure:
>
> in 1.1, the user code is loaded dynamically whenever a Task is started. That
> means that on every task restart the code is reloaded. For that to work
> proper, class unloading needs to happen, or the permgen will eventually
> overflow.
>
> It can happen that class unloading is prevented if the user functions do
> leave references around as "GC roots", which may be threads, or references
> in registries, etc.
>
> In Flink 1.2, YARN will put the user code into the application classpath, so
> code needs not be reloaded on every restart. That should solve that issue.
> To "simulate" that behavior in Flink 1.1, put your application code jars
> into the "lib" folder
>
> Best,
> Stephan
>
>
> On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin 
> > wrote:
>>
>> Hi,
>>
>> I've faced a similar issue recently. Hope sharing my findings will help.
>> The problem can be split into 2 parts:
>>
>> Source of container failures
>> The logs you provided indicate that YARN kills its containers for
>> exceeding memory limits. Important point here is that memory limit = JVM
>> heap memory + off-heap memory. So if off-heap memory usage is high, YARN may
>> kill containers even though JVM heap consumption is fine. To solve this issue,
>> Flink reserves a share of container memory for off-heap memory. How much
>> will be reserved is controlled by yarn.heap-cutoff-ratio and
>> yarn.heap-cutoff-min configuration. By default 25% of the requested
>> container memory will be reserved for off-heap. This seems to be a good
>> start, but one should experiment and tune to meet their job specifics.
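>> (As a sketch, the corresponding flink-conf.yaml entries; the values shown here
>> are the defaults, not a recommendation:)
>>
>> yarn.heap-cutoff-ratio: 0.25
>> yarn.heap-cutoff-min: 600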
>>

Re: Improving Flink Performance

2017-01-24 Thread Stephan Ewen
One thing you can try is enabling object reuse in the execution
config.
That should get rid of the overhead when passing the JSON objects from
function to function.
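For example (a sketch, assuming the job's StreamExecutionEnvironment is env):

env.getConfig().enableObjectReuse();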

On Tue, Jan 24, 2017 at 6:00 PM, Aljoscha Krettek 
wrote:

> Hi,
> I think MyJsonDecoder is the bottleneck, and I'm also afraid there is
> nothing to be done because parsing Strings to JSON is simply slow.
>
> I think you would see the biggest gains if you had a binary representation
> that can quickly be serialised/deserialised to objects and you use that
> instead of String/JSON.
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 12:17 Jonas  wrote:
>
>> Hello! I'm reposting this since the other thread had some formatting
>> issues apparently. I hope this time it works. I'm having performance
>> problems with a Flink job. If there is anything valuable missing, please
>> ask and I will try to answer ASAP. My job looks like this:
>>
>> /*
>>   Settings
>>  */
>> env.setParallelism(4)
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> /*
>>   Operator Graph
>>  */
>> env
>>   .addSource(new FlinkKafkaConsumer09("raw.my.topic", new
>> SimpleStringSchema(), props)) // 100k msgs/s
>>   .map(new MyJsonDecoder) // 25k msgs/s
>>   .map(new AddTypeToJsonForSplitting) // 20k msgs/s
>>   .split(t => Seq(t._1.name))
>>   .select(TYPE_A.name) // 18k msgs/s
>>   .flatMap(new MapJsonToEntity) // 13k msgs/s
>>   .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
>>   .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s
>>
>> /*
>>   Run
>>  */
>> env.execute()
>>
>> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
>> is decoded, a type is added (we have multiple message types per Kafka
>> topic). Then we select the TYPE_A messages, create a Scala entity out of it
>> (a case class). Afterwards in the MapEntityToMultipleEntities the Scala
>> entities are split into multiple. Finally a watermark is added. As you can
>> see the data is not keyed in any way yet. *Is there a way to make this
>> faster?*
>>
>> Measurements were taken with def writeToSocket[?](d: DataStream[?],
>> port: Int): Unit = { d.writeToSocket("localhost", port, new
>> SerializationSchema[?] { override def serialize(element: ?): Array[Byte] =
>> { "\n".getBytes(CharsetUtil.UTF_8) } }) } and nc -lk PORT | pv --line-mode
>> --rate --average-rate --format "Current: %r, Avg:%a, Total: %b" > /dev/null.
>> I'm running this on an Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink 1.1.4.
>>
>


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-24 Thread Shannon Carey
I haven't seen it yet, I'll let you know if I do.

My last whole-cluster failure seems to have been caused by placing too much 
load on the cluster. We had a job that got up to 12GB in checkpoint size. 
Current cluster is 6x c3.2xlarge. The logs show a lot of 
"java.net.SocketException: Connection reset" when trying to write checkpoints 
to S3, as well as repeated disconnect/reconnect with Zookeeper "Client session 
timed out, have not heard from server in 28301ms for sessionid 
0x254bb682e214f79, closing socket connection and attempting reconnect", and 
things like "akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@10.0.88.37:38768/user/taskmanager#-497097074]] after 
[1 ms]". Generally, it seems as if the network got overwhelmed.

-Shannon

From: Stephan Ewen >
Date: Tuesday, January 24, 2017 at 8:30 AM
To: >
Subject: Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Just thinking that another possible cause for the issue could be that there is 
a connection leak somewhere (Flink code or user code or vendor library) and 
thus the S3 connector's connection pool starves.
For Flink 1.2, there is a safetynet that tracks and closes connections that go 
through Flink's filesystem abstraction. So that should not be an issue there 
any more.

Best,
Stephan



On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey 
> wrote:
Good to know someone else has had the same problem... What did you do about it? 
Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin" 
> wrote:

>We have seen this issue as far back as Flink 1.0. Our finding back then was traffic
>congestion to AWS in the internal network. Many teams are too dependent on S3 and
>bandwidth is shared, causing traffic congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi 
>> > wrote:
>>
>> Hey Shannon!
>>
>> Is this always reproducible and how long does it take to reproduce it?
>>
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>>
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>>
>> – Ufuk
>>
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey 
>>> > wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>>
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>>
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> 

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

2017-01-24 Thread Aljoscha Krettek
Hi,
a bit more information would be useful. Are you using event-time? Is the
Flink program kept running after adding the first batch of events and when
adding the second batch, or is it two invocations of your Flink program? Do
you have a custom timestamp/watermark assigner?

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 14:28 Sujit Sakre 
wrote:

> Hi,
>
> We are using a sliding window function to process data read from Kafka
> Stream. We are using FlinkKafkaConsumer09 to read the data. The window
> function and sink are running correctly.
>
> To test the program, we are generating a stream of data from command line.
> This works when we add a set of records once. When we add again, it does not
> work: Flink produces no result, even though the records are added to the same
> Kafka topic from the same command line instance.
>
> Please could you suggest what could be wrong.
>
> Many thanks.
>
>
> *Sujit Sakre*
>
>


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Fabian Hueske
Aljoscha, you are right.
The second mapPartition() needs to have parallelism(1), and so does the
sortPartition():

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING).parallelism(1)
  .mapPartition(new ReturnFirstTen()).parallelism(1)

Anyway, as Gabor pointed out, this solution is very inefficient.
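For completeness, a rough sketch of the heap variant mentioned in the quoted mail
below (assuming Tuple2<Long, Double> records with the rating in field f1; a bounded
min-heap keeps only the 10 largest elements per partition and copies each record in
case object reuse is enabled):

import java.util.PriorityQueue;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Top10ByRating
        implements MapPartitionFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

    @Override
    public void mapPartition(Iterable<Tuple2<Long, Double>> values,
                             Collector<Tuple2<Long, Double>> out) {
        // min-heap ordered by rating; the smallest of the current top 10 sits at the head
        PriorityQueue<Tuple2<Long, Double>> heap =
                new PriorityQueue<>(10, (a, b) -> a.f1.compareTo(b.f1));
        for (Tuple2<Long, Double> t : values) {
            heap.add(t.copy()); // copy the tuple in case object reuse is enabled
            if (heap.size() > 10) {
                heap.poll(); // drop the current smallest
            }
        }
        for (Tuple2<Long, Double> t : heap) {
            out.collect(t);
        }
    }
}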

2017-01-24 17:52 GMT+01:00 Aljoscha Krettek :

> @Fabian, I think there's a typo in your code, shouldn't it be
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> i.e. the second MapPartition has to be parallelism=1
>
>
> On Tue, 24 Jan 2017 at 11:57 Fabian Hueske  wrote:
>
>> You are of course right Gabor.
>> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
>> 10 elements (note that you need to create deep-copies if object reuse is
>> enabled [1]).
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>> 2017-01-24 11:49 GMT+01:00 Gábor Gévay :
>>
>> Hello,
>>
>> Btw. there is a Jira about this:
>> https://issues.apache.org/jira/browse/FLINK-2549
>> Note that the discussion there suggests a more efficient approach,
>> which doesn't involve sorting the entire partitions.
>>
>> And if I remember correctly, this question comes up from time to time
>> on the mailing list.
>>
>> Best,
>> Gábor
>>
>>
>>
>> 2017-01-24 11:35 GMT+01:00 Fabian Hueske :
>> > Hi Ivan,
>> >
>> > I think you can use MapPartition for that.
>> > So basically:
>> >
>> > dataset // assuming some partitioning that can be reused to avoid a
>> shuffle
>> >   .sortPartition(1, Order.DESCENDING)
>> >   .mapPartition(new ReturnFirstTen())
>> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
>> >   .mapPartition(new ReturnFirstTen())
>> >
>> > Best, Fabian
>> >
>> >
>> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
>> >>
>> >> Hi,
>> >>
>> >> I have a dataset of tuples with two fields ids and ratings and I need
>> to
>> >> find 10 elements with the highest rating in this dataset. I found a
>> >> solution, but I think it's suboptimal and I think there should be a
>> better
>> >> way to do it.
>> >>
>> >> The best thing that I came up with is to partition dataset by rating,
>> sort
>> >> locally and write the partitioned dataset to disk:
>> >>
>> >> dataset
>> >> .partitionCustom(new Partitioner() {
>> >>   @Override
>> >>   public int partition(Double key, int numPartitions) {
>> >> return key.intValue() % numPartitions;
>> >>   }
>> >> }, 1) . // partition by rating
>> >> .setParallelism(5)
>> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
>> >> .writeAsText("..."); // write the partitioned dataset to disk
>> >>
>> >> This will store tuples in sorted files with names 5, 4, 3, ... that
>> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
>> >> data from disk and find the N elements with the highest rating.
>> >> Is there a way to do the same but without writing a partitioned
>> dataset to
>> >> a disk?
>> >>
>> >> I tried to use "first(10)" but it seems to give top 10 items from a
>> random
>> >> partition. Is there a way to get top N elements from every partition?
>> Then I
>> >> could locally sort top values from every partition and find top 10
>> global
>> >> values.
>> >>
>> >> Best regards,
>> >> Ivan.
>> >>
>> >>
>> >
>>
>>
>>


Re: Improving Flink Performance

2017-01-24 Thread Aljoscha Krettek
Hi,
I think MyJsonDecoder is the bottleneck, and I'm also afraid there is
nothing to be done because parsing Strings to JSON is simply slow.

I think you would see the biggest gains if you had a binary representation
that can quickly be serialised/deserialised to objects and you use that
instead of String/JSON.
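As an illustration, a binary format at the source could be decoded with a
DeserializationSchema directly into objects (a rough sketch only; the fixed
int/long layout and the Event type are made-up assumptions, not Jonas's format):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class BinaryEventSchema implements DeserializationSchema<BinaryEventSchema.Event> {

    public static class Event {
        public int type;
        public long timestamp;
    }

    @Override
    public Event deserialize(byte[] message) throws IOException {
        // decode a fixed binary layout instead of parsing a JSON string
        ByteBuffer buf = ByteBuffer.wrap(message);
        Event e = new Event();
        e.type = buf.getInt();
        e.timestamp = buf.getLong();
        return e;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeExtractor.getForClass(Event.class);
    }
}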

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 12:17 Jonas  wrote:

> Hello! I'm reposting this since the other thread had some formatting
> issues apparently. I hope this time it works. I'm having performance
> problems with a Flink job. If there is anything valuable missing, please
> ask and I will try to answer ASAP. My job looks like this:
>
> /*
>   Settings
>  */
> env.setParallelism(4)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> /*
>   Operator Graph
>  */
> env
>   .addSource(new FlinkKafkaConsumer09("raw.my.topic", new
> SimpleStringSchema(), props)) // 100k msgs/s
>   .map(new MyJsonDecoder) // 25k msgs/s
>   .map(new AddTypeToJsonForSplitting) // 20k msgs/s
>   .split(t => Seq(t._1.name))
>   .select(TYPE_A.name) // 18k msgs/s
>   .flatMap(new MapJsonToEntity) // 13k msgs/s
>   .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
>   .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s
>
> /*
>   Run
>  */
> env.execute()
>
> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
> is decoded, a type is added (we have multiple message types per Kafka
> topic). Then we select the TYPE_A messages, create a Scala entity out of it
> (a case class). Afterwards in the MapEntityToMultipleEntities the Scala
> entities are split into multiple. Finally a watermark is added. As you can
> see the data is not keyed in any way yet. *Is there a way to make this
> faster?*
>
> Measurements were taken with def writeToSocket[?](d: DataStream[?], port:
> Int): Unit = { d.writeToSocket("localhost", port, new
> SerializationSchema[?] { override def serialize(element: ?): Array[Byte] =
> { "\n".getBytes(CharsetUtil.UTF_8) } }) } and nc -lk PORT | pv --line-mode
> --rate --average-rate --format "Current: %r, Avg:%a, Total: %b" > /dev/null.
> I'm running this on an Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink 1.1.4.
>


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Aljoscha Krettek
@Fabian, I think there's a typo in your code, shouldn't it be

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen()).parallelism(1)

i.e. the second MapPartition has to be parallelism=1


On Tue, 24 Jan 2017 at 11:57 Fabian Hueske  wrote:

> You are of course right Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep-copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay :
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske :
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >> return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
> >> data from disk and find the N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>
>
>


Re: TestStreamEnvironment: await last flush of processing time-based windows

2017-01-24 Thread Aljoscha Krettek
Hi,
I'm afraid there is no way of making this work with the current
implementation. Especially getting this to work in a distributed setting
seems hard.

I'm very open for suggestions on this topic, though. :-)

Cheers,
Aljoscha

On Mon, 23 Jan 2017 at 23:19 Steven Ruppert  wrote:

> Hi,
>
> I'm attempting to unit test Flink with the flink-test-utils support, on
> Flink 1.1.4. I've got basic flatMap stuff flowing through just fine,
> but when running any processing time-based windowing functions,
> `env.execute()` will return before any values are flushed out of the
> windows.
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.util.TestStreamEnvironment;
> import org.junit.Test;
>
> import java.util.concurrent.atomic.AtomicBoolean;
>
> import static org.junit.Assert.assertTrue;
>
> public class TestMinimal {
>     static AtomicBoolean sinked = new AtomicBoolean(false);
>
>     @Test
>     public void testThing() throws Exception {
>         StreamExecutionEnvironment env =
>                 TestStreamEnvironment.getExecutionEnvironment();
>
>         env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1))
>                 .keyBy(0)
>                 .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>                 .sum(1)
>                 .addSink(new SinkFunction<Tuple2<String, Integer>>() {
>                     @Override
>                     public void invoke(Tuple2<String, Integer> value)
>                             throws Exception {
>                         sinked.set(true);
>                     }
>                 });
>         env.execute();
>         // presumably once execute returns, all elements have passed
>         // through all operators.
>         assertTrue(sinked.get());
>     }
> }
>
> Is there a way to make this test pass?
>
> Using event time windows instead does seem to work, but processing
> time would be a little more convenient.
>
>


RE: Custom Partitioning and windowing questions/concerns

2017-01-24 Thread Katsipoulakis, Nikolaos Romanos
Hello Fabian,

First, I would like to thank you for your suggestion and the additional
information on determinism and partition policies. As I mentioned in my initial
email, I am new to Flink and every additional piece of advice makes my
"learning curve" less steep. In addition, I am aware that you (and everyone
else who follows this thread) might wonder why I am following this
unconventional path of performance partitioning, but I have to inform you that
my use case's goal is of an academic nature.

Turning to your suggestion, I took some time to go over the 1.2-SNAPSHOT
code, and I read the online documentation on the Process Function API, which I
found at:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html
From my understanding, the process() transformation can be applied only on a
KeyedStream and not on a DataStream. Therefore, if I wanted to use a
custom partitioning algorithm, I would have to first make a call to
partitionCustom() (DataStream -> DataStream), followed by a keyBy(…)
(DataStream -> KeyedStream), and finally apply my first pre-aggregation
step (i.e., a call to process()). Concretely, my code would turn into something
like the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream phaseOne = stream
        .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle()
        .keyBy(1)
        .process(new CustomProcessFunction(..., Time.seconds(10), ...))
        .sum(2).setParallelism(N);

Unfortunately, you can understand that the above would be problematic for two
reasons: First, a call to keyBy() defeats the purpose of a custom partitioner,
because the stream will be (ultimately) partitioned based on the keys and not on my
CustomPartitioner.selectChannels() method. Second, using process() does not
solve my problem, because the issue with my use case is to avoid calling
keyBy(). If I could do that, then I might as well call window() and not use the
process API in the first place. To be more precise, if I could use a
KeyedStream, then I could do the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream phaseOne = stream
        .partitionCustom(new CustomPartitioner(...))
        .keyBy(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(2).setParallelism(N);

Therefore, I don’t think using a Process Function would solve my problem. Am I 
understanding your suggestion correctly? If yes, I would be grateful if you 
could explain to me in more detail. On top of that, after reading my initial 
email again, I believe that the intentions for my use-case were not quite 
clear. Please, do not hesitate to ask me for any clarifications.

Again, thank you very much for your interest and your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, January 24, 2017 5:15 AM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,
Flink's windows require a KeyedStream because they use the keys to manage their 
internal state (each in-progress window has some state that needs to be 
persisted and checkpointed).
Moreover, Flink's event-time window operators return a deterministic result. In
your use case, the result of the pre-aggregation (phase 1) would not be
deterministic because it would depend on the partitioning of the input.
I would suggest implementing the pre-aggregation not with a window but with a
ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be released soon).
ProcessFunction allows you to register timers which can be used to emit results 
every 10 seconds.
Hope this helps,
Fabian


2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos 
>:
Hello all,

Currently, I am examining the effects of stream partitioning on performance for
simple stateful scenarios.

My toy application for the rest of my question will be the following: A stream 
of non-negative integers, each one annotated with a timestamp, and the goal is 
to get the top-10 most frequent non-negative integers on tumbling windows of 10 
seconds. In other words, my input is a stream of tuples with two fields, 
Tuple2(timestamp, key), where key is the non-negative integer 
value, and timestamp is used to assign each event to a window. The execution 
plan I am considering is to have a first phase (Phase 1), where the stream is 
partitioned and the partial aggregations are processed in parallel (set 
parallelism to N > 1). Afterwards, the second phase (Phase 2) involves 
gathering all partial aggregations on a single node (set parallelism to 1), and 
calculate the full aggregation for each key, order the keys based on 

Re: Flink configuration

2017-01-24 Thread Aljoscha Krettek
Hi,
that wording is from a time when no one thought about VMs with virtual
cores. IMHO this maps directly to virtual cores, so you should set it
according to the number of virtual cores of your VMs.
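For example, for a VM with 4 vCPUs a typical flink-conf.yaml entry would be (the
value is illustrative):

taskmanager.numberOfTaskSlots: 4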

Cheers,
Aljoscha

On Mon, 23 Jan 2017 at 11:51 Nancy Estrada 
wrote:

> Hi all,
>
> I have been reading about how to configure Flink when we have a setup
> consisting of a couple of VMs with more than 1 vCore. I am a bit confused
> about how to set the degree of parallelism in the
> taskmanager.numberOfTaskSlots parameter:
>
> * According to the Flink documentation[1], this value is typically
> proportional to the number of physical CPU cores that the TaskManager's
> machine has.
>
> * However, the YARN documentation[2] makes reference to the number of
> virtual CPU cores per TaskManager.
>
> My question is: if my Flink jobs will be running on VMs (without using
> YARN), should the "taskmanager.numberOfTaskSlots" depend on the number of
> vCPUs that my VM has, or must it be related to the physical cores?
>
> Thanks in advance for your help!
> Nancy
>
> [1]https://ci.apache.org/projects/flink/flink-docs-release-0.8/config.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/yarn_setup.html
>
>
>
>


Re: accessing flink HA cluster with scala shell/zeppelin notebook

2017-01-24 Thread Aljoscha Krettek
+Till Rohrmann , do you know what can be used to
access an HA cluster from that setting?

Adding Till since he probably knows the HA stuff best.

On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak  wrote:

> Hi,
>
> I have a standalone Flink cluster configured with HA (i.e. with
> ZooKeeper recovery). How should I access it remotely, e.g. with a Zeppelin
> notebook or the Scala shell?
>
> There are settings for host/port, but with the HA setting they are not fixed
> - if I check which is the *current leader* host and port and set that, I get
> an exception on the job manager:
>
> 20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
> o.a.f.runtime.jobmanager.JobManager - Discard message
> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
> 02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
> because the expected leader session ID
> Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
> leader session ID None.
>
> - I guess it's reasonable behaviour, since I should use the appropriate
> LeaderRetrievalService and so on. But apparently there's no such
> possibility in the Scala Flink shell?
>
> Is it a missing feature? I can prepare a patch, but I'm not sure how I would
> hook the behaviour of ClusterClient into FlinkILoop?
>
> thanks,
>
> maciek
>
>


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-24 Thread Stephan Ewen
Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Just thinking that another possible cause for the issue could be that there
is a connection leak somewhere (Flink code or user code or vendor library)
and thus the S3 connector's connection pool starves.
For Flink 1.2, there is a safetynet that tracks and closes connections that
go through Flink's filesystem abstraction. So that should not be an issue
there any more.

Best,
Stephan



On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey  wrote:

> Good to know someone else has had the same problem... What did you do
> about it? Did it resolve on its own?
>
> -Shannon
>
>
>
>
> On 1/12/17, 11:55 AM, "Chen Qin"  wrote:
>
> >We have seen this issue as far back as Flink 1.0. Our finding back then was
> traffic congestion to AWS in the internal network. Many teams are too dependent on
> S3 and bandwidth is shared, causing traffic congestion from time to time.
> >
> >Hope it helps!
> >
> >Thanks
> >Chen
> >
> >> On Jan 12, 2017, at 03:30, Ufuk Celebi  wrote:
> >>
> >> Hey Shannon!
> >>
> >> Is this always reproducible and how long does it take to reproduce it?
> >>
> >> I've not seen this error before but as you say it indicates that some
> >> streams are not closed.
> >>
> >> Did the jobs do any restarts before this happened? Flink 1.1.4
> >> contains fixes for more robust releasing of resources in failure
> >> scenarios. Is trying 1.1.4 an option?
> >>
> >> – Ufuk
> >>
> >>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey 
> wrote:
> >>> I'm having pretty frequent issues with the exception below. It
> basically
> >>> always ends up killing my cluster after forcing a large number of job
> >>> restarts. I just can't keep Flink up & running.
> >>>
> >>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
> >>> emrfs-site config fs.s3.maxConnections from the default (50) to 75,
> after
> >>> AWS support told me the name of the config option. However, that hasn't
> >>> fixed the problem. Assuming that increasing the maxConnections again
> doesn't
> >>> fix the problem, is there anything else I can do? Is anyone else
> having this
> >>> problem? Is it possible that the state backend isn't properly calling
> >>> close() on its filesystem objects? Or is there a large number of
> concurrent
> >>> open filesystem objects for some reason? I am using the default
> >>> checkpointing settings with one checkpoint at a time, checkpointing
> every 10
> >>> minutes. If I am reading the metrics correctly, the checkpoint
> duration is
> >>> between 12s and 3 minutes on one of the jobs, and 5s or less on the
> other 3.
> >>> Any help is appreciated.
> >>>
> >>> java.lang.RuntimeException: Could not initialize state backend.
> >>> at
> >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(
> AbstractStreamOperator.java:121)
> >>> at
> >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> setup(AbstractUdfStreamOperator.java:82)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createChainedOperator(OperatorChain.java:276)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createOutputCollector(OperatorChain.java:212)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createChainedOperator(OperatorChain.java:271)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createOutputCollector(OperatorChain.java:212)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createChainedOperator(OperatorChain.java:271)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createOutputCollector(OperatorChain.java:212)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createChainedOperator(OperatorChain.java:271)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createOutputCollector(OperatorChain.java:212)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> init>(OperatorChain.java:105)
> >>> at
> >>> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:225)
> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>> Caused by:
> >>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.
> AmazonClientException:
> >>> Unable to execute HTTP request: Timeout waiting for connection from
> pool
> >>> at
> >>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
> >>> at
> >>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
> >>> at
> >>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
> >>> at
> >>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> 

Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-24 Thread Stephan Ewen
Hi!

I think there were some issues in the HA recovery of 1.1.3 that were fixed
in 1.1.4 and 1.2.0.
What version are you running on?

Stephan


On Sat, Jan 21, 2017 at 4:58 PM, Ufuk Celebi  wrote:

> Hey Shannon,
>
> the final truth for recovery is in ZooKeeper. Can you check whether
> there are also references available in ZooKeeper? Do you have the job
> manager logs available from after the failure? On recovery, Flink
> checks ZooKeeper for entries. These point to files in the storageDir.
> It could have happened that these got out of sync, e.g. entries
> deleted from ZK but not from the storageDir.
>
> Maybe the loss of the task managers can also be explained by a
> connection loss to ZK or something. When a JM loses leadership, the
> TMs cancel all tasks and disconnect from the JM. Here again, we would
> need to look into the logs.
>
> Sorry for the bad experience lately :-(
>
> – Ufuk
>
>
> On Sat, Jan 21, 2017 at 4:31 AM, Shannon Carey  wrote:
> > In fact, I can see all my job jar blobs and some checkpoint & job graph
> > files in my configured "recovery.zookeeper.storageDir"… however for some
> > reason it didn't get restored when my new Flink cluster started up.
> >
> >
> > From: Shannon Carey 
> > Date: Friday, January 20, 2017 at 9:14 PM
> > To: "user@flink.apache.org" 
> >
> > Subject: Re: Rapidly failing job eventually causes "Not enough free
> slots"
> >
> > I recently added some better visibility into the metrics we're gathering
> > from Flink. My Flink cluster died again due to the "Not enough free slots
> > available to run the job" problem, and this time I can see that the
> number
> > of registered task managers went down from 11 to 7, then waffled and only
> > ever got back up to 10 (one short of the requested amount) before
> dropping
> > to 0 just before the cluster died. This would seem to explain why there
> > weren't sufficient slots (given that we were probably using them all or
> > nearly all)… The metric of "running jobs" went down from 5 to 3 during
> this
> > time period as well. So the problem seems to be loss of taskmanagers due
> to
> > errors (not yet sure what exactly as I have to delve into logs).
> >
> > The other thing I have to figure out is restoring the jobs… I thought
> that
> > HA would start the jobs back up again if Flink died & I re-launched it,
> but
> > that doesn't appear to be the case.
> >
> >
> > From: Stephan Ewen 
> > Date: Thursday, January 5, 2017 at 7:52 AM
> > To: 
> > Subject: Re: Rapidly failing job eventually causes "Not enough free
> slots"
> >
> > Another thought on the container failure:
> >
> > in 1.1, the user code is loaded dynamically whenever a Task is started.
> That
> > means that on every task restart the code is reloaded. For that to work
> > properly, class unloading needs to happen, or the permgen will eventually
> > overflow.
> >
> > It can happen that class unloading is prevented if the user functions do
> > leave references around as "GC roots", which may be threads, or
> references
> > in registries, etc.
> >
> > In Flink 1.2, YARN will put the user code into the application
> classpath, so
> > code needs not be reloaded on every restart. That should solve that
> issue.
> > To "simulate" that behavior in Flink 1.1, put your application code jars
> > into the "lib" folder
> >
> > Best,
> > Stephan
> >
> >
> > On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin 
> wrote:
> >>
> >> Hi,
> >>
> >> I've faced a similar issue recently. Hope sharing my findings will help.
> >> The problem can be split into 2 parts:
> >>
> >> Source of container failures
> >> The logs you provided indicate that YARN kills its containers for
> >> exceeding memory limits. Important point here is that memory limit = JVM
> >> heap memory + off-heap memory. So if off-heap memory usage is high,
> YARN may
> >> kill containers despite JVM heap consumption is fine. To solve this
> issue,
> >> Flink reserves a share of container memory for off-heap memory. How much
> >> will be reserved is controlled by yarn.heap-cutoff-ratio and
> >> yarn.heap-cutoff-min configuration. By default 25% of the requested
> >> container memory will be reserved for off-heap. This is seems to be a
> good
> >> start, but one should experiment and tune to meet their job specifics.
> >>
> >> It's also worthwhile to figure out who consumes off-heap memory. Is it
> >> Flink managed memory moved off heap (taskmanager.memory.off-heap =
> true)? Is
> >> it some external library allocating something off heap? Is it your own
> code?
> >>
> >> How Flink handles task manager failures
> >> Whenever a task manager fails, the Flink jobmanager decides whether it
> >> should:
> >> - reallocate failed task manager container
> >> - fail application entirely
> >> These decisions can be guided by certain configuration
> >> (https://ci.apache.org/projects/flink/flink-docs-
> 

Re: Why is IdentityObjectIntMap.get called so often?

2017-01-24 Thread Stephan Ewen
Hi!

I think the best way to get away from Kryo is to write types that go
through Flink's own serialization stack:
Have a look here for a bit of background:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html#flinks-typeinformation-class
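
For example, a type that follows the POJO rules (public class, public
no-argument constructor, public fields or getters/setters) is analyzed by the
TypeExtractor and handled by Flink's own POJO serialization instead of falling
back to Kryo. A minimal, made-up sketch (the TradeEvent name and fields are
only for illustration):

// Handled by Flink's own POJO serialization, no Kryo involved.
public class TradeEvent {

    public long timestamp;
    public String symbol;
    public double price;

    // a public no-argument constructor is required for POJO serialization
    public TradeEvent() {}

    public TradeEvent(long timestamp, String symbol, double price) {
        this.timestamp = timestamp;
        this.symbol = symbol;
        this.price = price;
    }
}

If I remember correctly, env.getConfig().disableGenericTypes() also makes the
job fail fast whenever some type still falls back to Kryo, which helps to find
the remaining offenders.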

BTW: Is the "hot map" involved in Kryo's reference resolution? If yes, you
could try a custom hack and see whether it performs better when reference
resolution is deactivated (modify the KryoSerializer.checkKryoInitialized()
method to deactivate reference handling).

Hope that helps,
Stephan



On Tue, Jan 24, 2017 at 12:47 PM, Dmitry Golubets 
wrote:

> Hi,
>
> I've just added my custom MsgPack serializers hoping to see performance
> increase. I covered all data types in between chains.
>
> However this Kryo method still takes a lot of CPU: IdentityObjectIntMap.get
>
> Is there something else that should be configured?
> Or is there no way to get away from the Kryo overhead?
>
> Best regards,
> Dmitry
>


Re: About delta awareness caches

2017-01-24 Thread Aljoscha Krettek
Hi,
in fact, this was just merged:
https://issues.apache.org/jira/browse/FLINK-5582. It will be released as
part of Flink 1.3 in roughly 4 months. The feature is still a bit rough
around the edges and needs some follow-up work, however.
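
To make the idea concrete, here is a rough, conceptual sketch of an "average"
aggregate expressed with the four operations discussed further down in this
thread (create / add / merge / extract). The interface and signatures are only
illustrative, they are not the exact API merged in FLINK-5582:

// Conceptual illustration only, not the merged Flink interface.
public interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    ACC merge(ACC a, ACC b);
    OUT getResult(ACC accumulator);
}

public class AverageAggregate implements Aggregate<Long, long[], Double> {

    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};                      // {sum, count}
    }

    @Override
    public long[] add(Long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1};  // fold one value in
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};    // combine partial results
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : ((double) acc[0]) / acc[1];
    }
}

Because partial accumulators can be merged, redundant per-element work in
overlapping sliding windows can be avoided without restricting input and
output to the same type, which is the limitation of reduce mentioned below.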

Cheers,
Aljoscha

On Thu, 12 Jan 2017 at 11:10 Xingcan  wrote:

> Hi, Aljoscha
>
> Thanks for your explanation.
>
> About the Storm windows simulation, we had tried your suggestion and gave
> up due to its complexity and sort of "reinventing the wheel". Without
> considering the performance, most of our business-logic code have already
> been transformed to the "Flink style".
>
> I am glad to hear that adding the accumulator is just in progress. As far
> as I can see, the operations it supplies will adequately meet the demands.
> I will stay focus on this topic.
>
> Best,
> Xingcan
>
> On Wed, Jan 11, 2017 at 7:28 PM, Aljoscha Krettek 
> wrote:
>
> Hi,
> (I'm just getting back from holidays, therefore the slow response. Sorry
> for that.)
>
> I think you can simulate the way Storm windows work by using a
> GlobalWindows assigner and having a custom Trigger and/or Evictor and also
> some special logic in your WindowFunction.
>
> About mergeable state, we're actually in the process of adding something
> like this that would be a generalisation of reduce and fold: you can call
> it combine or aggregate. The idea is to have these operations:
>
> - create accumulator
> - add value to accumulator
> - merge accumulators
> - extract output from accumulator
>
> You have three types: IN for incoming values, ACC for accumulators and OUT
> as the result of extracting output from an accumulator. This should cover
> most cases.
>
> What do you think?
>
> Cheers,
> Aljoscha
>
> On Thu, 22 Dec 2016 at 07:13 xingcan  wrote:
>
> Hi Aljoscha,
>
> First of all, sorry for that I missed your prompt reply : (
>
> In these days, I've been learning the implementation mechanism of window
> in Flink.
>
> I think the main difference between the window in Storm and Flink (from
> the API level) is that, Storm maintains only one window while Flink
> maintains several isolated windows. Due to that, Storm users can be aware
> of the transformation (tuple add and expire) of a window and take actions
> on each window modification (sliding window forwarding) while Flink users
> can only implement functions on one and another complete window as if they
> are independent of each other (actually they may get quite a few tuples in
> common).
>
> Objectively speaking, the window API provided by Flink is more formalize
> and easy to use. However, for sliding window with high-capacity and short
> interval (e.g. 5m and 1s), each tuple will be calculated redundantly (maybe
> 300 times in the example?). Though it provide the pane optimization, I
> think it's far from enough as the optimization can only be applied on
> reduce functions which restrict the input and output data type to be the
> same. Some other functions, e.g., the MaxAndMin function which take numbers
> as input and output a max pair and the Average function, which should
> avoid redundant calculations can not be satisfied.
>
> Actually, I just wondering if a "mergeable fold function" could be added
> (just *like* this https://en.wikipedia.org/wiki/Mergeable_heap). I know
> it may violate some principles of Flink (probably about states), but I
> insist that unnecessary calculations should be avoided in stream processing.
>
> So, could you give some advices, I am all ears : ), or if you think that
> is feasible, I'll think carefully and try to complete it.
>
> Thank you and merry Christmas.
>
> Best,
>
> - Xingcan
>
> On Thu, Dec 1, 2016 at 7:56 PM, Aljoscha Krettek 
> wrote:
>
> I'm not aware of how windows work in Storm. If you could maybe give some
> details on your use case we could figure out together how that would map to
> Flink windows.
>
> Cheers,
> Aljoscha
>
> On Tue, 29 Nov 2016 at 15:47 xingcan  wrote:
>
> Hi all,
>
> Recently I tried to transfer some old applications from Storm to Flink.
> In Storm, the window implementation (TupleWindow) gets two methods named
> getNew() and getExpired() which supply the delta information of a window
> and therefore we wrote some stateful caches that are aware of them.
> However, it seems that Flink deals with the window in a different way and
> supplies more "formalized" APIs.
> So, any tips on how to adapt these delta awareness caches in Flink or do
> some refactors to make them suitable?
>
> Thanks.
>
> Best,
> Xingcan
>
>
>
>


Re: Received an event in channel 0 while still having data from a record

2017-01-24 Thread Aljoscha Krettek
Hi Billy,
the stack trace seems to indicate that there is a problem at the point
where the data sink is trying to read the input elements so it doesn't seem
to be related to the source. Could you also post what sinks you have and
what the type of the input elements of these sinks are?

Cheers,
Aljoscha

On Thu, 12 Jan 2017 at 04:45 M. Dale  wrote:

> How were the Parquet files you are trying to read generated? Same version
> of libraries? I am successfully using the following Scala code to read
> Parquet files using the HadoopInputFormat wrapper. Maybe try that in Java?
>
> val hadoopInputFormat =
>   new HadoopInputFormat[Void, GenericRecord](
>     new AvroParquetInputFormat, classOf[Void], classOf[GenericRecord], job)
>
> AvroParquetInputFormat.setAvroReadSchema(job, EventOnlyRecord.getClassSchema)
>
> // APIF extends ParquetInputFormat which extends FileInputFormat (FIP)
> // addInputPath is a static method on FIP.
> val inputPath = new Path(input)
> FileInputFormat.addInputPath(job, inputPath)
>
> val rawEvents: DataSet[(Void, GenericRecord)] =
>   env.createInput(hadoopInputFormat)
>
> On 01/11/2017 03:16 PM, Newport, Billy wrote:
>
> Anyone seen this before:
>
>
>
> Caused by: java.io.IOException: Received an event in channel 0 while
> still having data from a record. This indicates broken serialization logic.
> If you are using custom serialization code (Writable or Value types), check
> their serialization routines. In the case of Kryo, check the respective
> Kryo serializer.
>
>  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:98)
>  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>  at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:190)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>  at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> We’re on 1.1.4 right now. We’re reading parquet file using code like this:
>
>
>
>AvroParquetInputFormat inputFormat = *new*
> AvroParquetInputFormat();
>
>AvroParquetInputFormat.*setAvroReadSchema*(job, getMergeSchema(
> storeName, datasetName));
>
>
>
>// Get patch of input parquet file
>
>DatasetHdfsInfo info = getLastCompleteMergedDatasetHDFSInfo(
> storeName, datasetName);
>
>
>
>Path path = *new* Path(info.getRootDir());
>
>
>
>DataSet> d =
> getExecutionEnvironment().readHadoopFile(inputFormat, Void.*class*,
> GenericRecord.*class*, path.toString(), job);
>
>
>
>
>
>
>
>
>


Kafka data not read in FlinkKafkaConsumer09 second time from command line

2017-01-24 Thread Sujit Sakre
Hi,

We are using a sliding window function to process data read from Kafka
Stream. We are using FlinkKafkaConsumer09 to read the data. The window
function and sink are running correctly.

To test the program, we are generating a stream of data from the command line.
This works when we add a set of records once. When we add records again, it
does not work: Flink produces no result, even though the records are added to
the same Kafka topic from the same command-line instance.

Could you please suggest what might be wrong?

Many thanks.


*Sujit Sakre*



Re: Rate-limit processing

2017-01-24 Thread Florian König
Hi Till,

thank you for the very helpful hints. You are right, I already see 
backpressure. In my case, that’s ok because it throttles the Kafka source. 
Speaking of which: You mentioned putting the rate limiting mechanism into the 
source. How can I do this with a Kafka source? Just extend the Producer, or is 
there a better mechanism to hook into the connector?
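
For what it's worth, here is a rough, untested sketch of the pattern Till
describes in the reply quoted below: emit records while holding the checkpoint
lock, but do the waiting outside of it, so checkpoint barriers are never
blocked. The ThrottledSource name and the fetchNextRecord() call are only
placeholders for wherever the data actually comes from; the Kafka consumer
itself would have to be extended or wrapped to apply this internally.

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ThrottledSource extends RichParallelSourceFunction<String> {

    private final long maxPerSecond;
    private volatile boolean running = true;

    public ThrottledSource(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long windowStart = System.currentTimeMillis();
        long emittedInWindow = 0;

        while (running) {
            if (emittedInWindow >= maxPerSecond) {
                // wait outside the checkpoint lock until the current second is over
                long sleep = windowStart + 1000 - System.currentTimeMillis();
                if (sleep > 0) {
                    Thread.sleep(sleep);
                }
                windowStart = System.currentTimeMillis();
                emittedInWindow = 0;
            }

            String record = fetchNextRecord(); // placeholder for the real data retrieval

            // emit under the checkpoint lock so state and output stay consistent
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
                emittedInWindow++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNextRecord() {
        return "..."; // placeholder
    }
}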

Cheers,
Florian


> Am 20.01.2017 um 16:58 schrieb Till Rohrmann :
> 
> Hi Florian,
> 
> any blocking of the user code thread is in general not a good idea, because 
> the checkpointing happens under the very same lock which also guards the user 
> code invocation. Thus any checkpoint barrier arriving at the operator has 
> only the chance to trigger the checkpointing once the blocking is over. Even 
> worse, if the blocking happens in a downstream operator (not a source), then 
> this blocking could cause backpressure. Since the checkpoint barriers flow 
> with the events and are processed in order, the backpressure will then also 
> influence the checkpointing time.
> 
> So if you want to limit the rate, you should do it at the sources without 
> blocking the source thread. You could for example count how many elements 
> you've emitted in the past second and if it exceeds your maximum, then you 
> don't emit the next element to downstream operators until some time has 
> passed (this might end up in a busy loop but it allows the checkpointing to 
> claim the lock).
> 
> Cheers,
> Till
> 
> On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI 
>  wrote:
> Hi,
> 
> You might find this similar thread from the mailing list archive helpful : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html.
> 
> Best,
> Yassine
> 
> 2017-01-20 10:53 GMT+01:00 Florian König :
> Hi,
> 
> I need to limit the rate of processing in a Flink stream application. 
> Specifically, the number of items processed in a .map() operation has to stay 
> under a certain maximum per second.
> 
> At the moment, I have another .map() operation before the actual processing, 
> which just sleeps for a certain time (e.g., 250ms for a limit of 4 requests / 
> sec) and returns the item unchanged:
> 
> …
> 
> public T map(final T value) throws Exception {
> Thread.sleep(delay);
> return value;
> }
> 
> …
> 
> This works as expected, but is a rather crude approach. Checkpointing the job 
> takes a very long time: minutes for a state of a few kB, which for other jobs 
> is done in a few milliseconds. I assume that letting the whole thread sleep 
> for most of the time interferes with the checkpointing - not good!
> 
> Would using a different synchronization mechanism (e.g., 
> https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
>  help to make checkpointing work better?
> 
> Or, preferably, is there a mechanism inside Flink that I can use to 
> accomplish the desired rate limiting? I haven’t found anything in the docs.
> 
> Cheers,
> Florian
> 
> 




Why is IdentityObjectIntMap.get called so often?

2017-01-24 Thread Dmitry Golubets
Hi,

I've just added my custom MsgPack serializers hoping to see a performance
increase. I covered all data types passed between chains.

However, this Kryo method still takes a lot of CPU: IdentityObjectIntMap.get

Is there something else that should be configured?
Or is there no way to get away from the Kryo overhead?

Best regards,
Dmitry


Improving Flink Performance

2017-01-24 Thread Jonas
Hello!

I'm reposting this since the other thread had some formatting issues,
apparently. I hope this time it works.

I'm having performance problems with a Flink job. If there is anything
valuable missing, please ask and I will try to answer ASAP. My job looks like
this: first, I read data from Kafka. This is very fast at 100k msgs/s. The
data is decoded and a type is added (we have multiple message types per Kafka
topic). Then we select the TYPE_A messages and create a Scala entity out of it
(a case class). Afterwards, in the MapEntityToMultipleEntities step, the Scala
entities are split into multiple entities. Finally a watermark is added.

As you can see, the data is not keyed in any way yet. Is there a way to make
this faster? Measurements were taken with […] and I'm running this on an Intel
i5-3470, 16G RAM, Ubuntu 16.04.1 LTS, on Flink 1.1.4.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Problem with state Apache Flink 1.2.0 RC0

2017-01-24 Thread Salou Guillaume
Hi !

I have the same problem on my laptop and on my desktop at work.

I have also tested it; the problem appears under private browsing as well.

Regards,

Guillaume

2017-01-24 11:49 GMT+01:00 Stephan Ewen :

> Hi!
>
> Is this a dashboard caching issue? Can you try to "force refresh" the
> dashboard?
>
> Please let us know if that solves the issue
>
> (+Chesnay)
>
> Stephan
>
> On Tue, Jan 24, 2017 at 11:08 AM, Salou Guillaume 
> wrote:
>
>> Hello flink users !
>>
>> I'm using flink  Apache Flink 1.2.0 RC0, the tasks status in Apache web
>> dashboard are CANCELED, but my tasks are yet running and doing their job.
>>
>> The status in the main page is RUNNING
>>
>> I have the same problem with 2 different jobs.
>>
>> Regards,
>>
>> Guillame
>>
>
>


Re: Re: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-24 Thread Robert Metzger
RC1 creation is in progress ...

On Mon, Jan 23, 2017 at 10:33 AM, Robert Metzger 
wrote:

> Hi all,
>
> I would like to do a proper voting RC1 early this week.
> From the issues mentioned here, most of them have pull requests or were
> changed to a lower priority.
> Once we've merged all outstanding PRs, I'll create the next RC.
>
> Regards,
> Robert
>
>
> On Mon, Jan 16, 2017 at 12:13 PM, Fabian Hueske  wrote:
>
>> A user reported that outer joins on the Table API and SQL compute wrong
>> results:
>>
>> https://issues.apache.org/jira/browse/FLINK-5498
>>
>> 2017-01-15 20:23 GMT+01:00 Till Rohrmann :
>>
>> > I found two problematic issues with Mesos HA mode which breaks it:
>> >
>> > https://issues.apache.org/jira/browse/FLINK-5495
>> > https://issues.apache.org/jira/browse/FLINK-5496
>> >
>> > On Fri, Jan 13, 2017 at 11:29 AM, Fabian Hueske 
>> wrote:
>> >
>> > > I tested the Table API / SQL a bit.
>> > >
>> > > I implemented a windowed aggregation with the streaming Table API and
>> it
>> > > produced the same results as a DataStream API implementation.
>> > > Joining a stream with a TableFunction also seemed to work well.
>> > > Moreover, I checked the results of a bunch of TPC-H queries (batch
>> SQL)
>> > > and all produced correct results.
>> > >
>> > >
>> > >
>> > > 2017-01-12 17:45 GMT+01:00 Till Rohrmann :
>> > >
>> > >> I'm wondering whether we should not depend the webserver encryption
>> on
>> > the
>> > >> global encryption activation and activating it instead per default.
>> > >>
>> > >> On Thu, Jan 12, 2017 at 4:54 PM, Chesnay Schepler <
>> ches...@apache.org>
>> > >> wrote:
>> > >>
>> > >> > FLINK-5470 is a duplicate of FLINK-5298 for which there is also an
>> > open
>> > >> PR.
>> > >> >
>> > >> > FLINK-5472 is imo invalid since the webserver does support https,
>> you
>> > >> just
>> > >> > have to enable it as per the security documentation.
>> > >> >
>> > >> >
>> > >> > On 12.01.2017 16:20, Till Rohrmann wrote:
>> > >> >
>> > >> > I also found an issue:
>> > >> >
>> > >> > https://issues.apache.org/jira/browse/FLINK-5470
>> > >> >
>> > >> > I also noticed that Flink's webserver does not support https
>> requests.
>> > >> It
>> > >> > might be worthwhile to add it, though.
>> > >> >
>> > >> > https://issues.apache.org/jira/browse/FLINK-5472
>> > >> >
>> > >> > On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger <
>> rmetz...@apache.org
>> > >
>> > >> > wrote:
>> > >> >
>> > >> >> I also found a bunch of issues
>> > >> >>
>> > >> >> https://issues.apache.org/jira/browse/FLINK-5465
>> > >> >> https://issues.apache.org/jira/browse/FLINK-5462
>> > >> >> https://issues.apache.org/jira/browse/FLINK-5464
>> > >> >> https://issues.apache.org/jira/browse/FLINK-5463
>> > >> >>
>> > >> >>
>> > >> >> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske < <
>> fhue...@gmail.com>
>> > >> >> fhue...@gmail.com> wrote:
>> > >> >>
>> > >> >> > I have another bugfix for 1.2.:
>> > >> >> >
>> > >> >> > https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
>> > >> >> >
>> > >> >> > 2017-01-10 15:16 GMT+01:00 Robert Metzger < <
>> rmetz...@apache.org>
>> > >> >> rmetz...@apache.org>:
>> > >> >> >
>> > >> >> > > Hi,
>> > >> >> > >
>> > >> >> > > this depends a lot on the number of issues we find during the
>> > >> testing.
>> > >> >> > >
>> > >> >> > >
>> > >> >> > > These are the issues I found so far:
>> > >> >> > >
>> > >> >> > > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
>> > >> >> > > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
>> > >> >> > > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
>> > >> >> > > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
>> > >> >> > > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
>> > >> >> > >
>> > >> >> > >
>> > >> >> > >
>> > >> >> > >
>> > >> >> > > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui <
>> > shijin...@huawei.com>
>> > >> >> > wrote:
>> > >> >> > >
>> > >> >> > > > Do we have a probable time of 1.2 release? This month or
>> Next
>> > >> month?
>> > >> >> > > >
> >> >> > > > -Original Message-
> >> >> > > > From: Robert Metzger [mailto: 
> >> >> rmetz...@apache.org]
> >> >> > > > Sent: 3 January 2017 20:44
> >> >> > > > To: d...@flink.apache.org
> >> >> > > > Cc: user@flink.apache.org
> >> >> > > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing
>> > release
>> > >> >> > > candidate)
>> > >> >> > > >
>> > >> >> > > > Hi,
>> > >> >> > > >
>> > >> >> > > > First of all, I wish everybody a happy new year 2017.
>> > >> >> > > >
>> > >> >> > > > I've set user@flink in CC so that users who are interested
>> in
>> > >> >> helping
>> > >> >> > > > with the testing get notified. Please respond only to the
>> dev@
>> > >> >> list to
>> > >> >> > > > keep the discussion there!
>> > >> >> > > >
>> > >> >> > > > According to the 1.2 release discussion thread, 

Re: Improving Flink performance

2017-01-24 Thread Jonas
I don't even have images in there :O Will delete this thread and create a new
one.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-performance-tp11211p11245.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Fabian Hueske
You are of course right, Gábor.
@Ivan, you can use a heap in the MapPartitionFunction to collect the top 10
elements (note that you need to create deep-copies if object reuse is
enabled [1]).
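
Something like the following (untested) could serve as the ReturnFirstTen from
my previous mail. It assumes Tuple2<Long, Double> records with the rating in
field 1; the emitted order is arbitrary, so the final single-parallelism pass
still has to pick the global top 10:

import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReturnFirstTen
        implements MapPartitionFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

    @Override
    public void mapPartition(Iterable<Tuple2<Long, Double>> values,
                             Collector<Tuple2<Long, Double>> out) {
        // min-heap ordered by rating, keeping only the 10 largest elements seen so far
        PriorityQueue<Tuple2<Long, Double>> heap =
                new PriorityQueue<>(10, new Comparator<Tuple2<Long, Double>>() {
                    @Override
                    public int compare(Tuple2<Long, Double> a, Tuple2<Long, Double> b) {
                        return Double.compare(a.f1, b.f1);
                    }
                });

        for (Tuple2<Long, Double> value : values) {
            // remember to deep-copy 'value' here if object reuse is enabled
            heap.add(value);
            if (heap.size() > 10) {
                heap.poll(); // drop the smallest of the 11 candidates
            }
        }

        for (Tuple2<Long, Double> top : heap) {
            out.collect(top);
        }
    }
}

With the heap there is also no need to sort the partitions beforehand; the
pre-selection alone reduces each partition to at most 10 candidates.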

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions


2017-01-24 11:49 GMT+01:00 Gábor Gévay :

> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske :
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >> return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read
> sorted
> >> data from disk and and N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>


Re: Problem with state Apache Flink 1.2.0 RC0

2017-01-24 Thread Stephan Ewen
Hi!

Is this a dashboard caching issue? Can you try to "force refresh" the
dashboard?

Please let us know if that solves the issue

(+Chesnay)

Stephan

On Tue, Jan 24, 2017 at 11:08 AM, Salou Guillaume  wrote:

> Hello flink users !
>
> I'm using flink  Apache Flink 1.2.0 RC0, the tasks status in Apache web
> dashboard are CANCELED, but my tasks are yet running and doing their job.
>
> The status in the main page is RUNNING
>
> I have the same problem with 2 different jobs.
>
> Regards,
>
> Guillame
>


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Gábor Gévay
Hello,

Btw. there is a Jira about this:
https://issues.apache.org/jira/browse/FLINK-2549
Note that the discussion there suggests a more efficient approach,
which doesn't involve sorting the entire partitions.

And if I remember correctly, this question comes up from time to time
on the mailing list.

Best,
Gábor



2017-01-24 11:35 GMT+01:00 Fabian Hueske :
> Hi Ivan,
>
> I think you can use MapPartition for that.
> So basically:
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).parallelism(1)
>   .mapPartition(new ReturnFirstTen())
>
> Best, Fabian
>
>
> 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
>>
>> Hi,
>>
>> I have a dataset of tuples with two fields ids and ratings and I need to
>> find 10 elements with the highest rating in this dataset. I found a
>> solution, but I think it's suboptimal and I think there should be a better
>> way to do it.
>>
>> The best thing that I came up with is to partition dataset by rating, sort
>> locally and write the partitioned dataset to disk:
>>
>> dataset
>> .partitionCustom(new Partitioner() {
>>   @Override
>>   public int partition(Double key, int numPartitions) {
>> return key.intValue() % numPartitions;
>>   }
>> }, 1) . // partition by rating
>> .setParallelism(5)
>> .sortPartition(1, Order.DESCENDING) // locally sort by rating
>> .writeAsText("..."); // write the partitioned dataset to disk
>>
>> This will store tuples in sorted files with names 5, 4, 3, ... that
>> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
>> data from disk and and N elements with the highest rating.
>> Is there a way to do the same but without writing a partitioned dataset to
>> a disk?
>>
>> I tried to use "first(10)" but it seems to give top 10 items from a random
>> partition. Is there a way to get top N elements from every partition? Then I
>> could locally sort top values from every partition and find top 10 global
>> values.
>>
>> Best regards,
>> Ivan.
>>
>>
>


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Fabian Hueske
Hi Ivan,

I think you can use MapPartition for that.
So basically:

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING).parallelism(1)
  .mapPartition(new ReturnFirstTen())

Best, Fabian


2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :

> Hi,
>
> I have a dataset of tuples with two fields ids and ratings and I need to
> find 10 elements with the highest rating in this dataset. I found a
> solution, but I think it's suboptimal and I think there should be a better
> way to do it.
>
> The best thing that I came up with is to partition dataset by rating, sort
> locally and write the partitioned dataset to disk:
>
> dataset
> .partitionCustom(new Partitioner() {
>   @Override
>   public int partition(Double key, int numPartitions) {
> return key.intValue() % numPartitions;
>   }
> }, 1) . // partition by rating
> .setParallelism(5)
> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> .writeAsText("..."); // write the partitioned dataset to disk
>
> This will store tuples in sorted files with names 5, 4, 3, ... that
> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
> data from disk and and N elements with the highest rating.
> Is there a way to do the same but without writing a partitioned dataset to
> a disk?
>
> I tried to use "first(10)" but it seems to give top 10 items from a random
> partition. Is there a way to get top N elements from every partition? Then
> I could locally sort top values from every partition and find top 10 global
> values.
>
> Best regards,
> Ivan.
>
>
>


Re: multi tenant workflow execution

2017-01-24 Thread Fabian Hueske
Hi Chen,

if you plan to implement your application on top of the upcoming Flink
1.2.0 release, you might find the new AsyncFunction [1] and the
ProcessFunction [2] helpful.
AsyncFunction can be used for non-blocking calls to external services and
maintains the checkpointing semantics.
ProcessFunction allows you to register and react to timers. This might be
easier to use than a window for the 24h timeout.
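
A rough, untested sketch of such a 24h timeout with a keyed ProcessFunction on
the 1.2 APIs could look like the following. The RuleTimeoutFunction name, the
(userId, event) String payloads, and the "expired" output are only placeholders,
and it assumes event-time timestamps are assigned upstream:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.util.Collector;

public class RuleTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, String> {

    // per-key (per user) deadline for the rule to complete
    private transient ValueState<Long> deadline;

    @Override
    public void open(Configuration parameters) throws Exception {
        deadline = getRuntimeContext().getState(
                new ValueStateDescriptor<>("deadline", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx,
                               Collector<String> out) throws Exception {
        if (deadline.value() == null) {
            // first event for this user: give the rule 24h of event time to complete
            long expireAt = ctx.timestamp() + 24 * 60 * 60 * 1000L;
            deadline.update(expireAt);
            ctx.timerService().registerEventTimeTimer(expireAt);
        }
        // ... evaluate the rule stage here, emit results, and clear the deadline
        // once the workflow for this user has completed ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<String> out) throws Exception {
        if (deadline.value() != null) {
            // 24h passed without completion: report the expiration and clean up
            out.collect("expired");
            deadline.clear();
        }
    }
}

It would be applied on a keyed stream, e.g.
events.keyBy(0).process(new RuleTimeoutFunction()).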

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html

2017-01-24 0:41 GMT+01:00 Chen Qin :

> Hi there,
>
> I am researching running one flink job to support customized event driven
> workflow executions. The use case is to support running various workflows
> that listen to a set of kafka topics and performing various rpc checks, a
> user travel through multiple stages in a rule execution(workflow
> execution). e.g
>
> kafka topic : user click stream
> rpc checks:
>
> if user is member,
> if user has shown interest of signup
>
>
> ​workflows:
> ​
>
> workflow 1: user click -> if user is member do A then do B
> workflow 2: user click -> if user has shown interest of signup then do A
> otherwise wait for 60 mins and try recheck, expire in 24 hours
>
> The goal is as I said to run workflow1 & workflow2 in one flink job.
>
> Initial thinking describes below
>
> sources are series of kafka topics, all events go through coMap,cache
> lookup event -> rules mapping and fan out to multiple {rule, user} tuple.
> Based on rule definition and stage user is in a given rule, it do series of
> async rpc check and side outputs to various of sinks.
>
>- If a {rule, user} tuple needs to stay in a operator states longer (1
>day), there should be a window following async rpc checks with customized
>purgetrigger firing those passes and purge either pass check or expired
>tuples.
>- If a {rule, user} execute to a stage which waits for a kafka event,
>it should be added to cache and hookup with coMap lookups near sources
>
>
>  Does that makes sense?
>
> Thanks,
> Chen
>
>
>


Re: Custom Partitioning and windowing questions/concerns

2017-01-24 Thread Fabian Hueske
Hi Nikos,

Flink's windows require a KeyedStream because they use the keys to manage
their internal state (each in-progress window has some state that needs to
be persisted and checkpointed).
Moreover, Flink's event-time window operators return a deterministic
result. In your use case, the result of the pre-aggregation (phase 1)
would not be deterministic because it would depend on the partitioning of
the input.

I would suggest implementing the pre-aggregation not with a window but with
a ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be released
soon).
ProcessFunction allows you to register timers which can be used to emit
results every 10 seconds.
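
As a rough illustration of that idea (untested, against the 1.2-SNAPSHOT APIs):
timers are only available on keyed streams, so the sketch below keys the stream
by the integer value and keeps the key in state, because it is not directly
available in onTimer(). The PartialCountFunction name is made up; the input is
the (timestamp, key, 1) tuple from your Phase 0:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.util.Collector;

public class PartialCountFunction
        extends RichProcessFunction<Tuple3<Long, Integer, Integer>, Tuple2<Integer, Integer>> {

    private transient ValueState<Integer> key;
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) throws Exception {
        key = getRuntimeContext().getState(new ValueStateDescriptor<>("key", Integer.class));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Tuple3<Long, Integer, Integer> event, Context ctx,
                               Collector<Tuple2<Integer, Integer>> out) throws Exception {
        Integer current = count.value();
        if (current == null) {
            current = 0;
            key.update(event.f1);
            // schedule a flush of this key's partial count in 10 seconds
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 10000L);
        }
        count.update(current + event.f2);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (count.value() != null) {
            out.collect(new Tuple2<>(key.value(), count.value()));
            count.clear(); // the next element for this key re-registers a timer
        }
    }
}

The partial (key, count) pairs would then be gathered with parallelism 1 for
the final ordering, as in your Phase 2.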

Hope this helps,
Fabian



2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <
kat...@cs.pitt.edu>:

> Hello all,
>
>
>
> Currently, I examine the effects of stream partitioning on performance for
> simple state-full scenarios.
>
>
>
> My toy application for the rest of my question will be the following: A
> stream of non-negative integers, each one annotated with a timestamp, and
> the goal is to get the top-10 most frequent non-negative integers on
> tumbling windows of 10 seconds. In other words, my input is a stream of
> tuples with two fields, Tuple2(timestamp, key), where key
> is the non-negative integer value, and timestamp is used to assign each
> event to a window. The execution plan I am considering is to have a *first
> phase (Phase 1)*, where the stream is partitioned and the partial
> aggregations are processed in parallel (set parallelism to N > 1).
> Afterwards, the *second phase (Phase 2)* involves gathering all partial
> aggregations on a single node (set parallelism to 1), and calculate the
> full aggregation for each key, order the keys based on windowed frequency
> and outputs the top-10 keys for each window.
>
>
>
> As I mentioned earlier, my goal is to compare the performance of different
> partitioning policies on this toy application. Initially, I want to compare
> shuffle-grouping (round-robin) and hash-grouping and then move on to
> different partitioning policies by using Flink’s CustomPartitioner API.
> After reading Flink’s documentation, I managed to develop the toy
> application using hash-partitioning. Below, I present the different parts
> of my code:
>
>
>
> // Phase 0: input setup
>
> DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(…)
>    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
>        @Override
>        public long extractAscendingTimestamp(Tuple2<Long, Integer> event) { return event.f0; }
>    })
>    .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));
>
>
>
> On Phase 0, I collect the input stream, from an in-memory list, define the
> event timestamp which will be used for windowing, and extend each event
> with a value of 1 for calculating the appearance of each number on every
> window. Afterwards, for the parallel Phase 1, I use hash partitioning by
> first using .keyBy() operation on the key of each tuple (i.e., field 1),
> followed by a .window() operation, to assign each tuple on a different
> window, and end with a .sum(). My code for (parallel) Phase 1 is the
> following:
>
>
>
> // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
>
> DataStream phaseOne = stream
>    .keyBy(1)
>    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>    .sum(2).setParallelism(N);
>
>
>
> Moving on to Phase 2, to aggregate all partial results of a single window
> in one operator for producing the full aggregation, ordering based on
> frequency, and return the top-10 keys, I have the following:
>
>
>
> // Phase 2: serial full aggregation and ordering, with a parallelism of 1
>
> DataStream<String> phaseTwo = phaseOne
>    .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
>    .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
>        @Override
>        public void apply(TimeWindow window,
>                          Iterable<Tuple3<Long, Integer, Integer>> values,
>                          Collector<String> out) throws Exception {
>            ...
>            List<Integer> topTenValues = ...;
>            StringBuilder strBuilder = new StringBuilder();
>            for (Integer t : topTenValues)
>                strBuilder.append(Integer.toString(t) + ",");
>            out.collect(strBuilder.toString());
>        }
>    });
>
>
>
> The previous code makes use of hash-partitioning for its parallel phase.
> From what I understand, Flink allows the .window() operation only on a
> KeyedStream. Furthermore, the .customPartition() method transforms a
> DataStream to a DataStream (and the same is true for .shuffle() which
> round-robins events). Therefore, *I am confused on 

Problem with state Apache Flink 1.2.0 RC0

2017-01-24 Thread Salou Guillaume
Hello flink users !

I'm using Apache Flink 1.2.0 RC0. The task statuses in the web
dashboard are CANCELED, but my tasks are still running and doing their job.

The status on the main page is RUNNING.

I have the same problem with 2 different jobs.

Regards,

Guillame


How to get top N elements in a DataSet?

2017-01-24 Thread Ivan Mushketyk
Hi,

I have a dataset of tuples with two fields ids and ratings and I need to
find 10 elements with the highest rating in this dataset. I found a
solution, but I think it's suboptimal and I think there should be a better
way to do it.

The best thing that I came up with is to partition dataset by rating, sort
locally and write the partitioned dataset to disk:

dataset
.partitionCustom(new Partitioner() {
  @Override
  public int partition(Double key, int numPartitions) {
return key.intValue() % numPartitions;
  }
}, 1) . // partition by rating
.setParallelism(5)
.sortPartition(1, Order.DESCENDING) // locally sort by rating
.writeAsText("..."); // write the partitioned dataset to disk

This will store tuples in sorted files with names 5, 4, 3, ... that contain
ratings in ranges (5, 4], (4, 3], and so on. Then I can read the sorted data
from disk and find the N elements with the highest rating.
Is there a way to do the same but without writing a partitioned dataset to
a disk?

I tried to use "first(10)" but it seems to give top 10 items from a random
partition. Is there a way to get top N elements from every partition? Then
I could locally sort top values from every partition and find top 10 global
values.

Best regards,
Ivan.