Custom state values in CEP

2016-04-21 Thread Sowmya Vallabhajosyula
Is there a way to store a custom state value along with triggering an
event? This would not only help in maintaining an audit of the values that
triggered the state, but also with some complex sets of conditions - e.g.,
if the earlier state was triggered by the temperature of a patient, I do
not want to consider temperature in the subsequent state calculation.

At this point, I feel that for such a scenario I will have to go with the
DataStream API and manage state on my own. Is that right?

Thanks and Regards,
Sowmya Vallabhajosyula
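
A minimal sketch of the "manage state on my own" route with the DataStream API,
in Java: the metric that triggered the previous alert is kept in keyed state and
ignored in the next evaluation. Everything here (the class name, the Tuple3 layout
of (patientId, metricName, value), the threshold rule) is made up for illustration;
only the ValueState mechanics are the point.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Events are (patientId, metricName, value); an "alert" is the event re-emitted.
public class AlertFunction extends RichFlatMapFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>> {

    // the custom value stored along with the triggered state: which metric fired last
    private transient ValueState<String> lastTriggerMetric;

    @Override
    public void open(Configuration parameters) {
        lastTriggerMetric = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-trigger-metric", String.class, null));
    }

    @Override
    public void flatMap(Tuple3<String, String, Double> event,
                        Collector<Tuple3<String, String, Double>> out) throws Exception {
        // hypothetical rule: alert on any value above 100, but never on the same
        // metric that triggered the previous alert for this patient
        if (!event.f1.equals(lastTriggerMetric.value()) && event.f2 > 100.0) {
            out.collect(event);
            lastTriggerMetric.update(event.f1);
        }
    }
}

Applied after keying the stream by patient id (keyBy(0)), the stored value is scoped
per patient, checkpointed together with the rest of the job state, and can be used
for auditing which value triggered the previous state.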


Replays message in Kafka topics with FlinkKafkaConsumer09

2016-04-21 Thread Jack Huang
Hi all,

I am trying to force my job to reprocess old messages in my Kafka topics
but couldn't get it to work. Here is my FlinkKafkaConsumer09 setup:

val kafkaProp = new Properties()
kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
kafkaProp.setProperty("auto.offset.reset", "earliest")

env.addSource(new FlinkKafkaConsumer09[String](input, new
SimpleStringSchema, kafkaProp))
.print


I thought *auto.offset.reset* was going to do the trick. What am I missing
here?


Thanks,

Jack Huang
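
For anyone hitting the same thing: auto.offset.reset is only consulted when the
consumer group has no committed offsets yet, so with an already-used group.id the
consumer resumes from the committed position instead of the earliest offset (and
offsets restored from a Flink checkpoint or savepoint take precedence over both).
A minimal Java sketch with a made-up topic name and a throwaway group.id:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReplayFromEarliest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProp = new Properties();
        kafkaProp.setProperty("bootstrap.servers", "localhost:6667");
        // a group.id that has never consumed this topic, so "earliest" actually applies
        kafkaProp.setProperty("group.id", "reprocessing-" + System.currentTimeMillis());
        kafkaProp.setProperty("auto.offset.reset", "earliest");

        env.addSource(new FlinkKafkaConsumer09<String>("my-topic", new SimpleStringSchema(), kafkaProp))
           .print();

        env.execute("replay-from-earliest");
    }
}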


Re: Checkpoint and restore states

2016-04-21 Thread Jack Huang
@Stefano, Aljoscha:

Thank you for pointing that out. With the following steps I verified that
the state of the job gets restored

   1. Use HDFS as state backend with env.setStateBackend(new
   FsStateBackend("hdfs:///home/user/flink/KafkaWordCount"))
   2. Start the job. In my case the job ID is
   e4b5316ae4ea0c8ed6fab4fa238b4b2f
   3. Observe that
   hdfs:///home/user/flink/KafkaWordCount/e4b5316ae4ea0c8ed6fab4fa238b4b2f
   is created
   4. Kill all TaskManagers, but leave the JobManager running
   5. Restart all TaskManagers with bin/start-cluster.sh
   6. Observe that the job manager automatically restarts the job under the
   same job ID
   7. Observe from the output that the states are restored


Jack



Jack Huang

On Thu, Apr 21, 2016 at 1:40 AM, Aljoscha Krettek 
wrote:

> Hi,
> yes Stefano is spot on! The state is only restored if a job is restarted
> because of abnormal failure. For state that survives stopping/canceling a
> job you can look at savepoints:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>  This
> essentially uses the same mechanisms as the fault-tolerance stuff for state
> but makes it explicit and allows restarting from different savepoints.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 22:43 Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hello again,
>>
>> thanks for giving a shot at my advice anyway but Aljoscha is far more
>> knowledgeable than me regarding Flink. :)
>>
>> I hope I'm not getting mixed up again but I think gracefully canceling
>> your job means you lose your job state. Am I right in saying that the state
>> is preserved in case of abnormal termination (e.g.: the JobManager crashes)
>> or if you explicitly create a savepoint?
>>
>> On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang 
>> wrote:
>>
>>> @Aljoscha:
>>> For this word count example I am using a kafka topic as the input
>>> stream. The problem is that when I cancel the task and restart it, the task
>>> loses the accumulated word counts so far and starts counting from 1 again.
>>> Am I missing something basic here?
>>>
>>> @Stefano:
>>> I also tried to implement the Checkpointed interface but had no luck
>>> either. Canceling and restarting the task did not restore the states. Here
>>> is my class:
>>>
>>> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>>   .keyBy({ s => s })
>>>   .map(new StatefulCounter)
>>>
>>>
>>> class StatefulCounter extends RichMapFunction[String, (String, Int)]
>>>     with Checkpointed[Integer] {
>>>
>>>   private var count: Integer = 0
>>>
>>>   def map(in: String): (String, Int) = {
>>>     count += 1
>>>     return (in, count)
>>>   }
>>>
>>>   def snapshotState(l: Long, l1: Long): Integer = {
>>>     count
>>>   }
>>>
>>>   def restoreState(state: Integer) {
>>>     count = state
>>>   }
>>> }
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>> Jack Huang
>>>
>>> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
>>> stefano.bagh...@radicalbit.io> wrote:
>>>
 My bad, thanks for pointing that out.

 On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek 
 wrote:

> Hi,
> the *withState() family of functions use the Key/Value state interface
> internally, so that should work.
>
> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hi Jack,
>>
>> it seems you correctly enabled the checkpointing by calling
>> `env.enableCheckpointing`. However, your UDFs have to either implement 
>> the
>> Checkpointed interface or use the Key/Value State interface to make sure
>> the state of the computation is snapshotted.
>>
>> The documentation explains, far better than I could in this post, how to
>> define your functions so that they checkpoint the state:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>
>> I hope I've been of some help, I'll gladly help you further if you
>> need it.
>>
>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <
>> aljos...@apache.org> wrote:
>>
>>> Hi,
>>> what seems to be the problem?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang 
>>> wrote:
>>>
 Hi all,

 I am doing a simple word count example and want to checkpoint the
 accumulated word counts. I am not having any luck getting the counts 
 saved
 and restored. Can someone help?

 env.enableCheckpointing(1000)
 env.setStateBackend(new MemoryStateBackend())

 ...

 inStream
   .keyBy({ s => s })
   .mapWithState((in: String, count: Option[Int]) => {
     val newCount = count.getOrElse(0) + 1
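
A minimal sketch of the Key/Value state alternative that Stefano and Aljoscha point
to, in Java (names are made up): with checkpointing enabled, this per-key counter is
snapshotted automatically and restored after a failure, or across a manual restart
when resuming from a savepoint.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed word counter using the Key/Value state interface instead of Checkpointed.
public class CountingFlatMap extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        // the default value 0 is returned while nothing is stored yet for the current key
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("word-count", Integer.class, 0));
    }

    @Override
    public void flatMap(String word, Collector<Tuple2<String, Integer>> out) throws Exception {
        int newCount = count.value() + 1;
        count.update(newCount);
        out.collect(Tuple2.of(word, newCount));
    }
}

It would be applied to the stream after keying it by the word itself, in place of the
map over StatefulCounter above.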

Re: Programatic way to get version

2016-04-21 Thread Trevor Grant
Dug through the codebase; in case anyone else wants to know:

import org.apache.flink.runtime.util.EnvironmentInformation;

EnvironmentInformation.getVersion()



Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Thu, Apr 21, 2016 at 5:05 PM, Trevor Grant 
wrote:

> Is there a programmatic way to get the Flink version from the Scala shell?
>
> I.e. something akin to
> sc.version
>
> I thought I used env.version or something like that once but I couldn't
> find anything in the scala docs.
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
> http://trevorgrant.org
>
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>
>


Programatic way to get version

2016-04-21 Thread Trevor Grant
Is there a programmatic way to get the Flink version from the Scala shell?

I.e. something akin to
sc.version

I thought I used env.version or something like that once but I couldn't
find anything in the scala docs.

Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


Flink YARN job manager web port

2016-04-21 Thread Shannon Carey
The documentation states: "The ports Flink is using for its services are the 
standard ports configured by the user + the application id as an offset"

When I launch Flink via YARN in an AWS EMR cluster, stdout says:
JobManager Web Interface: 
http://ip-xxx.us-west-2.compute.internal:20888/proxy/application_1461178294210_0010/

I need to be able to create an IAM Security Group that allows access to the 
JobManager web interface so that I can make use of it. However, I am confused 
about how port 20888 is chosen. Based on the code, I would have guessed that it 
would use the same port as given by: "yarn application -status 
application_1461178294210_0010". However, that's not the case (they don't 
match). It gives "Tracking-URL : http://ip-xxx.us-west-2.compute.internal:36495".

On the other hand, I see that YarnApplicationMasterRunner sets the port to 0, 
which InetSocketAddress says results in "A port number of zero will let the 
system pick up an ephemeral port in a bind operation."

I couldn't find anything in the code that adds an offset to a port. Changing 
the value of "jobmanager.web.port" appears to have no effect. The documentation 
on "Running Flink on YARN behind Firewalls" only talks about the JobManager and 
BlobServer ports.

Does Flink need logic to allow users to specify a range of ports for 
jobmanager.web.port in the same way as is done in 
BootstrapTools#startActorSystem? If so, I am happy to make that contribution!

-Shannon


Re: implementing a continuous time window

2016-04-21 Thread Fabian Hueske
Yes, sliding windows are different.
You want to evaluate the window whenever a new element arrives or an
element leaves because 5 seconds have passed since it entered the window, right?

I think that should be possible with a GlobalWindow, a custom Trigger which
holds state about the time when each element in the window entered the
window, and an Evictor.
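
A rough Java sketch of that combination; it is slightly simplified in that the trigger
relies on event-time timers rather than keeping its own per-element state, and the
Trigger signatures differ a little between Flink versions:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires once when an element arrives and once again windowSizeMillis later
// (in event time), when that element should leave the window.
public class PerElementTrigger extends Trigger<Object, GlobalWindow> {

    private final long windowSizeMillis;

    public PerElementTrigger(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // re-evaluate the window when this element falls out of it
        ctx.registerEventTimeTimer(timestamp + windowSizeMillis);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    // newer Flink versions also require clear(); a no-op is enough for this sketch
    public void clear(GlobalWindow window, TriggerContext ctx) {}
}

It would be combined with windowAll(GlobalWindows.create()), an evictor such as
TimeEvictor.of(Time.seconds(5)), and an apply() that counts the remaining elements.
One caveat: as far as I can tell, TimeEvictor measures element age against the newest
element currently in the window rather than the firing time, so reproducing the exact
counts from the original example (the count dropping at t = 6 with no new input) likely
needs a custom Evictor along the lines of the per-element bookkeeping described above.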

2016-04-21 21:19 GMT+02:00 Jonathan Yom-Tov :

> I think sliding windows are different. In the example in the blog post a
> window is computed every 30 seconds (so at fixed time intervals). What I
> want is for a window to be computed every time an event comes in and then
> once again when the event leaves the window.
>
> On Thu, Apr 21, 2016 at 10:14 PM, John Sherwood  wrote:
>
>> You are looking for sliding windows:
>> https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>>
>> Here you would do
>>
>> .timeWindow(Time.seconds(5), Time.seconds(1))
>>
>> On Thu, Apr 21, 2016 at 12:06 PM, Jonathan Yom-Tov 
>> wrote:
>>
>>> hi,
>>>
>>> Is it possible to implement a continuous time window with flink? Here's
>>> an
>>> example. Say I want to count events within a window. The window length
>>> is 5
>>> seconds and I get events at t = 1, 2, 7, 8 seconds. I would then expect
>>> to
>>> get events with a count at t = 1 (count = 1), t = 2 (count = 2), t = 6
>>> (count = 1), t = 7 (count = 2), t = 8 (count = 2), t = 12 (count = 1)
>>> and t
>>> = 13 (count = 0).
>>>
>>> How would I go about doing that?
>>>
>>> thanks,
>>> Jon.
>>>
>>
>>
>


Re: implementing a continuous time window

2016-04-21 Thread Jonathan Yom-Tov
I think sliding windows are different. In the example in the blog post a
window is computed every 30 seconds (so at fixed time intervals). What I
want is for a window to be computed every time an event comes in and then
once again when the event leaves the window.

On Thu, Apr 21, 2016 at 10:14 PM, John Sherwood  wrote:

> You are looking for sliding windows:
> https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>
> Here you would do
>
> .timeWindow(Time.seconds(5), Time.seconds(1))
>
> On Thu, Apr 21, 2016 at 12:06 PM, Jonathan Yom-Tov 
> wrote:
>
>> hi,
>>
>> Is it possible to implement a continuous time window with flink? Here's an
>> example. Say I want to count events within a window. The window length is
>> 5
>> seconds and I get events at t = 1, 2, 7, 8 seconds. I would then expect to
>> get events with a count at t = 1 (count = 1), t = 2 (count = 2), t = 6
>> (count = 1), t = 7 (count = 2), t = 8 (count = 2), t = 12 (count = 1) and
>> t
>> = 13 (count = 0).
>>
>> How would I go about doing that?
>>
>> thanks,
>> Jon.
>>
>
>


Re: implementing a continuous time window

2016-04-21 Thread John Sherwood
You are looking for sliding windows:
https://flink.apache.org/news/2015/12/04/Introducing-windows.html

Here you would do

.timeWindow(Time.seconds(5), Time.seconds(1))

On Thu, Apr 21, 2016 at 12:06 PM, Jonathan Yom-Tov 
wrote:

> hi,
>
> Is it possible to implement a continuous time window with flink? Here's an
> example. Say I want to count events within a window. The window length is 5
> seconds and I get events at t = 1, 2, 7, 8 seconds. I would then expect to
> get events with a count at t = 1 (count = 1), t = 2 (count = 2), t = 6
> (count = 1), t = 7 (count = 2), t = 8 (count = 2), t = 12 (count = 1) and t
> = 13 (count = 0).
>
> How would I go about doing that?
>
> thanks,
> Jon.
>


Re: AvroWriter for Rolling sink

2016-04-21 Thread Igor Berman
ok,
I have a working prototype already, if somebody is interested (attached).

I might add it as a PR later (with tests etc.).

Tested locally & with S3.







On 21 April 2016 at 12:01, Aljoscha Krettek  wrote:

> Hi,
> as far as I know there is no one working on this. I'm only aware of
> someone working on an ORC (from Hive) Writer.
>
> This would be a welcome addition! I think you are already on the right
> track, the only thing required will probably be an AvroFileWriter and you
> already started looking at SequenceFileWriter, which should be similar.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 09:45 Igor Berman  wrote:
>
>> Hi All,
>> Is there such implementation somewhere?(before I start to implement it
>> myself, it seems not too difficult based on SequenceFileWriter example)
>>
>> anyway any ideas/pointers will be highly appreciated
>>
>> thanks in advance
>>
>>
package org.apache.flink.streaming.connectors.fs.avro;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Map;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;

/**
 * Implementation of an AvroKeyValue writer that can be used in a Sink.
 * <p>
 * You'll first need the avro-mapred dependency (pay attention to the classifier,
 * it works only for hadoop2):
 *
 * <pre>{@code
 * <dependency>
 *     <groupId>org.apache.avro</groupId>
 *     <artifactId>avro-mapred</artifactId>
 *     <version>1.7.6</version>
 *     <classifier>hadoop2</classifier>
 * </dependency>
 * }</pre>
 *
 * And then:
 *
 * <pre>{@code
 * RollingSink<Tuple2<Long, Long>> sink = new RollingSink<>("/tmp/path");
 * sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
 * Map<String, String> properties = new HashMap<>();
 * Schema longSchema = Schema.create(Type.LONG);
 * String keySchema = longSchema.toString();
 * properties.put("avro.schema.output.key", keySchema);
 * String valueSchema = longSchema.toString();
 * properties.put("avro.schema.output.value", valueSchema);
 * properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 * properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
 *
 * sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
 * sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 * }</pre>
 *
 * To test with S3, create a core-site.xml (I haven't found another way to test locally):
 *
 * <pre>{@code
 * <configuration>
 *     <property>
 *         <name>fs.s3.impl</name>
 *         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
 *     </property>
 *     <property>
 *         <name>fs.s3a.access.key</name>
 *         <value>xxx</value>
 *     </property>
 *     <property>
 *         <name>fs.s3a.secret.key</name>
 *         <value>yyy</value>
 *     </property>
 *     <property>
 *         <name>fs.s3a.buffer.dir</name>
 *         <value>/tmp</value>
 *     </property>
 * </configuration>
 * }</pre>
 *
 * and add the following dependencies (not sure what is the best option here):
 *
 * <pre>{@code
 * <dependency>
 *     <groupId>org.apache.hadoop</groupId>
 *     <artifactId>hadoop-aws</artifactId>
 *     <version>2.7.0</version>
 *     <scope>provided</scope>
 *     <exclusions>
 *         <exclusion>
 *             <artifactId>guava</artifactId>
 *             <groupId>com.google.guava</groupId>
 *         </exclusion>
 *     </exclusions>
 * </dependency>
 * }</pre>
 */
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
	private static final long serialVersionUID = 1L;

	private transient FSDataOutputStream outputStream;

	private transient AvroKeyValueRecordWriter<K, V> writer;

	private Class<K> keyClass;

	private Class<V> valueClass;

	private final Map<String, String> properties;

	/**
	 * C'tor for the writer
	 *
	 * You can provide different properties that will be used to configure the avro key-value writer as a simple properties map (see example above)
	 * @param properties
	 */
	public AvroSinkWriter(Map<String, String> properties) {
		this.properties = properties;
	}

	private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) {
		this.properties = properties;
		this.keyClass = 

Re: Count windows missing last elements?

2016-04-21 Thread Kostya Kulagin
Thanks for reply.

Maybe I would need some advice in this case. My situation: we have a stream
of data, generally speaking <Long, String> tuples where the Long is a unique key
(i.e. there are no tuples with the same key).

I need to filter out all tuples that do not match certain lucene query.

Creating a Lucene index on one entry is too expensive and I cannot guess what
the load in terms of number of entries per second would be. The idea was to group
entries by count, create an index, filter, and stream the remaining tuples for
further processing.

As a sample application if we replace lucene indexing with something like
String's 'contains' method source would look like this:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
  @Override
  public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
    LongStream.range(0, 30).forEach(l -> {
      ctx.collect(Tuple2.of(l, "This is " + l));
    });
  }

  @Override
  public void cancel() {

  }
});

And I need, let's say, to window the tuples and preserve only those whose
value.contains("3").
There is no grouping by key since basically all keys are different. I
might not know everything about Flink yet, but for this particular example -
does what you were saying make sense?


Thanks!
Kostya






On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek 
wrote:

> Hi,
> if you are doing the windows not for their actual semantics I would
> suggest not using count based windows and also not using the *All windows.
> The *All windows are all non-parallel, i.e. you always only get one
> parallel instance of your window operator even if you have a huge cluster.
>
> Also, in most cases it is better to not use a plain WindowFunction with
> apply because all elements have to be buffered so that they can be passed
> as an Iterable, Iterable<Long> in your example. If you can, I would suggest
> to use a ReduceFunction or FoldFunction or an apply() with an incremental
> aggregation function: apply(ReduceFunction, WindowFunction) or
> apply(FoldFunction, WindowFunction). These allow incremental aggregation of
> the result as elements arrive and don't require buffering of all elements
> until the window fires.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin  wrote:
>
>> Maybe if it is not the first time it worth considering adding this thing
>> as an option? ;-)
>>
>> My usecase - I have a pretty big amount of data basically for ETL. It is
>> finite but it is big. I see it more as a stream not as a dataset. Also I
>> would re-use the same code for infinite stream later...
>> And I do not much care about exact window size - it is just for
>> performance reasons I create a windows.
>>
>> Anyways - that you for the responses!
>>
>>
>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek 
>> wrote:
>>
>>> People have wondered about that a few times, yes. My opinion is that a
>>> stream is potentially infinite and processing only stops for anomalous
>>> reasons: when the job crashes, when stopping a job to later redeploy it. In
>>> those cases you would not want to flush out your data but keep them and
>>> restart from the same state when the job is restarted.
>>>
>>> You can implement the behavior by writing a custom Trigger that behaves
>>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>> stopped processing for natural reasons.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin  wrote:
>>>
 Thanks,

 I wonder wouldn't it be good to have a built-in such functionality. At
 least when incoming stream is finished - flush remaining elements.

 On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek 
 wrote:

> Hi,
> yes, you can achieve this by writing a custom Trigger that can trigger
> both on the count or after a long-enough timeout. It would be a 
> combination
> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
> could look to those to get started.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin 
> wrote:
>
>> I have a pretty big but final stream and I need to be able to window
>> it by number of elements.
>> In this case from my observations flink can 'skip' the latest chunk
>> of data if it has lower amount of elements than window size:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>
>>   @Override
>>   public void run(SourceContext<Long> ctx) throws Exception {
>> LongStream.range(0, 

Re: Count windows missing last elements?

2016-04-21 Thread Aljoscha Krettek
Hi,
if you are doing the windows not for their actual semantics I would suggest
not using count based windows and also not using the *All windows. The *All
windows are all non-parallel, i.e. you always only get one parallel
instance of your window operator even if you have a huge cluster.

Also, in most cases it is better to not use a plain WindowFunction with
apply because all elements have to be buffered so that they can be passed
as an Iterable, Iterable<Long> in your example. If you can, I would suggest
to use a ReduceFunction or FoldFunction or an apply() with an incremental
aggregation function: apply(ReduceFunction, WindowFunction) or
apply(FoldFunction, WindowFunction). These allow incremental aggregation of
the result as elements arrive and don't require buffering of all elements
until the window fires.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin  wrote:

> Maybe if it is not the first time it worth considering adding this thing
> as an option? ;-)
>
> My usecase - I have a pretty big amount of data basically for ETL. It is
> finite but it is big. I see it more as a stream not as a dataset. Also I
> would re-use the same code for infinite stream later...
> And I do not much care about exact window size - it is just for
> performance reasons I create a windows.
>
> Anyways - that you for the responses!
>
>
> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek 
> wrote:
>
>> People have wondered about that a few times, yes. My opinion is that a
>> stream is potentially infinite and processing only stops for anomalous
>> reasons: when the job crashes, when stopping a job to later redeploy it. In
>> those cases you would not want to flush out your data but keep them and
>> restart from the same state when the job is restarted.
>>
>> You can implement the behavior by writing a custom Trigger that behaves
>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>> stopped processing for natural reasons.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin  wrote:
>>
>>> Thanks,
>>>
>>> I wonder wouldn't it be good to have a built-in such functionality. At
>>> least when incoming stream is finished - flush remaining elements.
>>>
>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 yes, you can achieve this by writing a custom Trigger that can trigger
 both on the count or after a long-enough timeout. It would be a combination
 of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
 could look to those to get started.

 Cheers,
 Aljoscha

 On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin  wrote:

> I have a pretty big but final stream and I need to be able to window
> it by number of elements.
> In this case from my observations flink can 'skip' the latest chunk of
> data if it has lower amount of elements than window size:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>
>   @Override
>   public void run(SourceContext<Long> ctx) throws Exception {
>     LongStream.range(0, 35).forEach(ctx::collect);
>   }
>
>   @Override
>   public void cancel() {
>
>   }
> });
>
> source.countWindowAll(10).apply(new AllWindowFunction GlobalWindow>() {
>   @Override
>   public void apply(GlobalWindow window, Iterable values, 
> Collector out) throws Exception {
> System.out.println(Joiner.on(',').join(values));
>   }
> }).print();
>
> env.execute("yoyoyo");
>
>
> Output:
> 0,1,2,3,4,5,6,7,8,9
> 10,11,12,13,14,15,16,17,18,19
> 20,21,22,23,24,25,26,27,28,29
>
> I.e. elements from 10 to 35 are not being processed.
>
> Does it make sense to have: count OR timeout window which will evict
> new window when number of elements reach a threshold OR collecting timeout
> occurs?
>

>>>
>
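
A rough Java sketch of the trigger described above: it behaves like a count trigger but
also flushes whatever is still buffered when the final Long.MAX_VALUE watermark of a
finite source arrives. The class name is made up, and for brevity the count lives in a
plain field, which is neither fault-tolerant nor per-key; a production version would
keep it in the trigger's partitioned state, as CountTrigger does.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountOrEndOfStreamTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxCount;
    private transient long seen; // not checkpointed; illustration only

    public CountOrEndOfStreamTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
        // this timer only fires once the end-of-stream watermark arrives
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        if (++seen >= maxCount) {
            seen = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        // flush the last, possibly smaller, batch at end of input
        return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    // newer Flink versions also require clear(); a no-op suffices here
    public void clear(W window, TriggerContext ctx) {}
}

It would replace countWindowAll(10) with
windowAll(GlobalWindows.create()).trigger(new CountOrEndOfStreamTrigger<>(10)), followed
by the same apply().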


Re: Values are missing, probably due parallelism?

2016-04-21 Thread Kostya Kulagin
Thanks, so you were right and it is really connected to the not-finishing
windows problem I've mentioned in the other post.
I don't really need parallelism of 1 for windows - I expect operation on
windows be pretty expensive and I like an idea that I can "parallelize" it.

Thanks for the explanation!

On Thu, Apr 21, 2016 at 8:06 AM, Aljoscha Krettek 
wrote:

> Hi,
> no worries, I also had to read the doc to figure it out. :-)
>
> I now see what the problem is. The .countWindowAll().apply() pattern
> creates a WindowOperator with parallelism of 1 because the "count all" only
> works if one instance of the window operator sees all elements. When
> manually changing the parallelism it essentially becomes a "count per
> parallel instance" window operation and the elements form the source with
> parallelism 1 get distributed round-robin to the parallel instances of the
> count-window operator. This means, that it will take more elements emitted
> from the source before each instance of the window fires. It's a bit
> confusing but Flink does not allow forcing the parallelism to 1 right now.
>
> About using the snapshot version, I would suggest you don't use it if you
> don't absolutely need one of the features in there that is not yet
> released. The build are still pretty stable, however.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 13:53 Kostya Kulagin  wrote:
>
>> First of all you are right about the number of elements, my bad and sorry for
>> the confusion, I need to be better at calculations :)
>>
>> However: if I change parallelism to. lets say 2 in windowing, i.e.
>> instead of (of course I changed 29 to 30 as well :) )
>>
>> }).print();
>>
>> put
>>
>> }).setParallelism(2).print();
>>
>> at the very bottom - I am getting:
>>
>> 3> 15
>> 3> 12
>> 2> 9
>> 2> 6
>> 4> 18
>> 04/21/2016 07:47:08  Sink: Unnamed(2/4) switched to FINISHED
>> 04/21/2016 07:47:08  Source: Custom Source(1/1) switched to FINISHED
>> 04/21/2016 07:47:08  Sink: Unnamed(4/4) switched to FINISHED
>> 04/21/2016 07:47:08  Sink: Unnamed(3/4) switched to FINISHED
>> 04/21/2016 07:47:08  TriggerWindow(GlobalWindows(), 
>> PurgingTrigger(CountTrigger(10)), 
>> AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to 
>> FINISHED
>> 04/21/2016 07:47:08  TriggerWindow(GlobalWindows(), 
>> PurgingTrigger(CountTrigger(10)), 
>> AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to 
>> FINISHED
>> 1> 3
>> 1> 0
>>
>> With default setting for parallelism it works fine, same as with value 3
>> and 1.
>>
>> With 2, 4+ it does not work. With 4+ it simply prints nothing. I.e. it
>> might be smth with how threads are finishing their execution?
>>
>> I am using the latest prod version I've found in maven: 1.0.1.
>> Can snapshot versions be used in prod? I mean how well tested are those?
>>
>> I will try the same on master branch later today.
>>
>> Thanks!
>> Kostya
>>
>>
>> On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> which version of Flink are you using? Maybe there is a bug. I've tested
>>> it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees
>>> of parallelism if I change the source to emit 30 elements:
>>> LongStream.range(0, 30).forEach(ctx::collect);
>>>
>>> (The second argument of LongStream.range(start, end) is exclusive)
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>>
>>> On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin  wrote:
>>>
 Actually this is not true - the source emits 30 values since it is
 started with 0. If I change 29 to 33 result will be the same.
 I can get all values if I play with parallelism, i.e. putting parallelism 1
 before print.
 Or if I change 29 to 39 (I have 4 cores).
 I can guess that there is smth wrong with threads. BTW in this case how
 threads are created and how data flows between?
 On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" 
 wrote:

> Hi,
> this is related to your other question about count windows. The source
> emits 29 values so we only have two count-windows with 10 elements each.
> The last window is never triggered.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin 
> wrote:
>
>> I think it has smth to do with parallelism and I probably do not have
>> clear understanding how parallelism works in flink but in this example:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>
>>   @Override
>>   public void run(SourceContext<Long> ctx) throws Exception {
>>     LongStream.range(0, 29).forEach(ctx::collect);
>>   }
>>
>>   @Override
>>   public void cancel() {
>>
>>   }
>> });
>>
>> 

Re: Threads waiting on LocalBufferPool

2016-04-21 Thread Aljoscha Krettek
Hi,
I would be very happy about improvements to our RocksDB performance. What
are the RocksDB Java benchmarks that you are running? In Flink, we also
have to serialize/deserialize every time that we access RocksDB using our
TypeSerializer. Maybe this is causing the slow down.

By the way, what is the type of value stored in the RocksDB state. Maybe
the TypeSerializer for that value is very slow.

Cheers,
Aljoscha
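
A crude way to get such metrics around state invocations, as a sketch (names made up):
time each access with System.nanoTime() inside the user function, which also captures
the TypeSerializer round-trip mentioned above, not just the raw RocksDB call.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// The Integer value type and the increment logic are placeholders.
public class TimedStateAccess extends RichMapFunction<String, String> {

    private transient ValueState<Integer> state;
    private transient long getNanos;
    private transient long updateNanos;
    private transient long invocations;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timed-state", Integer.class, 0));
    }

    @Override
    public String map(String in) throws Exception {
        long t0 = System.nanoTime();
        int current = state.value();
        long t1 = System.nanoTime();
        state.update(current + 1);
        long t2 = System.nanoTime();

        getNanos += t1 - t0;
        updateNanos += t2 - t1;
        if (++invocations % 100_000 == 0) {
            System.out.println("avg get ms: " + getNanos / (double) invocations / 1e6
                    + ", avg update ms: " + updateNanos / (double) invocations / 1e6);
        }
        return in;
    }
}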

On Thu, 21 Apr 2016 at 16:41 Maciek Próchniak  wrote:

> Well...
> I found some time to look at rocksDB performance.
>
> It takes around 0.4ms to lookup value state and 0.12ms to update - these
> are means, 95th percentile was > 1ms for get... When I set additional
> options:
>   .setIncreaseParallelism(8)
>   .setMaxOpenFiles(-1)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>
> I manage to get
> 0.05ms for update and 0.2ms for get - but still it seems pretty bad
> compared to standard rocksdb java benchmarks that I try on the same
> machine, as they are:
> fillseq  : 1.23238 micros/op;   89.8 MB/s; 100 ops done;
> 1 / 1 task(s) finished.
> readrandom   : 9.25380 micros/op;   12.0 MB/s; 100 / 100
> found;  1 / 1 task(s) finished.
> fillrandom   : 4.46839 micros/op;   24.8 MB/s; 100 ops done;
> 1 / 1 task(s) finished.
>
> guess I'll have to look at it a bit more...
>
> thanks anyway,
> maciek
>
>
>
> On 21/04/2016 08:41, Maciek Próchniak wrote:
>
> Hi Ufuk,
>
> thanks for quick reply.
> Actually I had a little time to try both things.
> 1) helped only temporarily - it just took a bit longer to saturate the
> pool. After few minutes, periodically all kafka threads were waiting for
> bufferPool.
> 2) This seemed to help. I also reduced checkpoint interval - on rocks we
> had 5min, now I tried 30s. .
>
> I attach throughput metrics - the former (around 18) is with increased
> heap & buffers, the latter (around 22) is with FileSystemStateBackend.
> My state is few GB large - during the test it reached around 2-3GB. I must
> admit I was quite impressed that checkpointing to HDFS using FileSystem
> took only about 6-7s (with occasional spikes to 12-13s, which can be seen
> on metrcs - didn't check if it was caused by hdfs or sth else).
>
> Now I looked at logs from 18 and seems like checkpointing rocksdb took
> around 2-3minutes:
> 2016-04-20 17:47:33,439 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 6 @ 1461167253439
> 2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 6 (in 140588 ms)
> - however I don't see any threads dumping state in threadStacks...
>
> I guess I'll have to add some metrics around state invocations to see
> where is the problem with rocksDB... I'll write if I find anything, but
> that won't be today I think...
>
> Btw - I was looking at FS state and I wonder would it be feasible to make
> variant of this state using immutable map (probably some scala one) to be
> able to do async checkpoints.
> Then synchronous part would be essentially free - just taking the state
> map and materializing it asynchronously.
> Of course, that would work only for immutable state - but this is often
> the case when writing in scala. WDYT?
>
> thanks,
> maciek
>
>
>
>
> On 20/04/2016 16:28, Ufuk Celebi wrote:
>
> Could be different things actually, including the parts of the network
> you mentioned.
>
> 1)
>
> Regarding the TM config:
> - It can help to increase the number of network buffers (you can go
> ahead and give it 4 GB, e.g. 134217728 buffers a 32 KB)
> - In general, you have way more memory available than you actually
> give to Flink. I would increase the 20 GB heap size.
>
> As a first step you could address these two points and re-run your job.
>
> 2)
>
> As a follow-up you could also work with the FileSystemStateBackend,
> which keeps state in memory (on-heap) and writes checkpoints to files.
> This would help in checking how much RocksDB is slowing things down.
>
>
> I'm curious about the results. Do you think you will have time to try
> this?
>
> – Ufuk
>
>
> On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak 
>  wrote:
>
> Hi,
> I'm running my flink job on one rather large machine (20 cores with
> hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
> It does more or less:
> read csv from kafka -> keyBy one of the fields -> some custom state
> processing.
> Kafka topic has 24 partitions, so my parallelism is also 24
>
> After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state
> backend) I reached a point when throughput is ~120-150k/s.
> One the same kafka and machine I reached > 500k/s with simple filtering
> job,
> so I wanted to see what's the bottleneck.
>
> It turns out that quite often all of kafka threads are stuck waiting for
> buffer from pool:
> "Thread-6695" 

Re: Count windows missing last elements?

2016-04-21 Thread Kostya Kulagin
Maybe if it is not the first time it worth considering adding this thing as
an option? ;-)

My usecase - I have a pretty big amount of data basically for ETL. It is
finite but it is big. I see it more as a stream not as a dataset. Also I
would re-use the same code for infinite stream later...
And I do not much care about exact window size - it is just for performance
reasons I create a windows.

Anyways - that you for the responses!


On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek 
wrote:

> People have wondered about that a few times, yes. My opinion is that a
> stream is potentially infinite and processing only stops for anomalous
> reasons: when the job crashes, when stopping a job to later redeploy it. In
> those cases you would not want to flush out your data but keep them and
> restart from the same state when the job is restarted.
>
> You can implement the behavior by writing a custom Trigger that behaves
> like the count trigger but also fires when receiving a Long.MAX_VALUE
> watermark. A watermark of Long.MAX_VALUE signifies that a source has
> stopped processing for natural reasons.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin  wrote:
>
>> Thanks,
>>
>> I wonder wouldn't it be good to have a built-in such functionality. At
>> least when incoming stream is finished - flush remaining elements.
>>
>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> yes, you can achieve this by writing a custom Trigger that can trigger
>>> both on the count or after a long-enough timeout. It would be a combination
>>> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
>>> could look to those to get started.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin  wrote:
>>>
 I have a pretty big but final stream and I need to be able to window it
 by number of elements.
 In this case from my observations flink can 'skip' the latest chunk of
 data if it has lower amount of elements than window size:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

   @Override
   public void run(SourceContext<Long> ctx) throws Exception {
     LongStream.range(0, 35).forEach(ctx::collect);
   }

   @Override
   public void cancel() {

   }
 });

 source.countWindowAll(10).apply(new AllWindowFunction() {
   @Override
   public void apply(GlobalWindow window, Iterable values, 
 Collector out) throws Exception {
 System.out.println(Joiner.on(',').join(values));
   }
 }).print();

 env.execute("yoyoyo");


 Output:
 0,1,2,3,4,5,6,7,8,9
 10,11,12,13,14,15,16,17,18,19
 20,21,22,23,24,25,26,27,28,29

 I.e. elements from 10 to 35 are not being processed.

 Does it make sense to have: count OR timeout window which will evict
 new window when number of elements reach a threshold OR collecting timeout
 occurs?

>>>
>>


Re: Master (1.1-SNAPSHOT) Can't run on YARN

2016-04-21 Thread Maximilian Michels
Hi Stefano,

Thanks for reporting. I wasn't able to reproduce the problem. I ran
./bin/yarn-session.sh -n 1 -s 2 -jm 2048 -tm 2048 on a Yarn cluster
and it created a Flink cluster with a JobManager and a TaskManager
with two task slots. By the way, if you omit the "-s 2" flag, then the
default is read from the config, which is one task slot.

Could it be that an old TaskManager instance is trying to register
with a new JobManager? It looks like it from the log messages because
the ResourceManager (which creates TaskManagers) is not aware of it.
Still questionable why that instance is lingering around. Could you
try to kill the instance and try bringing up a cluster several times
to see if that solved the problem? If not, could you send me the full
logs to my email address?

Thanks,
Max

On Wed, Apr 20, 2016 at 4:30 PM, Ufuk Celebi  wrote:
> The user list is OK since you are reporting a bug here ;-) I'm
> confident that this will be fixed soon! :-)
>
> On Wed, Apr 20, 2016 at 11:28 AM, Stefano Baghino
>  wrote:
>> Not exactly, I just wanted to let you know about it and know if someone else
>> experimented this issue; perhaps it's more of a dev mailing list discussion,
>> sorry for posting this here. Feel free to continue the discussion on the
>> other list if you feel it's more appropriate.
>>
>> On Tue, Apr 19, 2016 at 6:53 PM, Ufuk Celebi  wrote:
>>>
>>> Hey Stefano,
>>>
>>> Flink's resource management has been refactored for 1.1 recently. This
>>> could be a regression introduced by this. Max can probably help you
>>> with more details. Is this currently a blocker for you?
>>>
>>> – Ufuk
>>>
>>> On Tue, Apr 19, 2016 at 6:31 PM, Stefano Baghino
>>>  wrote:
>>> > Hi everyone,
>>> >
>>> > I'm currently experiencing a weird situation, I hope you can help me out
>>> > with this.
>>> >
>>> > I've cloned and built from the master, then I've edited the default
>>> > config
>>> > fil by adding my Hadoop config path, exported the HADOOP_CONF_DIR env
>>> > var
>>> > and ran bin/yarn-session.sh -n 1 -s 2 -jm 2048 -tm 2048
>>> >
>>> > The first thing I noticed is that I had to put "-s 2" or the task
>>> > managers
>>> > gets created with -1 slots (!) by default.
>>> >
>>> > After putting "-s 2" the YARN session startup hangs when trying to
>>> > register
>>> > the task managers. I've stopped the session and aggregated the logs and
>>> > read
>>> > a lot (several thousands) of the messages I attach at the bottom; any
>>> > idea
>>> > of what this may be?
>>> >
>>> > Thank you a lot in advance!
>>> >
>>> > 2016-04-19 12:15:59,507 INFO  org.apache.flink.yarn.YarnTaskManager
>>> > - Trying to register at JobManager
>>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 1,
>>> > timeout:
>>> > 500 milliseconds)
>>> >
>>> > 2016-04-19 12:15:59,649 ERROR org.apache.flink.yarn.YarnTaskManager
>>> > - The registration at JobManager
>>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>>> > because: java.lang.IllegalStateException: Resource
>>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>>> > registered with resource manager.. Retrying later...
>>> >
>>> > 2016-04-19 12:16:00,025 INFO  org.apache.flink.yarn.YarnTaskManager
>>> > - Trying to register at JobManager
>>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 2,
>>> > timeout:
>>> > 1000 milliseconds)
>>> >
>>> > 2016-04-19 12:16:00,033 ERROR org.apache.flink.yarn.YarnTaskManager
>>> > - The registration at JobManager
>>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>>> > because: java.lang.IllegalStateException: Resource
>>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>>> > registered with resource manager.. Retrying later...
>>> >
>>> > 2016-04-19 12:16:01,045 INFO  org.apache.flink.yarn.YarnTaskManager
>>> > - Trying to register at JobManager
>>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 3,
>>> > timeout:
>>> > 2000 milliseconds)
>>> >
>>> > 2016-04-19 12:16:01,053 ERROR org.apache.flink.yarn.YarnTaskManager
>>> > - The registration at JobManager
>>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>>> > because: java.lang.IllegalStateException: Resource
>>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>>> > registered with resource manager.. Retrying later...
>>> >
>>> > 2016-04-19 12:16:03,064 INFO  org.apache.flink.yarn.YarnTaskManager
>>> > - Trying to register at JobManager
>>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 4,
>>> > timeout:
>>> > 4000 milliseconds)
>>> >
>>> > 2016-04-19 12:16:03,072 ERROR org.apache.flink.yarn.YarnTaskManager
>>> > - The registration at JobManager
>>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>>> > because: java.lang.IllegalStateException: Resource
>>> > 

Re: Threads waiting on LocalBufferPool

2016-04-21 Thread Maciek Próchniak

Well...
I found some time to look at rocksDB performance.

It takes around 0.4ms to look up value state and 0.12ms to update - these 
are means, 95th percentile was > 1ms for get... When I set additional 
options:

  .setIncreaseParallelism(8)
  .setMaxOpenFiles(-1)
  .setCompressionType(CompressionType.SNAPPY_COMPRESSION)

I manage to get
0.05ms for update and 0.2ms for get - but still it seems pretty bad 
compared to standard rocksdb java benchmarks that I try on the same 
machine, as they are:
fillseq  : 1.23238 micros/op;   89.8 MB/s; 100 ops 
done;  1 / 1 task(s) finished.
readrandom   : 9.25380 micros/op;   12.0 MB/s; 100 / 100 
found;  1 / 1 task(s) finished.
fillrandom   : 4.46839 micros/op;   24.8 MB/s; 100 ops 
done;  1 / 1 task(s) finished.


guess I'll have to look at it a bit more...

thanks anyway,
maciek


On 21/04/2016 08:41, Maciek Próchniak wrote:

Hi Ufuk,

thanks for quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to saturate the 
pool. After a few minutes, periodically all Kafka threads were waiting 
for bufferPool.
2) This seemed to help. I also reduced checkpoint interval - on rocks 
we had 5min, now I tried 30s. .


I attach throughput metrics - the former (around 18) is with increased 
heap & buffers, the latter (around 22) is with FileSystemStateBackend.
My state is a few GB large - during the test it reached around 2-3GB. I 
must admit I was quite impressed that checkpointing to HDFS using 
FileSystem took only about 6-7s (with occasional spikes to 12-13s, 
which can be seen on metrcs - didn't check if it was caused by hdfs or 
sth else).


Now I looked at logs from 18 and seems like checkpointing rocksdb took 
around 2-3minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 6 (in 140588 ms)

- however I don't see any threads dumping state in threadStacks...

I guess I'll have to add some metrics around state invocations to see 
where is the problem with rocksDB... I'll write if I find anything, 
but that won't be today I think...


Btw - I was looking at FS state and I wonder whether it would be feasible to 
make a variant of this state using an immutable map (probably some Scala 
one) to be able to do async checkpoints.
Then synchronous part would be essentially free - just taking the 
state map and materializing it asynchronously.
Of course, that would work only for immutable state - but this is 
often the case when writing in scala. WDYT?


thanks,
maciek




On 20/04/2016 16:28, Ufuk Celebi wrote:

Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, e.g. 134217728 buffers a 32 KB)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run your job.

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.


I'm curious about the results. Do you think you will have time to try 
this?


– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak  wrote:

Hi,
I'm running my flink job on one rather large machine (20 cores with
hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state
processing.
Kafka topic has 24 partitions, so my parallelism is also 24

After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state
backend) I reached a point when throughput is ~120-150k/s.
One the same kafka and machine I reached > 500k/s with simple 
filtering job,

so I wanted to see what's the bottleneck.

It turns out that quite often all of kafka threads are stuck waiting 
for

buffer from pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000
nid=0x8118 in Object.wait() [0x7f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163) 


 - locked <0x0002eade3890> (a java.util.ArrayDeque)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) 


 at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92) 


 - locked <0x0002eb73cbd0> (a

Re: lost connection

2016-04-21 Thread Chesnay Schepler
That is an excerpt from the client log; can you check the JobManager log? 
It could have crashed, and if so the cause is hopefully in there.


Did this issue occur suddenly, as in, have you run a job successfully on 
the system before? (To exclude network configuration issues.)


Regards,
Chesnay

On 21.04.2016 16:09, Radu Tudoran wrote:


- Could not submit job Operator2 execution 
(170aef70d31f3fee62f8a483930be213), because there is no connection to 
a JobManager.


15:59:48,456 WARN Remoting - Tried to associate with unreachable 
remote address [akka.tcp://flink@10.204.62.71:6123]. Address is now 
gated for 5000 ms, all messages to this address will be delivered to 
dead letters. Reason: Connection refused: /10.204.62.71:6123


16:01:28,409 ERROR org.apache.flink.client.CliFrontend - Error while 
running the command.


org.apache.flink.client.program.ProgramInvocationException: The 
program execution failed: Communication with JobManager failed: Lost 
connection to the JobManager.


I do not understand what could be the root cause of this… the IPs look 
ok and there is no firewall to block things…


Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

E-mail: _radu.tudoran@huawei.com_

Mobile: +49 15209084330

Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 


Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN



*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Thursday, April 21, 2016 3:58 PM
*To:* user@flink.apache.org
*Subject:* Re: lost connection

Hello,

the first step is always to check the logs under /log. The JobManager 
log in particular may contain clues as why no connection could be 
established.


Regards,
Chesnay

On 21.04.2016 15:44, Radu Tudoran wrote:

Hi,

I am trying to submit a jar via the console (flink run my.jar).
The result is that I get an error saying that the communication
with the jobmanager failed: Lost connection to the jobmanager.

Can you give me some hints/ recommendations about approaching this
issue.

Thanks

Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

E-mail: _radu.tudo...@huawei.com _

Mobile: +49 15209084330

Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com

Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN






RE: lost connection

2016-04-21 Thread Radu Tudoran
- Could not submit job Operator2 execution (170aef70d31f3fee62f8a483930be213), 
because there is no connection to a JobManager.
15:59:48,456 WARN  Remoting 
 - Tried to associate with unreachable remote address 
[akka.tcp://flink@10.204.62.71:6123]. Address is now gated for 5000 ms, all 
messages to this address will be delivered to dead letters. Reason: Connection 
refused: /10.204.62.71:6123
16:01:28,409 ERROR org.apache.flink.client.CliFrontend  
 - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Communication with JobManager failed: Lost connection to the 
JobManager.

I do not understand what could be the root cause of this... the IPs look ok and 
there is no firewall to block things...

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

From: Chesnay Schepler [mailto:ches...@apache.org]
Sent: Thursday, April 21, 2016 3:58 PM
To: user@flink.apache.org
Subject: Re: lost connection

Hello,

the first step is always to check the logs under /log. The JobManager log in 
particular may contain clues as why no connection could be established.

Regards,
Chesnay

On 21.04.2016 15:44, Radu Tudoran wrote:
Hi,

I am trying to submit a jar via the console (flink run my.jar). The result is 
that I get an error saying that the communication with the jobmanager failed: 
Lost connection to the jobmanager.
Can you give me some hints/ recommendations about approaching this issue.

Thanks

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN




Re: lost connection

2016-04-21 Thread Chesnay Schepler

Hello,

the first step is always to check the logs under /log. The JobManager 
log in particular may contain clues as why no connection could be 
established.


Regards,
Chesnay

On 21.04.2016 15:44, Radu Tudoran wrote:


Hi,

I am trying to submit a jar via the console (flink run my.jar). The 
result is that I get an error saying that the communication with the 
jobmanager failed: Lost connection to the jobmanager.


Can you give me some hints/ recommendations about approaching this issue.

Thanks

Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

E-mail: _radu.tudoran@huawei.com_

Mobile: +49 15209084330

Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 


Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN







Re: Processing millions of messages in milliseconds real time -- Architecture guide required

2016-04-21 Thread Ken Krugler
This seems pretty similar to what you’re asking about:

http://data-artisans.com/extending-the-yahoo-streaming-benchmark/

Especially the part where they “...directly exposed the in-flight windows to be 
queried”, as that sounds like what you meant by “The data cache should have the 
capability to serve the historical data in milliseconds”

— Ken



> On Apr 18, 2016, at 10:03pm, Deepak Sharma  wrote:
> 
> Hi all,
> I am looking for an architecture to ingest 10 million messages in real-time 
> streaming mode.
> If anyone has worked on a similar kind of architecture, can you please point 
> me to any documentation around it, e.g. what the architecture should look like 
> and which components/big data ecosystem tools I should consider?
> The messages have to be in xml/json format, going through a preprocessor engine 
> or message enhancer and then finally a processor.
> I also thought about using a data cache for serving the data.
> The data cache should have the capability to serve the historical data in 
> milliseconds (maybe up to 30 days of data)
> 
> 
> -- 
> Thanks
> Deepak

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: logback.xml and logback-yarn.xml rollingpolicy configuration

2016-04-21 Thread Balaji Rajagopalan
Thanks Till, setting it in log4j.properties worked.
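A minimal sketch of the kind of log4j.properties rolling configuration this refers 
to is shown below; the appender name and the concrete size/backup values are 
illustrative assumptions, not the exact configuration used here:

log4j.rootLogger=INFO, file

# RollingFileAppender rolls the file by size and keeps a bounded number of backups
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${log.file}
log4j.appender.file.MaxFileSize=50MB
log4j.appender.file.MaxBackupIndex=30
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n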

On Tue, Apr 19, 2016 at 8:04 PM, Till Rohrmann  wrote:

> Have you made sure that Flink is using logback [1]?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#using-logback-instead-of-log4j
>
> Cheers,
> Till
>
> On Tue, Apr 19, 2016 at 2:01 PM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> There are two files in the /usr/share/flink/conf directory, and I was
>> trying to set up rolling of the application logs, which go to the following
>> directory on the task nodes:
>>
>> /var/log/hadoop-yarn/containers/application_*/container_*/taskmanager.log
>> out err
>>
>> Changing logback.xml and logback-yarn.xml has no effect on the log
>> file size, and no rolling happens. Any input, please?
>>
>> Here is the configuration I have; is anything wrong with the config or the
>> approach?
>>
>> <configuration>
>>
>>   <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
>>     <file>${log.file}</file>
>>
>>     <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
>>       <fileNamePattern>${log.file}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
>>       <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
>>         <maxFileSize>50MB</maxFileSize>
>>       </timeBasedFileNamingAndTriggeringPolicy>
>>       <maxHistory>30</maxHistory>
>>     </rollingPolicy>
>>
>>     <encoder>
>>       <charset>UTF-8</charset>
>>       <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
>> %logger{60} %X{sourceThread} - %msg%n</pattern>
>>     </encoder>
>>   </appender>
>>
>>   <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
>>     <encoder>
>>       <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{60}
>> %X{sourceThread} - %msg%n</pattern>
>>     </encoder>
>>   </appender>
>>
>>   <root level="INFO">
>>     <appender-ref ref="file"/>
>>   </root>
>> </configuration>
>>
>
>


Re: Help with generics

2016-04-21 Thread Aljoscha Krettek
Hi,
I'm sorry, I meant TypeInformation.of(initModel.getClass()).

On Thu, 21 Apr 2016 at 15:17 Martin Neumann  wrote:

> Hej,
>
> I already tried TypeInformation.of(initModel.class) and it complained
> that initModel class is unknown. (Since it's of type M)
> I added a function to the model.class that returns the TypeInformation its
> working now though I still don't understand what happend behind the scenes
> and what I changed :-)
>
> cheers Martin
>
>
> On Thu, Apr 21, 2016 at 2:27 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> you're right there is not much (very little) in the documentation about
>> TypeInformation. There is only the description in the JavaDoc:
>> TypeInformation
>> 
>>  Essentially,
>> how it works is that we always use a TypeSerializer when we have to
>> serialize values (for sending across network, storing in state, etc.). A
>> TypeSerializer is created from a TypeInformation and TypeInformation
>> can be obtained in several ways:
>>
>>  - the TypeExtractor tries to analyze user functions to determine a
>> TypeInformation for the input and output type
>>  - the TypeExtractor can try and analyze a given Class to determine a
>> TypeInformation
>>  - the Scala API uses macros and implicit parameters to create
>> TypeInformation
>>  - TypeHint can be created to retrieve a TypeInformation
>>  - a TypeInformation can be manually constructed
>>
>> tl;dr In your case you can try TypeInformation.of(initModel.class). If
>> that doesn't work you can try and pass in a function that gives you a
>> TypeInformation for your model type M.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:16 Martin Neumann  wrote:
>>
>>> Hej,
>>>
>>> I pass an instance of M in the constructor of the class, can I use that
>>> instead? Maybe give the class a function that returns the right
>>> TypeInformation? I'm trying figure out how TypeInformation works to better
>>> understand the Issue is there any documentation about this? At the moment I
>>> don't really understand what TypeInformation does and how it works.
>>>
>>> cheers Martin
>>>
>>> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 I think it doesn't work because the concrete type of M is not available
 to create a TypeInformation for M. What you can do is manually pass a
 TypeInformation or a TypeSerializer to the AnomalyFlatMap and use
 that when creating the state descriptor.

 Cheers,
 Aljoscha

 On Thu, 21 Apr 2016 at 13:45 Martin Neumann  wrote:

> Hey,
>
> I have a FlatMap that uses some generics (appended at the end of the
> mail).
> I have some trouble with the type inference running into
> InvalidTypesException on the first line in the open function.
>
> How can I fix it?
>
> Cheers Martin
>
>
>
>
> public class AnomalyFlatMap 
> extends RichFlatMapFunction, Tuple2> {
> private transient ValueState microModel;
> private final double threshold;
> private boolean updateIfAnomaly;
> private M initModel;
>
> public AnomalyFlatMap(double threshold, M model, boolean 
> updateIfAnomaly) {
> this.threshold = threshold;
> this.updateIfAnomaly = updateIfAnomaly;
> this.initModel = model;
>
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> ValueStateDescriptor descriptor =
> new ValueStateDescriptor<>(
> "RollingMicroModel",
> TypeInformation.of(new TypeHint() {
> }),initModel
> );
> microModel = getRuntimeContext().getState(descriptor);
> }
>
> @Override
> public void flatMap(Tuple2 sample, Collector T>> collector) throws Exception {
> M model = microModel.value();
> Anomaly res  = model.calculateAnomaly(sample.f0);
>
> if ( res.getScore() <= threshold || updateIfAnomaly){
> model.addWindow(sample.f0);
> microModel.update(model);
> }
> collector.collect(new Tuple2<>(res,sample.f1));
> }
> }
>
>
>
>>>
>


Re: Help with generics

2016-04-21 Thread Martin Neumann
Hej,

I already tried TypeInformation.of(initModel.class) and it complained that
the initModel class is unknown (since it's of type M).
I added a function to the model class that returns the TypeInformation, and it's
working now, though I still don't understand what happened behind the scenes
and what I changed :-)

cheers Martin


On Thu, Apr 21, 2016 at 2:27 PM, Aljoscha Krettek 
wrote:

> Hi,
> you're right there is not much (very little) in the documentation about
> TypeInformation. There is only the description in the JavaDoc:
> TypeInformation
> 
>  Essentially,
> how it works is that we always use a TypeSerializer when we have to
> serialize values (for sending across network, storing in state, etc.). A
> TypeSerializer is created from a TypeInformation and TypeInformation
> can be obtained in several ways:
>
>  - the TypeExtractor tries to analyze user functions to determine a
> TypeInformation for the input and output type
>  - the TypeExtractor can try and analyze a given Class to determine a
> TypeInformation
>  - the Scala API uses macros and implicit parameters to create
> TypeInformation
>  - TypeHint can be created to retrieve a TypeInformation
>  - a TypeInformation can be manually constructed
>
> tl;dr In your case you can try TypeInformation.of(initModel.class). If
> that doesn't work you can try and pass in a function that gives you a
> TypeInformation for your model type M.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 14:16 Martin Neumann  wrote:
>
>> Hej,
>>
>> I pass an instance of M in the constructor of the class, can I use that
>> instead? Maybe give the class a function that returns the right
>> TypeInformation? I'm trying figure out how TypeInformation works to better
>> understand the Issue is there any documentation about this? At the moment I
>> don't really understand what TypeInformation does and how it works.
>>
>> cheers Martin
>>
>> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> I think it doesn't work because the concrete type of M is not available
>>> to create a TypeInformation for M. What you can do is manually pass a
>>> TypeInformation or a TypeSerializer to the AnomalyFlatMap and use
>>> that when creating the state descriptor.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann  wrote:
>>>
 Hey,

 I have a FlatMap that uses some generics (appended at the end of the
 mail).
 I have some trouble with the type inference running into
 InvalidTypesException on the first line in the open function.

 How can I fix it?

 Cheers Martin




 public class AnomalyFlatMap 
 extends RichFlatMapFunction, Tuple2> {
 private transient ValueState microModel;
 private final double threshold;
 private boolean updateIfAnomaly;
 private M initModel;

 public AnomalyFlatMap(double threshold, M model, boolean 
 updateIfAnomaly) {
 this.threshold = threshold;
 this.updateIfAnomaly = updateIfAnomaly;
 this.initModel = model;

 }

 @Override
 public void open(Configuration parameters) throws Exception {
 ValueStateDescriptor descriptor =
 new ValueStateDescriptor<>(
 "RollingMicroModel",
 TypeInformation.of(new TypeHint() {
 }),initModel
 );
 microModel = getRuntimeContext().getState(descriptor);
 }

 @Override
 public void flatMap(Tuple2 sample, Collector> 
 collector) throws Exception {
 M model = microModel.value();
 Anomaly res  = model.calculateAnomaly(sample.f0);

 if ( res.getScore() <= threshold || updateIfAnomaly){
 model.addWindow(sample.f0);
 microModel.update(model);
 }
 collector.collect(new Tuple2<>(res,sample.f1));
 }
 }



>>


Re: Explanation on limitations of the Flink Table API

2016-04-21 Thread Flavio Pompermaier
We're also trying to work around the current limitations of the Table API, and
we're reading DataSets with on-purpose input formats that return a POJO
Row containing the list of values (but we're reading all values as
String...).
Actually we would also need a way to abstract the composition of Flink
operators and UDFs, to compose a transformation from a graphical UI or from
a script. During the Stratosphere project there were Meteor and Sopremo
allowing that [1], but they were later dismissed in favour of a Pig integration
that I don't know whether it was ever completed. Some days ago I discovered the
Piglet project [2], which allows using Pig with Spark and Flink, but I don't
know how well it works (the Flink integration is also very recent and not
documented anywhere).

Best,
Flavio

[1] http://stratosphere.eu/assets/papers/Sopremo_Meteor%20BigData.pdf
[2] https://github.com/ksattler/piglet

On Thu, Apr 21, 2016 at 2:41 PM, Fabian Hueske  wrote:

> Hi Simone,
>
> in Flink 1.0.x, the Table API does not support reading external data,
> i.e., it is not possible to read a CSV file directly from the Table API.
> Tables can only be created from DataSet or DataStream which means that the
> data is already converted into "Flink types".
>
> However, the Table API is currently under heavy development as part of the
> the efforts to add SQL support.
> This work is taking place on the master branch and I am currently working
> on interfaces to scan external data sets or ingest external data streams.
> The interface will be quite generic such that it should be possible to
> define a table source that reads the first lines of a file to infer
> attribute names and types.
> You can have a look at the current state of the API design here [1].
>
> Feedback is welcome and can be very easily included in this phase of the
> development ;-)
>
> Cheers, Fabian
>
> [1]
> https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0
> 
>
> 2016-04-21 14:26 GMT+02:00 Simone Robutti :
>
>> Hello,
>>
>> I would like to know if it's possible to create a Flink Table from an
>> arbitrary CSV (or any other form of tabular data) without doing type safe
>> parsing with expliciteky type classes/POJOs.
>>
>> To my knowledge this is not possible but I would like to know if I'm
>> missing something. My requirement is to be able to read a CSV file and
>> manipulate it reading the field names from the file and inferring data
>> types.
>>
>> Thanks,
>>
>> Simone
>>
>
>


Re: Count windows missing last elements?

2016-04-21 Thread Aljoscha Krettek
People have wondered about that a few times, yes. My opinion is that a
stream is potentially infinite and processing only stops for anomalous
reasons: when the job crashes, when stopping a job to later redeploy it. In
those cases you would not want to flush out your data but keep them and
restart from the same state when the job is restarted.

You can implement the behavior by writing a custom Trigger that behaves
like the count trigger but also fires when receiving a Long.MAX_VALUE
watermark. A watermark of Long.MAX_VALUE signifies that a source has
stopped processing for natural reasons.
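For illustration, here is a minimal sketch of such a trigger: it delegates the
counting to CountTrigger and additionally fires when the final Long.MAX_VALUE
watermark arrives. The class and factory names are made up for the example, and
the exact Trigger base type and signatures can differ between Flink versions:

import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountOrEndOfStreamTrigger<W extends Window> implements Trigger<Object, W> {

    private final CountTrigger<W> countTrigger;

    private CountOrEndOfStreamTrigger(long maxCount) {
        this.countTrigger = CountTrigger.of(maxCount);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // ask to be notified when the final watermark (Long.MAX_VALUE) arrives
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        // otherwise behave like a normal count trigger
        return countTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        // Long.MAX_VALUE signals that the sources have finished: flush what is left
        return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public static <W extends Window> CountOrEndOfStreamTrigger<W> of(long maxCount) {
        return new CountOrEndOfStreamTrigger<>(maxCount);
    }
}

It could then be set with something like
source.countWindowAll(10).trigger(CountOrEndOfStreamTrigger.of(10)).apply(...).
Note that the plain CountTrigger fires without purging, so depending on the desired
semantics it may need to be wrapped in PurgingTrigger.of(...).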

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin  wrote:

> Thanks,
>
> I wonder, wouldn't it be good to have such functionality built in? At
> least when the incoming stream is finished, flush the remaining elements.
>
> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> yes, you can achieve this by writing a custom Trigger that can trigger
>> both on the count or after a long-enough timeout. It would be a combination
>> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
>> could look to those to get started.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin  wrote:
>>
>>> I have a pretty big but finite stream and I need to be able to window it
>>> by number of elements.
>>> In this case, from my observations, Flink can 'skip' the latest chunk of
>>> data if it has fewer elements than the window size:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>
>>>   @Override
>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>   }
>>>
>>>   @Override
>>>   public void cancel() {
>>>
>>>   }
>>> });
>>>
>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>   @Override
>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>       Collector<Long> out) throws Exception {
>>>     System.out.println(Joiner.on(',').join(values));
>>>   }
>>> }).print();
>>>
>>> env.execute("yoyoyo");
>>>
>>>
>>> Output:
>>> 0,1,2,3,4,5,6,7,8,9
>>> 10,11,12,13,14,15,16,17,18,19
>>> 20,21,22,23,24,25,26,27,28,29
>>>
>>> I.e. elements from 10 to 35 are not being processed.
>>>
>>> Does it make sense to have: count OR timeout window which will evict new
>>> window when number of elements reach a threshold OR collecting timeout
>>> occurs?
>>>
>>
>


Re: Explanation on limitations of the Flink Table API

2016-04-21 Thread Fabian Hueske
Hi Simone,

in Flink 1.0.x, the Table API does not support reading external data, i.e.,
it is not possible to read a CSV file directly from the Table API.
Tables can only be created from DataSet or DataStream which means that the
data is already converted into "Flink types".
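To make the current situation concrete, here is a small sketch of the 1.0.x way of
doing it: the CSV is first read into typed DataSet records and only then turned into
a Table. The field types and names are assumptions for the example, and declaring
them up front is exactly the manual step the question would like to avoid (imports
omitted for brevity):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();

// the schema has to be declared up front ("Flink types"), e.g. as Tuples or POJOs
DataSet<Tuple3<Integer, String, Double>> csv = env
    .readCsvFile("/path/to/input.csv")
    .types(Integer.class, String.class, Double.class);

Table table = tableEnv.fromDataSet(csv, "id, name, score");
Table result = table.filter("score > 0.5").select("id, name");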

However, the Table API is currently under heavy development as part of the
efforts to add SQL support.
This work is taking place on the master branch and I am currently working
on interfaces to scan external data sets or ingest external data streams.
The interface will be quite generic such that it should be possible to
define a table source that reads the first lines of a file to infer
attribute names and types.
You can have a look at the current state of the API design here [1].

Feedback is welcome and can be very easily included in this phase of the
development ;-)

Cheers, Fabian

[1]
https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0


2016-04-21 14:26 GMT+02:00 Simone Robutti :

> Hello,
>
> I would like to know if it's possible to create a Flink Table from an
> arbitrary CSV (or any other form of tabular data) without doing type safe
> parsing with expliciteky type classes/POJOs.
>
> To my knowledge this is not possible but I would like to know if I'm
> missing something. My requirement is to be able to read a CSV file and
> manipulate it reading the field names from the file and inferring data
> types.
>
> Thanks,
>
> Simone
>


Re: Help with generics

2016-04-21 Thread Aljoscha Krettek
Hi,
you're right there is not much (very little) in the documentation about
TypeInformation. There is only the description in the JavaDoc:
TypeInformation

Essentially,
how it works is that we always use a TypeSerializer when we have to
serialize values (for sending across network, storing in state, etc.). A
TypeSerializer is created from a TypeInformation and TypeInformation
can be obtained in several ways:

 - the TypeExtractor tries to analyze user functions to determine a
TypeInformation for the input and output type
 - the TypeExtractor can try and analyze a given Class to determine a
TypeInformation
 - the Scala API uses macros and implicit parameters to create
TypeInformation
 - TypeHint can be created to retrieve a TypeInformation
 - a TypeInformation can be manually constructed

tl;dr In your case you can try TypeInformation.of(initModel.class). If that
doesn't work you can try and pass in a function that gives you a
TypeInformation for your model type M.
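
To make the second suggestion concrete, here is a minimal sketch of a generic
function that takes the TypeInformation for M as a constructor argument instead of
relying on type extraction; the class and field names are only illustrative:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ModelKeepingFlatMap<IN, M> extends RichFlatMapFunction<IN, IN> {

    private final M initModel;
    private final TypeInformation<M> modelType; // TypeInformation is Serializable, so it can be a field

    private transient ValueState<M> modelState;

    public ModelKeepingFlatMap(M initModel, TypeInformation<M> modelType) {
        this.initModel = initModel;
        this.modelType = modelType;
    }

    @Override
    public void open(Configuration parameters) {
        // the descriptor uses the explicitly provided TypeInformation instead of a TypeHint
        ValueStateDescriptor<M> descriptor =
            new ValueStateDescriptor<>("model", modelType, initModel);
        modelState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(IN value, Collector<IN> out) throws Exception {
        M model = modelState.value();
        // ... update the model based on 'value' ...
        modelState.update(model);
        out.collect(value);
    }
}

At the call site the concrete type is known, e.g.:
stream.keyBy(...).flatMap(new ModelKeepingFlatMap<>(model, TypeInformation.of(MyModel.class)));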

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:16 Martin Neumann  wrote:

> Hej,
>
> I pass an instance of M in the constructor of the class, can I use that
> instead? Maybe give the class a function that returns the right
> TypeInformation? I'm trying figure out how TypeInformation works to better
> understand the Issue is there any documentation about this? At the moment I
> don't really understand what TypeInformation does and how it works.
>
> cheers Martin
>
> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I think it doesn't work because the concrete type of M is not available
>> to create a TypeInformation for M. What you can do is manually pass a
>> TypeInformation or a TypeSerializer to the AnomalyFlatMap and use
>> that when creating the state descriptor.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann  wrote:
>>
>>> Hey,
>>>
>>> I have a FlatMap that uses some generics (appended at the end of the
>>> mail).
>>> I have some trouble with the type inference running into
>>> InvalidTypesException on the first line in the open function.
>>>
>>> How can I fix it?
>>>
>>> Cheers Martin
>>>
>>>
>>>
>>>
>>> public class AnomalyFlatMap 
>>> extends RichFlatMapFunction, Tuple2> {
>>> private transient ValueState microModel;
>>> private final double threshold;
>>> private boolean updateIfAnomaly;
>>> private M initModel;
>>>
>>> public AnomalyFlatMap(double threshold, M model, boolean 
>>> updateIfAnomaly) {
>>> this.threshold = threshold;
>>> this.updateIfAnomaly = updateIfAnomaly;
>>> this.initModel = model;
>>>
>>> }
>>>
>>> @Override
>>> public void open(Configuration parameters) throws Exception {
>>> ValueStateDescriptor descriptor =
>>> new ValueStateDescriptor<>(
>>> "RollingMicroModel",
>>> TypeInformation.of(new TypeHint() {
>>> }),initModel
>>> );
>>> microModel = getRuntimeContext().getState(descriptor);
>>> }
>>>
>>> @Override
>>> public void flatMap(Tuple2 sample, Collector> 
>>> collector) throws Exception {
>>> M model = microModel.value();
>>> Anomaly res  = model.calculateAnomaly(sample.f0);
>>>
>>> if ( res.getScore() <= threshold || updateIfAnomaly){
>>> model.addWindow(sample.f0);
>>> microModel.update(model);
>>> }
>>> collector.collect(new Tuple2<>(res,sample.f1));
>>> }
>>> }
>>>
>>>
>>>
>


Explanation on limitations of the Flink Table API

2016-04-21 Thread Simone Robutti
Hello,

I would like to know if it's possible to create a Flink Table from an
arbitrary CSV (or any other form of tabular data) without doing type-safe
parsing with explicitly typed classes/POJOs.

To my knowledge this is not possible, but I would like to know if I'm
missing something. My requirement is to be able to read a CSV file and
manipulate it, reading the field names from the file and inferring data
types.

Thanks,

Simone


Re: Values are missing, probably due parallelism?

2016-04-21 Thread Aljoscha Krettek
Hi,
no worries, I also had to read the doc to figure it out. :-)

I now see what the problem is. The .countWindowAll().apply() pattern
creates a WindowOperator with parallelism of 1 because the "count all" only
works if one instance of the window operator sees all elements. When
manually changing the parallelism it essentially becomes a "count per
parallel instance" window operation and the elements from the source with
parallelism 1 get distributed round-robin to the parallel instances of the
count-window operator. This means that it will take more elements emitted
from the source before each instance of the window fires. It's a bit
confusing, but Flink currently does not prevent you from changing the parallelism
of such an operator.
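In other words, the all-window operator should be left at its parallelism of 1 and
parallelism only changed on the operators around it, roughly like this (the window
function and mapper names are placeholders):

source
    .countWindowAll(10)
    .apply(new MyAllWindowFunction())   // non-parallel: one instance sees all elements
    .map(new MyPostProcessingMapper())
    .setParallelism(4)                  // parallelism applied downstream, not on the window
    .print();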

About using the snapshot version, I would suggest you don't use it if you
don't absolutely need one of the features in there that is not yet
released. The builds are still pretty stable, however.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 13:53 Kostya Kulagin  wrote:

> First of all you are right about number of elements, my bad and sorry for
> the confusion, I need to be better in calculations :)
>
> However: if I change the parallelism to, let's say, 2 in the windowing, i.e. instead
> of (of course I changed 29 to 30 as well :) )
>
> }).print();
>
> put
>
> }).setParallelism(2).print();
>
> at the very bottom - I am getting:
>
> 3> 15
> 3> 12
> 2> 9
> 2> 6
> 4> 18
> 04/21/2016 07:47:08   Sink: Unnamed(2/4) switched to FINISHED
> 04/21/2016 07:47:08   Source: Custom Source(1/1) switched to FINISHED
> 04/21/2016 07:47:08   Sink: Unnamed(4/4) switched to FINISHED
> 04/21/2016 07:47:08   Sink: Unnamed(3/4) switched to FINISHED
> 04/21/2016 07:47:08   TriggerWindow(GlobalWindows(), 
> PurgingTrigger(CountTrigger(10)), 
> AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to FINISHED
> 04/21/2016 07:47:08   TriggerWindow(GlobalWindows(), 
> PurgingTrigger(CountTrigger(10)), 
> AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to FINISHED
> 1> 3
> 1> 0
>
> With default setting for parallelism it works fine, same as with value 3
> and 1.
>
> With 2 and 4+ it does not work. With 4+ it simply prints nothing. I.e. it
> might be something to do with how the threads finish their execution?
>
> I am using the latest prod version I've found in maven: 1.0.1.
> Can snapshot versions be used in prod? I mean how well tested are those?
>
> I will try the same on master branch later today.
>
> Thanks!
> Kostya
>
>
> On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> which version of Flink are you using? Maybe there is a bug. I've tested
>> it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees
>> of parallelism if I change the source to emit 30 elements:
>> LongStream.range(0, 30).forEach(ctx::collect);
>>
>> (The second argument of LongStream.range(start, end) is exclusive)
>>
>> Cheers,
>> Aljoscha
>>
>>
>>
>> On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin  wrote:
>>
>>> Actually this is not true - the source emits 30 values since it is
>>> started with 0. If I change 29 to 33 the result will be the same.
>>> I can get all values if I play with parallelism, i.e. putting parallelism 1
>>> before print.
>>> Or if I change 29 to 39 (I have 4 cores).
>>> I can guess that there is something wrong with the threads. BTW, in this case how
>>> are threads created and how does data flow between them?
>>> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek"  wrote:
>>>
 Hi,
 this is related to your other question about count windows. The source
 emits 29 values so we only have two count-windows with 10 elements each.
 The last window is never triggered.

 Cheers,
 Aljoscha

 On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin  wrote:

> I think it has smth to do with parallelism and I probably do not have
> clear understanding how parallelism works in flink but in this example:
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>
>   @Override
>   public void run(SourceContext<Long> ctx) throws Exception {
>     LongStream.range(0, 29).forEach(ctx::collect);
>   }
>
>   @Override
>   public void cancel() {
>
>   }
> });
>
> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>   @Override
>   public void apply(GlobalWindow window, Iterable<Long> values,
>       Collector<Long> out) throws Exception {
>     for (Long value : values) {
>       if (value % 3 == 0) {
>         out.collect(value);
>       }
>     }
>   }
> }).print();
>
> env.execute("yoyoyo");
>
> Why my output is like this:
>
> 4> 9
> 1> 0
> 1> 12
> 3> 6
> 

Re: Values are missing, probably due parallelism?

2016-04-21 Thread Kostya Kulagin
Actually this is not true - the source emits 30 values since it is started
with 0. If I change 29 to 33 the result will be the same.
I can get all values if I play with parallelism, i.e. putting parallelism 1
before print.
Or if I change 29 to 39 (I have 4 cores).
I can guess that there is something wrong with the threads. BTW, in this case how
are threads created and how does data flow between them?
On Apr 21, 2016 4:50 AM, "Aljoscha Krettek"  wrote:

> Hi,
> this is related to your other question about count windows. The source
> emits 29 values so we only have two count-windows with 10 elements each.
> The last window is never triggered.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin  wrote:
>
>> I think it has smth to do with parallelism and I probably do not have
>> clear understanding how parallelism works in flink but in this example:
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>
>>   @Override
>>   public void run(SourceContext<Long> ctx) throws Exception {
>>     LongStream.range(0, 29).forEach(ctx::collect);
>>   }
>>
>>   @Override
>>   public void cancel() {
>>
>>   }
>> });
>>
>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>   @Override
>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>       Collector<Long> out) throws Exception {
>>     for (Long value : values) {
>>       if (value % 3 == 0) {
>>         out.collect(value);
>>       }
>>     }
>>   }
>> }).print();
>>
>> env.execute("yoyoyo");
>>
>> Why is my output like this:
>>
>> 4> 9
>> 1> 0
>> 1> 12
>> 3> 6
>> 3> 18
>> 2> 3
>> 2> 15
>>
>> ? I.e. where is the value 24, for example? I expect to see it. What am I
>> doing wrong?
>>
>


Re: save state in windows operation

2016-04-21 Thread Rubén Casado
Thanks for your help!! That is exactly what we need :-)

__ 

Dr. Rubén Casado 
Head of Big Data 
Treelogic 
ruben.casado.treelogic 

+34 902 286 386 - +34 607 18 28 06 
Parque Tecnológico de Asturias · Parcela 30 
E33428 Llanera · Asturias [Spain] 
www.treelogic.com 
__ 

- Mensaje original - 
De: "Aljoscha Krettek"  
Para: user@flink.apache.org 
Enviados: Jueves, 21 de Abril 2016 11:21:00 GMT +01:00 Amsterdam / Berlín / 
Berna / Roma / Estocolmo / Viena 
Asunto: Re: save state in windows operation 


Hi, 
you should be able to do this using Flink's state abstraction in a 
RichWindowFunction like this: 



public static class MyApplyFunction extends RichWindowFunction<Tuple2<String, Integer>,
        Tuple2<String, Integer>, Tuple, GlobalWindow> {

    ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
        new ValueStateDescriptor<>("last-result",
            new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo(),
            null);

    @Override
    public void apply(Tuple key,
            GlobalWindow window,
            Iterable<Tuple2<String, Integer>> input,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        ValueState<Tuple2<String, Integer>> state =
            getRuntimeContext().getState(stateDescriptor);

        Tuple2<String, Integer> lastResult = state.value();
        if (lastResult != null) {
            // do something with it
        } else {

        }

        // do our computation

        // store for future use
        state.update(new Tuple2<>("hey there", 42));
    }
}


The arguments of ValueStateDescriptor are: state name, TypeInformation for the 
values in the state, default value of the state that you get if nothing is set. 


Also, keep in mind that the state is local to each key, just as the window is 
local to each key. 


Cheers, 
Aljoscha 


On Thu, 21 Apr 2016 at 11:10 Rubén Casado < ruben.cas...@treelogic.com > wrote: 




Hello, 


We have problems working with states in Flink and I am sure you can help us :-) 


Let's say we have a workflow something like: 


DataStream myData = env.from... 


myData.map(new MyMap (..)) 
.keyBy(0) 
.countWindow(n) 
.apply(new MyApplyFunction()) 
.writeAsCSV(...) 


To implement the logic of our MyApplyFunction, in the apply() method we would 
need to have access to the result of the last window computation. Before 
emiting the resulst in the apply () using collector.collect(..), we could save 
that result in an external storage systems (e.g Redis /Hazelcast) and then, in 
the begininig of the next window computation read such value, but we woud like 
to use some internal mechanism of Flink to do that. 


Could some provide help about it? Thanks in advance!!! :-) 


Best

__ 

Dr. Rubén Casado 
Head of Big Data 
Treelogic 
ruben.casado.treelogic 

+34 902 286 386 - +34 607 18 28 06 
Parque Tecnológico de Asturias · Parcela 30 
E33428 Llanera · Asturias [Spain] 
www.treelogic.com 
__ 


Re: save state in windows operation

2016-04-21 Thread Aljoscha Krettek
Hi,
you should be able to do this using Flink's state abstraction in a
RichWindowFunction like this:

public static class MyApplyFunction extends RichWindowFunction<Tuple2<String, Integer>,
        Tuple2<String, Integer>, Tuple, GlobalWindow> {

    ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
        new ValueStateDescriptor<>("last-result",
            new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo(),
            null);

    @Override
    public void apply(Tuple key,
            GlobalWindow window,
            Iterable<Tuple2<String, Integer>> input,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        ValueState<Tuple2<String, Integer>> state =
            getRuntimeContext().getState(stateDescriptor);

        Tuple2<String, Integer> lastResult = state.value();
        if (lastResult != null) {
            // do something with it
        } else {

        }

        // do our computation

        // store for future use
        state.update(new Tuple2<>("hey there", 42));
    }
}

The arguments of ValueStateDescriptor are: state name, TypeInformation for
the values in the state, default value of the state that you get if nothing
is set.

Also, keep in mind that the state is local to each key, just as the window
is local to each key.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 11:10 Rubén Casado 
wrote:

> Hello,
>
> We have problems working with states in Flink and I am sure you can help
> us :-)
>
> Let's say we have a workflow something like:
>
> DataStream myData = env.from...
>
> myData.map(new MyMap (..))
> .keyBy(0)
> .countWindow(n)
> .apply(new MyApplyFunction())
> .writeAsCSV(...)
>
> To implement the logic of our MyApplyFunction, in the apply() method we
> would need to have access to the result of the last window computation.
> Before emiting the resulst in the apply () using collector.collect(..), we
> could save that result in an external storage systems (e.g Redis
> /Hazelcast) and then, in the begininig of the next window computation read
> such value, but we woud like to use some internal mechanism of Flink to do
> that.
>
> Could some provide help about it? Thanks in advance!!! :-)
>
> Best
>
> __
>
> *Dr. Rubén Casado*
> Head of Big Data
> Treelogic
>  
> *ruben.casado.treelogic*
>
> +34 902 286 386 - +34 607 18 28 06
> Parque Tecnológico de Asturias · Parcela 30
> E33428 Llanera · Asturias [Spain]
> www.treelogic.com
> __
>
>


save state in windows operation

2016-04-21 Thread Rubén Casado
Hello, 


We have problems working with states in Flink and I am sure you can help us :-) 


Let's say we have a workflow something like: 


DataStream myData = env.from... 


myData.map(new MyMap (..)) 
.keyBy(0) 
.countWindow(n) 
.apply(new MyApplyFunction()) 
.writeAsCSV(...) 


To implement the logic of our MyApplyFunction, in the apply() method we would 
need to have access to the result of the last window computation. Before 
emitting the results in apply() using collector.collect(..), we could save 
that result in an external storage system (e.g. Redis/Hazelcast) and then, at 
the beginning of the next window computation, read that value back; but we would 
like to use some internal mechanism of Flink to do that. 


Could someone provide help with this? Thanks in advance!!! :-) 


Best

__ 

Dr. Rubén Casado 
Head of Big Data 
Treelogic 
ruben.casado.treelogic 

+34 902 286 386 - +34 607 18 28 06 
Parque Tecnológico de Asturias · Parcela 30 
E33428 Llanera · Asturias [Spain] 
www.treelogic.com 
__ 


Re: AvroWriter for Rolling sink

2016-04-21 Thread Aljoscha Krettek
Hi,
as far as I know there is no one working on this. I'm only aware of someone
working on an ORC (from Hive) Writer.

This would be a welcome addition! I think you are already on the right
track, the only thing required will probably be an AvroFileWriter and you
already started looking at SequenceFileWriter, which should be similar.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 09:45 Igor Berman  wrote:

> Hi All,
> Is there such an implementation somewhere? (Before I start to implement it
> myself - it seems not too difficult based on the SequenceFileWriter example.)
>
> Anyway, any ideas/pointers will be highly appreciated.
>
> thanks in advance
>
>


Re: Control triggering on empty window

2016-04-21 Thread Aljoscha Krettek
Hi,
I'm afraid this is not possible with our windowing model (except with hacks
using GlobalWindow, as you mentioned). The reason is that windows only
come into existence once there is an element that belongs to a window. Before
that, the system has no reference point for what windows should
exist, because there is no knowledge about time except when looking at
elements.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 01:31 Maxim  wrote:

> I have the following use case:
>
> Input stream of timestamped "on" and "off" events received out of order.
> I need to produce an event with time that system was "on" every 15
> minutes. Events should be produced only for intervals that system was "on".
>
> When 15 minute window has at least one record it is triggered and the
> required aggregate is created, but when no event is received within 15
> minute period window is not triggered and nothing is produced.
>
> I understand that it is not feasible to trigger on empty windows when the
> set of keys is unbounded. But it would be nice to give the control for such
> triggering to a window function. In my case the window function could
> enable the empty triggering for the current key when the last event in the
> evaluated window is "on" and disable it if is "off".
> The strawman API for such feature:
>
> public void apply(String key, TimeWindow window, Iterable input, 
> Collector out) throws Exception {
>
> ...
>
> RuntimeContext context = this.getRuntimeContext();
>
> if (lastEvent.isOn()) {
>
>context.enableEmptyWindowTriggering();
>
> } else {
>
>context.disableEmptyWindowTriggering();
>
> }
>
> }
>
> I could implement the same logic using global window and custom trigger
> and evictor, but it looks like ugly workaround to me.
>
> Is there any better way to solve this use case?
>
> Thanks,
>
> Maxim.
>
>


Re: Processing millions of messages in milliseconds real time -- Architecture guide required

2016-04-21 Thread Sendoh
Maybe you can refer to this: Kafka + Flink
http://data-artisans.com/kafka-flink-a-practical-how-to/



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-millions-of-messages-in-milliseconds-real-time-Architecture-guide-required-tp6191p6282.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpoint and restore states

2016-04-21 Thread Aljoscha Krettek
Hi,
yes Stefano is spot on! The state is only restored if a job is restarted
because of abnormal failure. For state that survives stopping/canceling a
job you can look at savepoints:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
This
essentially uses the same mechanisms as the fault-tolerance stuff for state
but makes it explicit and allows restarting from different savepoints.
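For the concrete commands, a sketch of the savepoint workflow with the CLI looks
like this (job id, savepoint path and jar name are placeholders):

bin/flink savepoint <jobID>                  # trigger a savepoint; its path is printed on success
bin/flink cancel <jobID>                     # then stop the running job
bin/flink run -s <savepointPath> my-job.jar  # later, resume a new job from that savepoint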

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 22:43 Stefano Baghino 
wrote:

> Hello again,
>
> thanks for giving a shot at my advice anyway, but Aljoscha is far more
> knowledgeable than me regarding Flink. :)
>
> I hope I'm not getting mixed up again but I think gracefully canceling
> your job means you lose your job state. Am I right in saying that the state
> is preserved in case of abnormal termination (e.g.: the JobManager crashes)
> or if you explicitly create a savepoint?
>
> On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang 
> wrote:
>
>> @Aljoscha:
>> For this word count example I am using a kafka topic as the input stream.
>> The problem is that when I cancel the task and restart it, the task loses
>> the accumulated word counts so far and start counting from 1 again. Am I
>> missing something basic here?
>>
>> @Stefano:
>> I also tried to implements the Checkpointed interface but had no luck
>> either. Canceling and restarting the task did not restore the states. Here
>> is my class:
>>
>> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>>   .keyBy({s => s})
>>>   .map(new StatefulCounter)
>>
>>
>> class StatefulCounter extends RichMapFunction[String, (String,Int)] with
>>> Checkpointed[Integer] {
>>>   private var count: Integer = 0
>>>
>>>   def map(in: String): (String,Int) = {
>>> count += 1
>>> return (in, count)
>>>   }
>>>   def snapshotState(l: Long, l1: Long): Integer = {
>>> count
>>>   }
>>>   def restoreState(state: Integer) {
>>> count = state
>>>   }
>>> }
>>
>>
>>
>> Thanks,
>>
>>
>> Jack Huang
>>
>> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> My bad, thanks for pointing that out.
>>>
>>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 the *withState() family of functions use the Key/Value state interface
 internally, so that should work.

 On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
 stefano.bagh...@radicalbit.io> wrote:

> Hi Jack,
>
> it seems you correctly enabled the checkpointing by calling
> `env.enableCheckpointing`. However, your UDFs have to either implement the
> Checkpointed interface or use the Key/Value State interface to make sure
> the state of the computation is snapshotted.
>
> The documentation explains how to define your functions so that they
> checkpoint the state far better than I could in this post:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>
> I hope I've been of some help, I'll gladly help you further if you
> need it.
>
> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
>
>> Hi,
>> what seems to be the problem?
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 03:52 Jack Huang 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am doing a simple word count example and want to checkpoint the
>>> accumulated word counts. I am not having any luck getting the counts 
>>> saved
>>> and restored. Can someone help?
>>>
>>> env.enableCheckpointing(1000)
>>>
>>> env.setStateBackend(new MemoryStateBackend())
>>>
>>>
  ...
>>>
>>>
>>>
>>> inStream
 .keyBy({s => s})
 .mapWithState((in: String, count: Option[Int]) => {
   val newCount = count.getOrElse(0) + 1
   ((in, newCount), Some(newCount))
 })
 .print()
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jack Huang
>>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>

>>>
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
>>>
>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: Threads waiting on LocalBufferPool

2016-04-21 Thread Maciek Próchniak

Hi Ufuk,

thanks for the quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to saturate the 
pool. After a few minutes, periodically all kafka threads were waiting for 
the bufferPool.
2) This seemed to help. I also reduced the checkpoint interval - on RocksDB we 
had 5 min, now I tried 30 s.
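
For reference, the corresponding job setup is roughly the following (the HDFS path
is a placeholder, the interval is the 30 s mentioned above):

env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // checkpoints go to HDFS
env.enableCheckpointing(30_000);                                      // 30 s checkpoint interval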


I attach throughput metrics - the former (around 18:00) is with increased 
heap & buffers, the latter (around 22:00) is with the FileSystemStateBackend.
My state is a few GB large - during the test it reached around 2-3 GB. I 
must admit I was quite impressed that checkpointing to HDFS using the 
FileSystem backend took only about 6-7 s (with occasional spikes to 12-13 s, which 
can be seen in the metrics - I didn't check whether they were caused by HDFS or something else).


Now I looked at the logs from around 18:00 and it seems like checkpointing RocksDB 
took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 6 (in 140588 ms)

- however I don't see any threads dumping state in threadStacks...

I guess I'll have to add some metrics around state invocations to see 
where the problem with RocksDB is... I'll write if I find anything, but 
that won't be today I think...


Btw - I was looking at the FS state and I wonder whether it would be feasible to 
make a variant of this state backend using an immutable map (probably a Scala one) 
to be able to do asynchronous checkpoints.
Then the synchronous part would be essentially free - just taking the state 
map and materializing it asynchronously.
Of course, that would work only for immutable state - but this is often 
the case when writing in Scala. WDYT?


thanks,
maciek




On 20/04/2016 16:28, Ufuk Celebi wrote:

Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, e.g. 131072 buffers à 32 KB)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.
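
In flink-conf.yaml terms that would look roughly like this (the concrete numbers
are only examples for this machine):

taskmanager.heap.mb: 40960                    # give the TaskManager more of the 120 GB RAM
taskmanager.network.numberOfBuffers: 131072   # 131072 buffers * 32 KB = 4 GB of network buffers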

As a first step you could address these two points and re-run your job.

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.


I'm curious about the results. Do you think you will have time to try this?

– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak  wrote:

Hi,
I'm running my flink job on one rather large machine (20 cores with
hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state
processing.
Kafka topic has 24 partitions, so my parallelism is also 24

After some tweaks and upgrading to 1.0.2-rc3 (as I use the RocksDB state
backend) I reached a point where throughput is ~120-150k/s.
On the same kafka and machine I reached > 500k/s with a simple filtering job,
so I wanted to see what the bottleneck is.

It turns out that quite often all of kafka threads are stuck waiting for
buffer from pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000
nid=0x8118 in Object.wait() [0x7f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
 - locked <0x0002eade3890> (a java.util.ArrayDeque)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
 at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
 - locked <0x0002eb73cbd0> (a
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
 at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
 at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
 at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
 at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
 at