Need some help to understand the cause of the error

2016-02-25 Thread Nirmalya Sengupta
Hello Flinksters,

I am trying to use Flinkspector in a Scala code snippet of mine and Flink
is complaining. The code is here:

---

case class Reading(field1: String, field2: String, field3: Int)

object MultiWindowing {

  def main(args: Array[String]) {}

  // WindowFunction
  class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {
    // .
  }

  val env = DataStreamTestEnvironment.createTestEnvironment(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: EventTimeInput[Reading] =
    EventTimeInputBuilder
      .startWith(Reading("hans", "elephant", 15))
      .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
      .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))

  // acquire data source from input
  val stream = env.fromInput(input)

  // apply transformation
  val k = stream.keyBy(new KeySelector[Reading, String] {
      def getKey(r: Reading) = r.field2
    })
    .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))

  k.sum(3)
    .print()

  env.execute()

}

---

And at runtime, I get this error:



Exception in thread "main" java.lang.ExceptionInInitializerError
at
org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for
a simple type (non-tuple, non-array).
at
org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
at
org.apache.flink.streaming.api.functions.aggregation.SumAggregator.(SumAggregator.java:37)
at
org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
at
org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala:63)
at
org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala)
... 6 more


---

Can someone help me by pointing out the mistake I am making?

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
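
A possible reading of the error and a sketch of a workaround (not from the thread; it assumes stream is a Scala DataStream[Reading]): FieldAccessor only accepts position 0 for a type it does not see as a tuple or array, and sum(3) also points past the three 0-indexed fields of Reading. An explicit reduce over the keyed window avoids positional field selection altogether:

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// sum the Int field with an explicit reduce instead of a positional sum(3)
val totals = stream
  .keyBy(_.field2)
  .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
  .reduce((a, b) => Reading(a.field1, a.field2, a.field3 + b.field3))

totals.print()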


suggestion for Quickstart

2016-02-25 Thread Tara Athan
On 
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html


some of the instructions have been updated to 1.0-SNAPSHOT, but not all.

The download link goes to 
http://www.apache.org/dyn/closer.cgi/flink/flink-0.9.1/flink-0.9.1-bin-hadoop1.tgz


and all the links there are broken.

Also,
$ bin/flink run ./examples/WordCount.jar file://`pwd`/hamlet.txt 
file://`pwd`/wordcount-result.txt


should be

$ bin/flink run ./examples/batch/WordCount.jar

Or, if you want to give the example with explicit arguments, then it needs 
to be


$ bin/flink run ./examples/batch/WordCount.jar --input 
file:///`pwd`/hamlet.txt --output file:///`pwd`/wordcount-result.txt


Similarly, you could add

$ bin/flink run ./examples/streaming/WordCount.jar --output 
file:///`pwd`/streaming-wordcount-result.txt


Finally, to complete the demo please add the instruction

$ bin/stop-local.sh

Best, Tara



Re: Watermarks with repartition

2016-02-25 Thread Zach Cox
I think I found the information I was looking for:

RecordWriter broadcasts each emitted watermark to all outgoing channels [1].

StreamInputProcessor tracks the max watermark received on each incoming
channel separately, and computes the task's watermark as the min of all
incoming watermarks [2].

Is this an accurate summary of Flink's watermark propagation?

So in my previous example, each window count task is building up a count
for each window based on incoming event's timestamp, and when all incoming
watermarks have progressed beyond the end of the window, the count is
emitted. So if one partition's watermark lags behind the other, it just
means the window output is triggered based on this lagging watermark.

-Zach

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
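
For illustration only, the bookkeeping described above compressed into a few lines (this is not Flink's actual code):

// tracks the highest watermark per incoming channel and emits the minimum
class WatermarkTracker(numChannels: Int) {
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
  private var lastEmitted = Long.MinValue

  // returns a new task-level watermark if the minimum across channels advanced
  def onWatermark(channel: Int, watermark: Long): Option[Long] = {
    channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
    val min = channelWatermarks.min
    if (min > lastEmitted) { lastEmitted = min; Some(min) } else None
  }
}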


On Thu, Feb 25, 2016 at 3:31 PM Zach Cox  wrote:

> Hi - how are watermarks passed along parallel tasks where there is a
> repartition? For example, say I have a simple streaming job computing
> hourly counts per key, something like this:
>
> val environment = StreamExecutionEnvironment.getExecutionEnvironment
> environment.setParallelism(2)
> environment.setStreamTimeCharacteristic(EventTime)
> environment.getConfig.enableTimestamps()
> environment
>   .addSource(...)
>   .assignAscendingTimestamps(_.timestamp)
>   .keyBy("someField")
>   .timeWindow(Time.hours(1))
>   .fold(0, (count, element) => count + 1)
>   .addSink(...)
> environment.execute("example")
>
> Say the source has 2 parallel partitions (e.g. Kafka topic) and the events
> from the source contain timestamps, but over time the 2 source tasks
> diverge in event time (maybe 1 Kafka topic partition has many more events
> than the other).
>
> The job graph looks like this: http://imgur.com/hxEpF6b
>
> From what I can tell, the execution graph, with parallelism=2, would look
> like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
> be used, so that events with the same key end up at the same window
> subtask, regardless of which source partition they came from.
>
> Since the watermarks are skewed between the parallel pipelines, what
> happens when differing watermarks are sent to the window count operators?
> Is something tracking the min incoming watermark there? Could anyone point
> me to Flink code that implements this? I'd really like to learn more about
> how this works.
>
> Thanks,
> Zach
>
>
>


Graph with stream of updates

2016-02-25 Thread Ankur Sharma
Hello,

Is it possible to create and update a graph with streaming edge and vertex data
in Flink?

Best,
Ankur Sharma
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sha...@mpi-inf.mpg.de  
an...@stud.uni-saarland.de 




Watermarks with repartition

2016-02-25 Thread Zach Cox
Hi - how are watermarks passed along parallel tasks where there is a
repartition? For example, say I have a simple streaming job computing
hourly counts per key, something like this:

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(2)
environment.setStreamTimeCharacteristic(EventTime)
environment.getConfig.enableTimestamps()
environment
  .addSource(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy("someField")
  .timeWindow(Time.hours(1))
  .fold(0, (count, element) => count + 1)
  .addSink(...)
environment.execute("example")

Say the source has 2 parallel partitions (e.g. Kafka topic) and the events
from the source contain timestamps, but over time the 2 source tasks
diverge in event time (maybe 1 Kafka topic partition has many more events
than the other).

The job graph looks like this: http://imgur.com/hxEpF6b

From what I can tell, the execution graph, with parallelism=2, would look
like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
be used, so that events with the same key end up at the same window
subtask, regardless of which source partition they came from.

Since the watermarks are skewed between the parallel pipelines, what
happens when differing watermarks are sent to the window count operators?
Is something tracking the min incoming watermark there? Could anyone point
me to Flink code that implements this? I'd really like to learn more about
how this works.

Thanks,
Zach


Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi Hung,
I see one thing that could explain the problem, the timestamp assigner should 
look like this:

new AssignerWithPeriodicWatermarks<BizEvent>() {

    long curTimeStamp;

    @Override
    public long extractTimestamp(BizEvent biz, long currentTimestamp) {
        curTimeStamp = Math.max(curTimeStamp, biz.time.getMillis());
        return biz.time.getMillis();
    }

    @Override
    public long getCurrentWatermark() {
        return (curTimeStamp - (maxEventDelay * 1000));
    }
}

The currentTimestamp parameter is the internal timestamp that the element had 
before, which is most likely just “-1” because no timestamp was previously 
assigned.

Does it work with that fix?

Cheers,
Aljoscha

> On 25 Feb 2016, at 17:26, HungChang  wrote:
> 
> An update: the following situation works as expected when the data arrives after
> the Flink job starts to execute.
> 1> (2016-02-25T17:46:25.00,13)
> 2> (2016-02-25T17:46:40.00,16)
> 3> (2016-02-25T17:46:50.00,11)
> 4> (2016-02-25T17:47:10.00,12)
> 
> But for data that arrived a long time earlier, strange behavior appears. Does it
> mean we cannot replay the computation?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5156.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: schedule tasks `inside` Flink

2016-02-25 Thread Michal Fijolek
Thanks for the help, guys!
Eventually I implemented it as a RichFunction using the open() and close()
methods.

Michał
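
A minimal sketch of that pattern (the names, the cache type, and the daily refresh interval are illustrative; fetchDictionary is a placeholder):

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class DictionaryLookup[K, V](fetchDictionary: () => Map[K, V])
    extends RichMapFunction[K, Option[V]] {

  @volatile private var cache: Map[K, V] = Map.empty
  @transient private var scheduler: ScheduledExecutorService = _

  override def open(parameters: Configuration): Unit = {
    cache = fetchDictionary()
    // the job lifecycle owns the refresh thread: it is started in open() ...
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = cache = fetchDictionary() },
      1, 1, TimeUnit.DAYS)
  }

  // ... and stopped in close(), so cancelling the job also stops the scheduler
  override def close(): Unit = if (scheduler != null) scheduler.shutdownNow()

  override def map(id: K): Option[V] = cache.get(id)
}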

2016-02-25 19:00 GMT+01:00 Stephan Ewen :

> Fabian's suggestion with the co-map is good. You can use a "broadcast()"
> connect to make sure the dictionary gets to all nodes.
>
> If you want full control about how and when to read  the data, a scheduled
> task is not that bad even as a solution. Make sure you implement this as a
> "RichFunction", so you can use "open()" to read the first set of data and
> "close()" to stop your threads.
>
> As a related issue: We are looking into extensions to the API to
> explicitly support such "slow changing inputs" in a similar way as
> "broadcast variables" work in the DataSet API.
> This is the JIRA issue, if you post your use case there, you can make this
> part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514
>
> Greetings,
> Stephan
>
>
>
>
>
>
> On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske  wrote:
>
>> Hi Michal,
>>
>> If I got your requirements right, you could try to solve this issue by
>> serving the updates through a regular DataStream.
>> You could add a SourceFunction which periodically emits a new version of
>> the cache and a CoFlatMap operator which receives on the first input the
>> regular streamed input and on the second input the cache updates. If the
>> Flink job gets stopped, the update source will be canceled as a regular
>> source.
>>
>> You might also want to expose the cache as operator state to Flink to
>> ensure it is checkpointed and restored in case of a failure.
>>
>> Best, Fabian
>>
>> 2016-02-14 18:36 GMT+01:00 Michal Fijolek :
>>
>>> Hello.
>>> My app needs Map[K, V] as simple cache for business data, which needs to
>>> be invalidated periodically, lets say once per day.
>>> Right now I'm using rather naive approach which is
>>>
>>> trait Dictionary[K, V] extends Serializable {
>>>   @volatile private var cache: Map[K, V] = Map()
>>>   def lookup(id: K): Option[V] = cache.get(id)
>>>   private def fetchDictionary: Map[K, V] = ???
>>>   private def updateDictionary() = {
>>> cache = fetchDictionary
>>>   }
>>>   val invalidate = new Runnable with Serializable {
>>> override def run(): Unit = updateDictionary()
>>>   }
>>>   
>>> Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(invalidate,
>>>  oncePerDay)
>>> }
>>>
>>> This seems wrong, because I guess I should do such thing `inside` Flink, 
>>> and when I stop Flink job, nobody's gonna stop scheduled invalidation tasks.
>>> What will be idomatic Flink way to approach this problem? How can I 
>>> schedule tasks and make Flink aware of them?
>>>
>>> Thanks,
>>> Michal
>>>
>>>
>>
>


Counting tuples within a window in Flink Stream

2016-02-25 Thread Saiph Kappa
Hi,

In Flink Stream what's the best way of counting the number of tuples within
a window of 10 seconds? Using a map-reduce task? Asking because in spark
there is the method rawStream.countByWindow(Seconds(x)).

Thanks.
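
One possible approach, as a sketch (assuming the Scala DataStream API and an input called events; a fold or a window apply would work as well):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// map every element to 1 and add the ones up inside a 10-second window
val countsPerWindow = events
  .map(_ => 1L)
  .timeWindowAll(Time.seconds(10))
  .reduce(_ + _)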


Re: Read every file in a directory at once

2016-02-25 Thread Stephan Ewen
Thanks for sharing this solution!

On Thu, Feb 18, 2016 at 4:02 PM, Flavio Pompermaier 
wrote:

> My current solution is:
>
> List<String> paths = new ArrayList<>();
> File dir = new File(BASE_DIR);
> for (File f : dir.listFiles()) {
>   paths.add(f.getName());
> }
> DataSet<String> mail = env.fromCollection(paths).map(new FileToString(BASE_DIR));
>
> The FileToString does basically a map that return FileUtils.toString(new
> File(baseDir, filePath));
>
> I hope this could help someone else..
>
>
>
> On Thu, Feb 18, 2016 at 3:48 PM, Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I want to apply a map function to every file in a folder. Is there an
>> easy way (or an already existing InputFormat) to do that?
>>
>> Best,
>> Flavio
>>
>
>


Re: Frequent exceptions killing streaming job

2016-02-25 Thread Nick Dimiduk
For what it's worth, I dug into the TM logs and found that this exception
was not the root cause, merely a symptom of backpressure building up elsewhere in
the flow (actually, lock contention in another part of the stack). While
Flink was helpful in finding and bubbling this stack trace up to the UI, it was
ultimately misleading and caused me to overlook a proper evaluation of the
failure.

On Wed, Jan 20, 2016 at 2:59 AM, Robert Metzger  wrote:

> Hey Nick,
>
> I had a discussion with Stephan Ewen on how we could resolve the issue.
> I filed a JIRA with our suggested approach:
> https://issues.apache.org/jira/browse/FLINK-3264
>
> By handling this directly in the KafkaConsumer, we would avoid fetching
> data we can not handle anyways (discarding in the deserialization schema
> would be more inefficient).
>
> Let us know what you think about our suggested approach.
>
> Sadly, it seems that the Kafka 0.9 consumer API does not yet support
> requesting the latest offset of a TopicPartition. I'll ask about this on
> their ML.
>
>
>
>
> On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk  wrote:
>
>> On Sunday, January 17, 2016, Stephan Ewen  wrote:
>>
>>> I agree, real time streams should never go down.
>>>
>>
>>  Glad to hear that :)
>>
>>
>>> [snip] Both should be supported.
>>>
>>
>> Agreed.
>>
>>
>>> Since we interpret streaming very broadly (also including analysis of
>>> historic streams or timely data), the "backpressure/catch-up" mode seemed
>>> natural as the first one to implement.
>>>
>>
>> Indeed, this is what my job is doing. I have set it to, lacking a valid
>> offset, start from the beginning. I have to presume that in my case the
>> stream data is expiring faster than my consumers can keep up. However I
>> haven't investigated proper monitoring yet.
>>
>>
>>> The "load shedding" variant can probably even be realized in the Kafka
>>> consumer, without complex modifications to the core Flink runtime itself.
>>>
>>
>> I agree here as well. Indeed, this exception is being thrown from the
>> consumer, not the runtime.
>>
>>
>>
>>> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk 
>>> wrote:
>>>
 This goes back to the idea that streaming applications should never go
 down. I'd much rather consume at max capacity and knowingly drop some
 portion of the incoming pipe than have the streaming job crash. Of course,
 once the job itself is robust, I still need the runtime to be robust --
 YARN vs (potential) Mesos vs standalone cluster will be my next
 consideration.

 I can share some details about my setup, but not at this time; in part
 because I don't have my metrics available at the moment and in part because
 this is a public, archived list.

 On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen  wrote:

> @Robert: Is it possible to add a "fallback" strategy to the consumer?
> Something like "if offsets cannot be found, use latest"?
>
> I would make this an optional feature to activate. I would think it is
> quite surprising to users if records start being skipped in certain
> situations. But I can see that this would be desirable sometimes.
>
> More control over skipping the records could be something to implement
> in an extended version of the Kafka Consumer. A user could define a policy
> that, in case consumer falls behind producer more than X (offsets), it
> starts requesting the latest offsets (rather than the following), thereby
> skipping a bunch of records.
>
>
>
> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger 
> wrote:
>
>> Hi Nick,
>>
>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>> consumer falls back in the topic so far that the offsets it's requesting
>> are invalid?
>>
>> For that, the retention time of Kafka has to be pretty short.
>>
>> Skipping records under load is something currently not supported by
>> Flink itself. The only idea I had for handling this would be to give the
>> DeserializationSchema a call back to request the latest offset from Kafka
>> to determine the lag. With that, the schema could determine a "dropping
>> rate" to catch up.
>> What would you as an application developer expect to handle the
>> situation?
>>
>>
>> Just out of curiosity: What's the throughput you have on the Kafka
>> topic?
>>
>>
>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk 
>> wrote:
>>
>>> Hi folks,
>>>
>>> I have a streaming job that consumes from of a kafka topic. The
>>> topic is pretty active so the local-mode single worker is obviously not
>>> able to keep up with the fire-hose. I expect the job to skip records and
>>> continue on. However, I'm getting an exception from the LegacyFetcher 
>>> which
>>> 

Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
An update: the following situation works as expected when the data arrives after
the Flink job starts to execute.
1> (2016-02-25T17:46:25.00,13)
2> (2016-02-25T17:46:40.00,16)
3> (2016-02-25T17:46:50.00,11)
4> (2016-02-25T17:47:10.00,12)

But for data that arrived a long time earlier, strange behavior appears. Does it
mean we cannot replay the computation?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5156.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Mapping two datasets

2016-02-25 Thread Saliya Ekanayake
Thank you, Marton. That seems doable.

However, is there a way I can create a dummy indexed data set? Like a way
to partition the index range without data across parallel tasks. For
example, if I could have something like,

DataSet ds = ...

then I can implement a custom method to load required data for a split
within a map operation, which will be less expensive than a join for my
case.

Thank you,
Saliya
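
A rough sketch of that idea (numSplits and loadSplit are placeholders; whether this beats a join depends on how expensive loading a split is):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val numSplits = 16
def loadSplit(i: Int): Array[Double] = Array.fill(1000)(i.toDouble) // placeholder loader

// distribute only the split indices, then load each split's data inside the map
val indices: DataSet[Int] = env.fromCollection(0 until numSplits)
val loaded = indices
  .rebalance()            // spread the indices across the parallel tasks
  .map(i => loadSplit(i))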

On Thu, Feb 25, 2016 at 11:45 AM, Márton Balassi 
wrote:

> Hey Saliya,
>
> I would add a unique ID to both the DataSets, the variable you referred to
> as 'i'. Then you can join the two DataSets on the field containing 'i' and
> do the mapping on the joined result.
>
> Hope this helps,
>
> Marton
>
> On Thu, Feb 25, 2016 at 5:38 PM, Saliya Ekanayake 
> wrote:
>
>> Hi,
>>
>> I've two data sets like,
>>
>> DataSet a = ...
>> DataSet b = ...
>>
>> They have the same type and same decomposition. I want to apply a map
>> operator that need both *a* and *b. *For example,
>>
>> a.map( i -> OP)
>>
>> within this OP I need the corresponding (*i *th) element of *b* as well.
>> Is there a way to do this?
>>
>> Thank you,
>> Saliya
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> Cell 812-391-4914
>> http://saliya.org
>>
>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org


Re: Mapping two datasets

2016-02-25 Thread Márton Balassi
Hey Saliya,

I would add a unique ID to both the DataSets, the variable you referred to
as 'i'. Then you can join the two DataSets on the field containing 'i' and
do the mapping on the joined result.

Hope this helps,

Marton
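
A sketch of the join-on-index idea (it assumes the zipWithIndex helper from DataSetUtils and that both inputs are produced so that corresponding elements receive the same index; the element type and the OP body are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // zipWithIndex

val env = ExecutionEnvironment.getExecutionEnvironment
val a: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)    // stand-ins for the real data
val b: DataSet[Double] = env.fromElements(10.0, 20.0, 30.0)

// give both sides a shared index, join on it, then apply the OP to each pair
val result = a.zipWithIndex
  .join(b.zipWithIndex)
  .where(0).equalTo(0) { (left, right) => left._2 + right._2 } // example OP using both elements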

On Thu, Feb 25, 2016 at 5:38 PM, Saliya Ekanayake  wrote:

> Hi,
>
> I've two data sets like,
>
> DataSet a = ...
> DataSet b = ...
>
> They have the same type and same decomposition. I want to apply a map
> operator that need both *a* and *b. *For example,
>
> a.map( i -> OP)
>
> within this OP I need the corresponding (*i *th) element of *b* as well.
> Is there a way to do this?
>
> Thank you,
> Saliya
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>


Mapping two datasets

2016-02-25 Thread Saliya Ekanayake
Hi,

I've two data sets like,

DataSet a = ...
DataSet b = ...

They have the same type and the same decomposition. I want to apply a map
operator that needs both *a* and *b*. For example,

a.map( i -> OP)

within this OP I need the corresponding (*i *th) element of *b* as well. Is
there a way to do this?

Thank you,
Saliya

-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org


Re: Master Thesis [Apache-flink paper references]

2016-02-25 Thread Stephan Ewen
Hi!

What Flink implements on the Streaming side is not part of Stratosphere any
more, so unfortunately not linked on that website.

Here are some pointers:

  - Fault tolerance: http://arxiv.org/abs/1506.08603

  - The streaming model (windows / event time) is based on the Dataflow
model:
http://people.csail.mit.edu/matei/courses/2015/6.S897/readings/google-dataflow.pdf

  - The CEP library is inspired by
https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf

Greetings,
Stephan




On Fri, Feb 12, 2016 at 8:42 PM, subash basnet  wrote:

> Hello Matthias,
>
> Thank you very much :)
>
> Best Regards,
> Subash Basnet
>
> On Fri, Feb 12, 2016 at 8:22 PM, Matthias J. Sax  wrote:
>
>> You might want to check out the Stratosphere project web site:
>> http://stratosphere.eu/project/publications/
>>
>> -Matthias
>>
>> On 02/12/2016 05:52 PM, subash basnet wrote:
>> > Hello all,
>> >
>> > I am currently doing master's thesis on Apache-flink. It would be really
>> > helpful to know about the reference papers followed for the
>> > development/background of flink.
>> > It would help me build a solid background knowledge to analyze flink.
>> >
>> > Currently I am reading all the related materials found in internet and
>> > flink/data-artisan materials provided.
>> > Could you please suggest me.
>> >
>> >
>> >
>> > Best Regards,
>> > Subash Basnet
>>
>>
>


Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
Thank you for your reply. Please let me know if other classes or the full code are
needed.

/**
 * Count how many total events
 */

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(4, env_config);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");

final int maxEventDelay = 5; // events are out of order by max x seconds

DataStream<BizEvent> bizs = env.addSource(new FlinkKafkaConsumer09<>(KAFKA_TOPIC,
        new BizSchema(), properties))
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<BizEvent>() {

        long curTimeStamp;

        @Override
        public long extractTimestamp(BizEvent biz, long currentTimestamp) {
            curTimeStamp = currentTimestamp;
            return biz.time.getMillis();
        }

        @Override
        public long getCurrentWatermark() {
            return (curTimeStamp - (maxEventDelay * 1000));
        }
    });

DataStream<Tuple2<BizEvent, Integer>> bizCnt = bizs.flatMap(new CountBiz());

DataStream<Tuple2<String, Integer>> bizWindowTotal =
        bizCnt.timeWindowAll(Time.of(5, TimeUnit.MINUTES))
              .apply(new SumStartTsAllWindow());

// Output (start time of window, counts)
public static class SumStartTsAllWindow implements
        AllWindowFunction<Iterable<Tuple2<BizEvent, Integer>>,
                Tuple2<String, Integer>, TimeWindow> {

    private static DateTimeFormatter timeFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                    .withLocale(Locale.GERMAN)
                    .withZone(DateTimeZone.forID("Europe/Berlin"));

    @Override
    public void apply(TimeWindow timeWindow,
                      Iterable<Tuple2<BizEvent, Integer>> values,
                      Collector<Tuple2<String, Integer>> collector) throws Exception {

        DateTime startTs = new DateTime(timeWindow.getStart(),
                DateTimeZone.forID("Europe/Berlin"));

        Iterator<Tuple2<BizEvent, Integer>> it = values.iterator();
        int sum = 0;
        while (it.hasNext()) {
            Tuple2<BizEvent, Integer> value = it.next();
            sum += value.f1;
        }
        collector.collect(new Tuple2<>(startTs.toString(timeFormatter), sum));
    }
}

// Output (BizEvent, 1)
public static class CountBiz implements FlatMapFunction<BizEvent, Tuple2<BizEvent, Integer>> {

    @Override
    public void flatMap(BizEvent bizEvent, Collector<Tuple2<BizEvent, Integer>> collector) {
        // System.out.println("Time in count!: " + bizEvent.time);
        collector.collect(new Tuple2<>(bizEvent, (int) 1));
    }
}



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5151.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi Hung,
could you maybe post a more complete snippet of your program? This would allow 
me to figure out why the output changes between versions 0.10 and 1.0.

@Matthias: The signature was changed to also allow window functions that don’t 
take an Iterable. For example, when doing WindowedStream.apply(ReduceFunction, 
WindowFunction) the window function only gets a single element. Before, this 
would be a single element inside an Iterable. Now the fact that it gets a 
single element is reflected in the signature.

> On 25 Feb 2016, at 14:47, Matthias J. Sax  wrote:
> 
> Just out of curiosity: Why was it changes like this. Specifying
> "Iterable<...>" as type in AllWindowFunction seems rather unintuitive...
> 
> -Matthias
> 
> On 02/25/2016 01:58 PM, Aljoscha Krettek wrote:
>> Hi,
>> yes that is true. The way you would now write such a function is this:
>> 
>> private static class MyIterableFunction implements 
>> AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   @Override
>>   public void apply(
>> TimeWindow window,
>> Iterable<Tuple2<String, Integer>> values,
>> Collector<Tuple2<String, Integer>> out) throws Exception {
>> 
>>   }
>> }
>> 
>> (I used Tuple2<String, Integer> as an example input type here.)
>> 
>> and then you can use it with AllWindowedStream.apply(new 
>> MyIterableFunction());
>> 
>> 
>>> On 25 Feb 2016, at 13:29, HungChang  wrote:
>>> 
>>> Thank you for your reply.
>>> 
>>> The following in the current master looks like not iterable? because the
>>> parameter is IN rather than Iterable
>>> So I still have problem to iterate,,,
>>> 
>>> @Public
>>> public interface AllWindowFunction<IN, OUT, W extends Window> extends
>>> Function, Serializable {
>>> 
>>> /**
>>>  * Evaluates the window and outputs none or several elements.
>>>  *
>>>  * @param window The window that is being evaluated.
>>>  * @param values The elements in the window being evaluated.
>>>  * @param out A collector for emitting elements.
>>>  *
>>>  * @throws Exception The function may throw exceptions to fail the 
>>> program
>>> and trigger recovery.
>>>  */
>>> void apply(W window, IN values, Collector<OUT> out) throws Exception;
>>> }
>>> 
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
>>> 
>>> Best,
>>> 
>>> Hung
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5145.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com.
>> 
> 



Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread Matthias J. Sax
Just out of curiosity: Why was it changes like this. Specifying
"Iterable<...>" as type in AllWindowFunction seems rather unintuitive...

-Matthias

On 02/25/2016 01:58 PM, Aljoscha Krettek wrote:
> Hi,
> yes that is true. The way you would now write such a function is this:
> 
> private static class MyIterableFunction implements 
> AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
> TimeWindow> {
>    private static final long serialVersionUID = 1L;
> 
>    @Override
>    public void apply(
>  TimeWindow window,
>  Iterable<Tuple2<String, Integer>> values,
>  Collector<Tuple2<String, Integer>> out) throws Exception {
> 
>    }
> }
> 
> (I used Tuple2<String, Integer> as an example input type here.)
> 
> and then you can use it with AllWindowedStream.apply(new 
> MyIterableFunction());
> 
> 
>> On 25 Feb 2016, at 13:29, HungChang  wrote:
>>
>> Thank you for your reply.
>>
>> The following in the current master looks like not iterable? because the
>> parameter is IN rather than Iterable
>> So I still have problem to iterate,,,
>>
>> @Public
>> public interface AllWindowFunction<IN, OUT, W extends Window> extends
>> Function, Serializable {
>>
>>  /**
>>   * Evaluates the window and outputs none or several elements.
>>   *
>>   * @param window The window that is being evaluated.
>>   * @param values The elements in the window being evaluated.
>>   * @param out A collector for emitting elements.
>>   *
>>   * @throws Exception The function may throw exceptions to fail the 
>> program
>> and trigger recovery.
>>   */
>>  void apply(W window, IN values, Collector<OUT> out) throws Exception;
>> }
>>
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
>>
>> Best,
>>
>> Hung
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5145.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 





Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
Thank you. I can be sure this way is correct now.
I have tried this, but the windows are still not aggregating. Instead, the
AllWindowFunction only behaves like a flatMap.
Shouldn't it only output once per window range? The strangest part is that the
first output is aggregated while the others are not.

1> (68,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (1,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (1,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (1,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)


When running 0.10.2 version the output is correct that the window time is
not overlap (I'm using tumbling window)
1> (8,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (5,2016-02-18T12:10:00.00,2016-02-18T12:20:00.00)
1> (6,2016-02-18T12:20:00.00,2016-02-18T12:30:00.00)
1> (3,2016-02-18T12:30:00.00,2016-02-18T12:40:00.00)


Perhaps I should look into other issues.

Best,

Hung



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5148.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: loss of TaskManager

2016-02-25 Thread Till Rohrmann
Hi Christoph,

have you tried setting the blocks parameter of the SVM algorithm? That
basically decides how many features are grouped together in one block. The
lower the value is the more feature vectors are grouped together and, thus,
the size of the block is increased. Increasing this value might solve the
OOM exception.

Cheers,
Till
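
A hedged sketch of where that parameter goes (the surrounding parameter values are illustrative, not a recommendation):

import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector

val env = ExecutionEnvironment.getExecutionEnvironment
val trainingData: DataSet[LabeledVector] = ??? // placeholder for the real training set

// more blocks -> smaller blocks -> smaller chunks to serialize and hold in memory
val svm = SVM()
  .setBlocks(env.getParallelism * 4)
  .setIterations(10)
  .setRegularization(0.01)
  .setStepsize(0.1)

svm.fit(trainingData)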

On Thu, Feb 25, 2016 at 1:06 PM, Boden, Christoph <
christoph.bo...@tu-berlin.de> wrote:

> Hi Ufuk,
>
> thanks for the hint. Unfortunately  I cannot access the system log on the
> remote machine. But i re-ran the job with slightly increased
> memory.fraction (0.3 -> 0.4) and got an OutOfMemory Exception again:
>
> cloud-25
> Error: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> Failed to serialize element. Serialized size (> 553259005 bytes) exceeds
> JVM heap space
> Serialization trace:
> data (org.apache.flink.ml.math.SparseVector)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
> at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:501)
> at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:630)
> at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:711)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:198)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:187)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:95)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:90)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:30)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
> at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
> at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
> at
> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:310)
> at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized
> size (> 553259005 bytes) exceeds JVM heap space
> at
> org.apache.flink.runtime.util.DataOutputSerializer.resize(DataOutputSerializer.java:288)
> at
> org.apache.flink.runtime.util.DataOutputSerializer.write(DataOutputSerializer.java:117)
> at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
> ... 36 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.flink.runtime.util.DataOutputSerializer.resize(DataOutputSerializer.java:284)
> at
> 

Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi,
yes that is true. The way you would now write such a function is this:

private static class MyIterableFunction implements 
AllWindowFunction>, Tuple2, 
TimeWindow> {
   private static final long serialVersionUID = 1L;

   @Override
   public void apply(
 TimeWindow window,
 Iterable> values,
 Collector> out) throws Exception {

   }
}

(I used Tuple2 as an example input type here.)

and then you can use it with AllWindowedStream.apply(new MyIterableFunction());


> On 25 Feb 2016, at 13:29, HungChang  wrote:
> 
> Thank you for your reply.
> 
> The following in the current master looks like not iterable? because the
> parameter is IN rather than Iterable
> So I still have problem to iterate,,,
> 
> @Public
> public interface AllWindowFunction<IN, OUT, W extends Window> extends
> Function, Serializable {
> 
>   /**
>* Evaluates the window and outputs none or several elements.
>*
>* @param window The window that is being evaluated.
>* @param values The elements in the window being evaluated.
>* @param out A collector for emitting elements.
>*
>* @throws Exception The function may throw exceptions to fail the 
> program
> and trigger recovery.
>*/
>   void apply(W window, IN values, Collector<OUT> out) throws Exception;
> }
> 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
> 
> Best,
> 
> Hung
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5145.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
Thank you for your reply.

The following, in the current master, does not look iterable, because the
parameter is IN rather than Iterable.
So I still have a problem iterating.

@Public
public interface AllWindowFunction<IN, OUT, W extends Window> extends
Function, Serializable {

/**
 * Evaluates the window and outputs none or several elements.
 *
 * @param window The window that is being evaluated.
 * @param values The elements in the window being evaluated.
 * @param out A collector for emitting elements.
 *
 * @throws Exception The function may throw exceptions to fail the 
program
and trigger recovery.
 */
void apply(W window, IN values, Collector<OUT> out) throws Exception;
}

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java

Best,

Hung



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5145.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink Streaming ContinuousTimeTriggers

2016-02-25 Thread Ankur Sharma
Hello,

Thanks for the reply.

Target: I have a stream of tuples(append-only, no updates) that are appended to 
a file. These records have priority that is changing over time and is 
calculated as a function of the tuple’s timestamp and current system time. Once 
any tuple’s priority reaches some MIN, I want to throw it away.
Using this priority I want to select the next job (given by tuple) that is 
scheduled. These tasks have to be periodically scheduled until they are removed 
from the queue completely.

Let me know if something is unclear.

Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sha...@mpi-inf.mpg.de  
an...@stud.uni-saarland.de 
> On 25 Feb 2016, at 11:05, Aljoscha Krettek  wrote:
> 
> Hi,
> in addition to the video I would also like to point out that the Continuous 
> triggers should only be used with the GlobalWindows assigner. For TimeWindows 
> the right thing to do would be using the EventTimeTrigger (or 
> ProcessingTimeTrigger in case you are doing processing time windows).
> 
> Could you maybe go into some detail of what you are trying to achieve? Then 
> we can figure out what a good strategy would be.
> 
> Cheers,
> Aljoscha
>> On 25 Feb 2016, at 04:27, Gavin Lin  wrote:
>> 
>> Hi, 
>> 
>> how about this video ? 
>> https://www.youtube.com/watch?v=T7hiwcwCXGI
>> 
>> Gavin.Lin
>> 
>> 2016-02-25 3:55 GMT+08:00 Ankur Sharma :
>> Hey,
>> 
>> Can you guide me to some example of ContinuousProcessingTimeTrigger?
>> I want to partition the input stream into TimeWindows that should fire at a 
>> continuous time interval on their own, without waiting for a new element to 
>> enter the stream.
>> Could you guide me to it?
>> 
>> 
>> Thanks
>> 
>> Best,
>> Ankur Sharma
>> Information Systems Group
>> 3.15 E1.1 Universität des Saarlandes
>> 66123, Saarbrücken Germany
>> Email: ankur.sha...@mpi-inf.mpg.de 
>>an...@stud.uni-saarland.de
>> 
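
A rough sketch of the combination Aljoscha describes above, GlobalWindows plus a continuous processing-time trigger (the element type, the 30-second interval, and the pass-through body are placeholders; without an evictor or a purging trigger the global window keeps growing):

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

val tuples: DataStream[(String, Long)] = ??? // placeholder for the tuple stream

val periodic = tuples
  .windowAll(GlobalWindows.create())
  .trigger(ContinuousProcessingTimeTrigger.of[GlobalWindow](Time.of(30, TimeUnit.SECONDS)))
  .apply { (window: GlobalWindow, values: Iterable[(String, Long)], out: Collector[(String, Long)]) =>
    // periodically re-scan the accumulated elements, e.g. to recompute priorities
    values.foreach(v => out.collect(v))
  }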





AW: loss of TaskManager

2016-02-25 Thread Boden, Christoph
Hi Ufuk,

thanks for the hint. Unfortunately I cannot access the system log on the 
remote machine. But I re-ran the job with a slightly increased memory.fraction 
(0.3 -> 0.4) and got an OutOfMemory exception again:

cloud-25
Error: com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to 
serialize element. Serialized size (> 553259005 bytes) exceeds JVM heap space
Serialization trace:
data (org.apache.flink.ml.math.SparseVector)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:501)
at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:630)
at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:711)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:198)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:187)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:95)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:90)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:30)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at 
org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at 
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:310)
at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 
553259005 bytes) exceeds JVM heap space
at 
org.apache.flink.runtime.util.DataOutputSerializer.resize(DataOutputSerializer.java:288)
at 
org.apache.flink.runtime.util.DataOutputSerializer.write(DataOutputSerializer.java:117)
at 
org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
... 36 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at 
org.apache.flink.runtime.util.DataOutputSerializer.resize(DataOutputSerializer.java:284)
at 
org.apache.flink.runtime.util.DataOutputSerializer.write(DataOutputSerializer.java:117)
at 
org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:501)
at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:630)
at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:711)
at 

Re: [VOTE] Release Apache Flink 1.0.0 (RC1)

2016-02-25 Thread Aljoscha Krettek
I’ll look at the usual testing stuff and also focus on testing savepoints on a 
cluster.

Btw, we don’t yet have the usual “testing checklist” document, do we?

> On 25 Feb 2016, at 12:11, Márton Balassi  wrote:
> 
> Thanks for creating the candidate Robert and for the heads-up, Slim.
> 
> I would like to get a PR [1] in before 1.0.0 as it breaks hashing behavior of 
> DataStream.keyBy. The PR has the feature implemented and the java tests 
> adopted, there is still a bit of outstanding fix for the scala tests. Gábor 
> Horváth or myself will finish it by tomorrow evening.
> 
> [1] https://github.com/apache/flink/pull/1685
> 
> Best,
> 
> Marton
> 
> On Thu, Feb 25, 2016 at 12:04 PM, Slim Baltagi  wrote:
> Dear Flink community
> 
> It is great news that the vote for the first release candidate (RC1) of 
> Apache Flink 1.0.0 is starting today February 25th, 2016!
> As a community, we need to double our efforts and make sure that Flink 1.0.0 
> is GA before these 2 upcoming major events: 
>   •  Strata + Hadoop World in San Jose on March 28-31, 2016
>   •  Hadoop Summit Europe in Dublin on April 13-14, 2016
> This is one aspect of the ‘market dynamics’ that we need to take into account 
> as a community. 
> 
> Good luck!
> 
> Slim Baltagi
> 
> On Feb 25, 2016, at 4:34 AM, Robert Metzger  wrote:
> 
>> Dear Flink community,
>> 
>> Please vote on releasing the following candidate as Apache Flink version 
>> 1.0.0.
>> 
>> I've set user@flink.apache.org on CC because users are encouraged to help 
>> testing Flink 1.0.0 for their specific use cases. Please report issues (and 
>> successful tests!) on d...@flink.apache.org.
>> 
>> 
>> The commit to be voted on 
>> (http://git-wip-us.apache.org/repos/asf/flink/commit/e4d308d6)
>> e4d308d64057e5f94bec8bbca8f67aab0ea78faa
>> 
>> Branch:
>> release-1.0.0-rc1 (see 
>> https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.0.0-rc1)
>> 
>> The release artifacts to be voted on can be found at:
>> http://people.apache.org/~rmetzger/flink-1.0.0-rc1/
>> 
>> The release artifacts are signed with the key with fingerprint D9839159:
>> http://www.apache.org/dist/flink/KEYS
>> 
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapacheflink-1063
>> 
>> -
>> 
>> The vote is open until Tuesday and passes if a majority of at least three +1 
>> PMC votes are cast.
>> 
>> The vote ends on Tuesday, March 1, 12:00 CET.
>> 
>> [ ] +1 Release this package as Apache Flink 1.0.0
>> [ ] -1 Do not release this package because ...
> 
> 



Re: loss of TaskManager

2016-02-25 Thread Ufuk Celebi
Hey Chris!

I think that giving the full amount of memory to Flink leads to the TM
process being killed by the OS. Can you check in the OS logs whether the
OOM killer shut it down? You should be able to see this in the system
logs.

– Ufuk


On Thu, Feb 25, 2016 at 11:24 AM, Boden, Christoph
 wrote:
> Dear Flink Community,
>
> I am trying to fit a support vector machine classifier using the CoCoA 
> implementation provided in flink/ml/classification/ on a data set of moderate 
> size (400k data points, 2000 features, approx. 12GB) on a cluster of 25 nodes 
> with 28 GB memory each - and each worker node is awarded the full 28GB in  
> taskmanager.heap.mb.
>
> With the standard configuration I constantly run into different versions of 
> JVM HeapSpace OutOfMemory Errors. (e.g. 
> com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to 
> serialize element. Serialized size (> 276647402 bytes) exceeds JVM heap space 
> - Serialization trace: data (org.apache.flink.ml.math.SparseVector) ... ")
>
> As changing DOP did not alter anything, I significantly reduced the 
> taskmanager.memory.fraction. With this I now (reproducibly) run into the 
> following problem.
>
> After running for a while, the job fails with the following error:
>
> java.lang.Exception: The slot in which the task was executed has been 
> released. Probably loss of TaskManager  @ host slots - URL: 
> akka.tcp://flink@url
> 2/user/taskmanager
> at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
> at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> However the log of the taskmanager in question does not show any error or 
> exception in its log. The last log entry is:
>
> 2016-02-25 09:38:12,543 INFO  
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask  - 
> finishing iteration [2]:  Combine (Reduce at 
> org.apache.flink.ml.classification.SVM$$anon$25$$anonfun$6.apply(SVM.scala:392))
>  (3/91)
>
> I am somewhat puzzled what could be the cause of this. Any help, or pointers 
> to appropriate documentation would be greatly appreciated.
>
> I'll try increasing the heartbeat intervals next, but would still like to 
> understand what goes wrong here.
>
> Best regards,
> Christoph
>


Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi Hung,
you are right, the generic parameters of AllWindowFunction changed from 
Iterable to IN. However, in the apply function on AllWindowedStream the 
parameter changed from IN to Iterable.

What this means is that you can still do:

windowed.apply(new MyIterableWindowFunction())

and iterate over the elements in the window.

I hope that helps but please let me know if I should go into more details.

Cheers,
Aljoscha
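
For completeness, a Scala-API sketch of the same pattern (the element types are made up): the apply variant that hands the whole window content over as an Iterable.

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val stream: DataStream[(String, Int)] = ??? // placeholder input

// counts the elements of each 5-minute window and emits (windowStart, count)
val counts = stream
  .timeWindowAll(Time.of(5, TimeUnit.MINUTES))
  .apply { (window: TimeWindow, values: Iterable[(String, Int)], out: Collector[(Long, Int)]) =>
    out.collect((window.getStart, values.size))
  }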
> On 25 Feb 2016, at 11:16, HungChang  wrote:
> 
> Hi,
> 
> I would like to iterate all the instances in windows (count the events in
> the windows and show the time range of windows).
> 
> in 0.10.2 there is AllWindowFunction that can be used to iterate tuples.
> public interface AllWindowFunction<IN, OUT, W extends Window> extends
> Function, Serializable {
>    void apply(W var1, Iterable<IN> var2, Collector<OUT> var3) throws
> Exception;
> }
> 
> In the current master branch, AllWindowFunction is no longer able to iterate. 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
> 
> Can I ask what would be current the way to iterate the instances in windows?
> I saw there are ReduceAllWindowFunction and ReduceIterableAllWindowFunction
> but they are @internal.
> 
> Best,
> 
> Hung
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: [VOTE] Release Apache Flink 1.0.0 (RC1)

2016-02-25 Thread Márton Balassi
Thanks for creating the candidate Robert and for the heads-up, Slim.

I would like to get a PR [1] in before 1.0.0 as it breaks hashing behavior
of DataStream.keyBy. The PR has the feature implemented and the java tests
adopted, there is still a bit of outstanding fix for the scala tests. Gábor
Horváth or myself will finish it by tomorrow evening.

[1] https://github.com/apache/flink/pull/1685

Best,

Marton

On Thu, Feb 25, 2016 at 12:04 PM, Slim Baltagi  wrote:

> Dear Flink community
>
> It is great news that the vote for the first release candidate (RC1) of
> Apache Flink 1.0.0 is starting today February 25th, 2016!
> As a community, we need to double our efforts and make sure that Flink
> 1.0.0 is GA before these 2 upcoming major events:
>
>-  Strata + Hadoop World in San Jose on *March 28-31, 2016*
>-  Hadoop Summit Europe in Dublin on *April 13-14, 2016*
>
> This is one aspect of the ‘market dynamics’ that we need to take into
> account as a community.
>
> Good luck!
>
> Slim Baltagi
>
> On Feb 25, 2016, at 4:34 AM, Robert Metzger  wrote:
>
> Dear Flink community,
>
> Please vote on releasing the following candidate as Apache Flink version
> 1.0.0.
>
> I've set user@flink.apache.org on CC because users are encouraged to help
> testing Flink 1.0.0 for their specific use cases. Please report issues (and
> successful tests!) on d...@flink.apache.org.
>
>
> The commit to be voted on (
> http://git-wip-us.apache.org/repos/asf/flink/commit/e4d308d6)
> e4d308d64057e5f94bec8bbca8f67aab0ea78faa
>
> Branch:
> release-1.0.0-rc1 (see
> https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.0.0-rc1
> )
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~rmetzger/flink-1.0.0-rc1/
>
> The release artifacts are signed with the key with fingerprint D9839159:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1063
>
> -
>
> The vote is open until Tuesday and passes if a majority of at least three
> +1 PMC votes are cast.
>
> The vote ends on Tuesday, March 1, 12:00 CET.
>
> [ ] +1 Release this package as Apache Flink 1.0.0
> [ ] -1 Do not release this package because ...
>
>
>


Re: [VOTE] Release Apache Flink 1.0.0 (RC1)

2016-02-25 Thread Slim Baltagi
Dear Flink community

It is great news that the vote for the first release candidate (RC1) of Apache 
Flink 1.0.0 is starting today February 25th, 2016!
As a community, we need to double our efforts and make sure that Flink 1.0.0 is 
GA before these 2 upcoming major events: 
- Strata + Hadoop World in San Jose on March 28-31, 2016
- Hadoop Summit Europe in Dublin on April 13-14, 2016
This is one aspect of the ‘market dynamics’ that we need to take into account 
as a community. 

Good luck!

Slim Baltagi

On Feb 25, 2016, at 4:34 AM, Robert Metzger  wrote:

> Dear Flink community,
> 
> Please vote on releasing the following candidate as Apache Flink version 
> 1.0.0.
> 
> I've set user@flink.apache.org on CC because users are encouraged to help 
> testing Flink 1.0.0 for their specific use cases. Please report issues (and 
> successful tests!) on d...@flink.apache.org.
> 
> 
> The commit to be voted on 
> (http://git-wip-us.apache.org/repos/asf/flink/commit/e4d308d6)
> e4d308d64057e5f94bec8bbca8f67aab0ea78faa
> 
> Branch:
> release-1.0.0-rc1 (see 
> https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.0.0-rc1)
> 
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~rmetzger/flink-1.0.0-rc1/
> 
> The release artifacts are signed with the key with fingerprint D9839159:
> http://www.apache.org/dist/flink/KEYS
> 
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1063
> 
> -
> 
> The vote is open until Tuesday and passes if a majority of at least three +1 
> PMC votes are cast.
> 
> The vote ends on Tuesday, March 1, 12:00 CET.
> 
> [ ] +1 Release this package as Apache Flink 1.0.0
> [ ] -1 Do not release this package because ...



Re: Kafka partition alignment for event time

2016-02-25 Thread Erdem Agaoglu
Hi Robert,

I switched to SNAPSHOT and confirm that it works. Thanks!
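
In case it helps anyone making the same switch, here is a rough sketch of what pointing a Maven build at the snapshot artifacts can look like (the artifact id and Scala suffix are assumptions that may differ for your project; the repository is the standard Apache snapshot repository):

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>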

On Thu, Feb 25, 2016 at 10:50 AM, Robert Metzger 
wrote:

> Hi Erdem,
>
> FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT
> should already contain the fix and it'll be in 1.0.0 (for which I'll post a
> release candidate today) as well.
>
> On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu 
> wrote:
>
>> Thanks Stephan
>>
>> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen  wrote:
>>
>>> You are right, the checkpoints should contain all offsets.
>>>
>>> I created a Ticket for this:
>>> https://issues.apache.org/jira/browse/FLINK-3440
>>>
>>>
>>>
>>>
>>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu 
>>> wrote:
>>>
 Hi,

 On a related and a more exaggerated setup, our kafka-producer (flume)
 seems
 to send data to a single partition at a time and switches it every few
 minutes. So when I run my Flink datastream program for the first time,
 it
 starts on the *largest* offsets and shows something like this:

 . Fetched the following start offsets [FetchPartition {partition=7,
 offset=15118832832}]
 . Fetched the following start offsets [FetchPartition {partition=1,
 offset=15203613236}]
 . Fetched the following start offsets [FetchPartition {partition=2,
 offset=15366811664}]
 . Fetched the following start offsets [FetchPartition {partition=0,
 offset=15393999709}]
 . Fetched the following start offsets [FetchPartition {partition=8,
 offset=15319475583}]
 . Fetched the following start offsets [FetchPartition {partition=5,
 offset=15482889767}]
 . Fetched the following start offsets [FetchPartition {partition=6,
 offset=15113885928}]
 . Fetched the following start offsets [FetchPartition {partition=3,
 offset=15182701991}]
 . Fetched the following start offsets [FetchPartition {partition=4,
 offset=15186569356}]

 For that instance flume happens to be sending data to partition-6 only, so
 the other consumers sit idle. Working with the default parallelism of 4, only
 one of the 4 threads is able to source data, and the checkpointing logs
 reflect that:

 Committing offsets [-915623761776, -915623761776, -915623761776,
 -915623761776, -915623761776, -915623761776, -915623761776,
 -915623761776,
 -915623761776] to offset store: FLINK_ZOOKEEPER
 Committing offsets [-915623761776, -915623761776, -915623761776,
 -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
 -915623761776] to offset store: FLINK_ZOOKEEPER
 Committing offsets [-915623761776, -915623761776, -915623761776,
 -915623761776, -915623761776, -915623761776, -915623761776,
 -915623761776,
 -915623761776] to offset store: FLINK_ZOOKEEPER
 Committing offsets [-915623761776, -915623761776, -915623761776,
 -915623761776, -915623761776, -915623761776, -915623761776,
 -915623761776,
 -915623761776] to offset store: FLINK_ZOOKEEPER

 This also means the checkpoint will only contain the offset for
 partition-6. So
 if program is stopped and restarted at a later time, it restores the
 offset
 for partition-6 only and other partitions are started at the largest
 offset.
 So it's able to process unseen data in partition-6 but not others. Say
 if
 flume produces data to partition-3 when flink program is stopped,
 they're
 lost, while the data in partition-6 is not. This generally causes
 multiple
 (late-)windows to be fired after restart, because we now generate
 watermarks
 off partition-3 which says the windows of the unseen data in
 partition-6 are
 already complete.

 This also has a side effect of windows not triggering unless some
 rebalancing is done beforehand. Since only 1 of the 4 threads will
 source
 data and generate watermarks, window triggers won't get watermarks from
 the other 3 sources and will wait long past the watermarks generated from the
 single
 source.

 I know producers shouldn't work like that, but consumers shouldn't
 care. I
 think it may also create some edge cases even if things were not as
 extreme
 as ours. If checkpoints could contain offsets of all of the partitions
 regardless of their contents, probably by storing the start offsets on the
 first run, I guess that would solve the problems around restarting.



 --
 View this message in context:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
 Sent from the Apache Flink User Mailing List archive. mailing list
 archive at Nabble.com.

>>>
>>>
>>
>>
>> --
>> erdem agaoglu
>>
>
>


-- 
erdem agaoglu


Re: Kafka partition alignment for event time

2016-02-25 Thread Robert Metzger
Hi Erdem,

FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT
should already contain the fix and it'll be in 1.0.0 (for which I'll post a
release candidate today) as well.

On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu 
wrote:

> Thanks Stephan
>
> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen  wrote:
>
>> You are right, the checkpoints should contain all offsets.
>>
>> I created a Ticket for this:
>> https://issues.apache.org/jira/browse/FLINK-3440
>>
>>
>>
>>
>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu 
>> wrote:
>>
>>> Hi,
>>>
>>> On a related and a more exaggerated setup, our kafka-producer (flume)
>>> seems
>>> to send data to a single partition at a time and switches it every few
>>> minutes. So when I run my Flink datastream program for the first time, it
>>> starts on the *largest* offsets and shows something like this:
>>>
>>> . Fetched the following start offsets [FetchPartition {partition=7,
>>> offset=15118832832}]
>>> . Fetched the following start offsets [FetchPartition {partition=1,
>>> offset=15203613236}]
>>> . Fetched the following start offsets [FetchPartition {partition=2,
>>> offset=15366811664}]
>>> . Fetched the following start offsets [FetchPartition {partition=0,
>>> offset=15393999709}]
>>> . Fetched the following start offsets [FetchPartition {partition=8,
>>> offset=15319475583}]
>>> . Fetched the following start offsets [FetchPartition {partition=5,
>>> offset=15482889767}]
>>> . Fetched the following start offsets [FetchPartition {partition=6,
>>> offset=15113885928}]
>>> . Fetched the following start offsets [FetchPartition {partition=3,
>>> offset=15182701991}]
>>> . Fetched the following start offsets [FetchPartition {partition=4,
>>> offset=15186569356}]
>>>
>>> For that instance flume happens to be sending data to partition-6 only, so
>>> the other consumers sit idle. Working with the default parallelism of 4, only
>>> one of the 4 threads is able to source data, and the checkpointing logs
>>> reflect that:
>>>
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>> -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>> -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>> -915623761776,
>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>
>>> This also means the checkpoint will only contain the offset for partition-6.
>>> So
>>> if program is stopped and restarted at a later time, it restores the
>>> offset
>>> for partition-6 only and other partitions are started at the largest
>>> offset.
>>> So it's able to process unseen data in partition-6 but not others. Say if
>>> flume produces data to partition-3 when flink program is stopped, they're
>>> lost, while the data in partition-6 is not. This generally causes
>>> multiple
>>> (late-)windows to be fired after restart, because we now generate
>>> watermarks
>>> off partition-3 which says the windows of the unseen data in partition-6
>>> are
>>> already complete.
>>>
>>> This also has a side effect of windows not triggering unless some
>>> rebalancing is done beforehand. Since only 1 of the 4 threads will source
>>> data and generate watermarks, window triggers won't get watermarks from
>>> the other 3 sources and will wait long past the watermarks generated from the
>>> single
>>> source.
>>>
>>> I know producers shouldn't work like that, but consumers shouldn't care.
>>> I
>>> think it may also create some edge cases even if things were not as
>>> extreme
>>> as ours. If checkpoints could contain offsets of all of the partitions
>>> regardless of their contents, probably by storing the start offsets on the
>>> first run, I guess that would solve the problems around restarting.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>>
>
>
> --
> erdem agaoglu
>