Re: How to perform this join operation?

2016-05-20 Thread Elias Levy
Till,

An issue with your suggestion is that the job state may grow unbounded. You
are managing
expiration of data from the cache in the operator, but the state is
partitioned by the stream key.
That means if we no longer observe a key, the state associated with that
key will never be
removed.

In my data set keys come and go, and many will never be observed again.
That will lead to
continuous state growth over time.


On Mon, May 2, 2016 at 6:06 PM, Elias Levy 
wrote:

> Thanks for the suggestion.  I ended up implementing it a different way.
>
> [...]
>
> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann 
> wrote:
>
>> Sorry for the late reply. You're right that with the windowed join you
>> would have to deal with pairs where the timestamp of (x,y) is not
>> necessarily earlier than the timestamp of z. Moreover, by using sliding
>> windows you would receive duplicates as you've described. Using tumbling
>> windows would mean that you lose join matches if (x,y) lives in an earlier
>> window. Thus, in order to solve your problem you would have to write a
>> custom stream operator.
>>
>> The stream operator would do the following: it collects the inputs from
>> (x,y) and z, which are already keyed, so we know that x = z holds true.
>> Using a priority queue we order the elements, because we don't know how they
>> arrive at the operator. Whenever we receive a watermark indicating that no
>> earlier events can arrive anymore, we can go through the two priority
>> queues to join the elements. The queues are part of the operator's state so
>> that we don't lose information in case of a recovery.
>>
>> I've sketched such an operator here [1]. I hope this helps you to get
>> started.
>>
>
>
>
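
For readers following along, below is a minimal, logic-only sketch (in Java) of the kind of operator Till describes above: buffer both inputs in priority queues ordered by timestamp and join them once a watermark guarantees that no earlier elements can arrive. The class name, the string output and the pruning comment are illustrative assumptions; a real implementation would wrap this logic in a custom two-input stream operator and snapshot the queues as operator state, as Till notes.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Logic-only sketch: not the operator from [1], just the buffering/joining idea.
public class BufferingJoinSketch<A, B> {

    private static class Timestamped<T> {
        final long timestamp;
        final T value;
        Timestamped(long timestamp, T value) { this.timestamp = timestamp; this.value = value; }
    }

    // Elements of each input, ordered by timestamp, because arrival order is not guaranteed.
    private final PriorityQueue<Timestamped<A>> first =
            new PriorityQueue<>(11, Comparator.comparingLong((Timestamped<A> e) -> e.timestamp));
    private final PriorityQueue<Timestamped<B>> second =
            new PriorityQueue<>(11, Comparator.comparingLong((Timestamped<B> e) -> e.timestamp));

    public void processFirst(long timestamp, A value) {
        first.add(new Timestamped<>(timestamp, value));
    }

    public void processSecond(long timestamp, B value) {
        second.add(new Timestamped<>(timestamp, value));
    }

    // Called when a watermark arrives: everything up to the watermark is complete and can be joined.
    public List<String> onWatermark(long watermark) {
        List<String> results = new ArrayList<>();
        while (!first.isEmpty() && first.peek().timestamp <= watermark) {
            Timestamped<A> a = first.poll();
            // Join with all buffered elements of the second input that are not later than 'a'.
            for (Timestamped<B> b : second) {
                if (b.timestamp <= a.timestamp) {
                    results.add(a.value + " joined with " + b.value);
                }
            }
        }
        // In a real operator the queues would be part of the checkpointed state, and old
        // entries of 'second' would eventually be pruned to bound the state size
        // (which is exactly the concern raised at the top of this thread).
        return results;
    }
}
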


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Flavio Pompermaier
Right now I'm using Flink 1.0.2... to which version should I downgrade?
The hardware seems to be OK.. how could I detect faulty hardware?
These errors appeared in every run of my job after I moved the temporary
directory from SSD to HDD and extended my pipeline with a dataset that
grows as the pipeline goes on, accumulating data from intermediate datasets.
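
One observation that may help with the faulty-hardware question: every corrupted name reported in these threads (java.ttil.HashSet vs java.util.HashSet, Europd/Rome vs Europe/Rome, rerialization vs serialization, VdhicleEvent vs VehicleEvent) differs from the expected name in exactly one character, and in each case the two characters differ by a single bit. That pattern points more towards memory or disk corruption than towards an application bug. A quick check:

public class BitFlipCheck {
    public static void main(String[] args) {
        // (expected, observed) pairs taken from the exceptions in these threads
        String[][] pairs = {
            {"java.util.HashSet", "java.ttil.HashSet"},
            {"Europe/Rome", "Europd/Rome"},
            {"serialization", "rerialization"},
            {"VehicleEvent", "VdhicleEvent"}
        };
        for (String[] p : pairs) {
            for (int i = 0; i < p[0].length(); i++) {
                char a = p[0].charAt(i);
                char b = p[1].charAt(i);
                if (a != b) {
                    // XOR exposes the differing bits; bitCount counts them (1 for all pairs above)
                    System.out.printf("%s -> %s: '%c' vs '%c', %d bit(s) flipped%n",
                            p[0], p[1], a, b, Integer.bitCount(a ^ b));
                }
            }
        }
    }
}
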
On 20 May 2016 18:31, "Fabian Hueske"  wrote:

> The problem seems to occur quite often.
> Did you update your Flink version recently? If so, could you try to
> downgrade and see if the problem disappears.
>
> Is it otherwise possible that it is caused by faulty hardware?
>
> 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier :
>
>> This time (Europd instead of Europe):
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
>> (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map 
>> (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 
>> 'SortMerger spilling thread' terminated due to an exception: The datetime 
>> zone id 'Europd/Rome' is not recognised
>>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>  at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
>> Thread 'SortMerger spilling thread' terminated due to an exception: The 
>> datetime zone id 'Europd/Rome' is not recognised
>>  at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>  at 
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>  at 
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>  ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
>> terminated due to an exception: The datetime zone id 'Europd/Rome' is not 
>> recognised
>>  at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.lang.IllegalArgumentException: The datetime zone id 
>> 'Europd/Rome' is not recognised
>>  at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>  at 
>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>  at 
>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>  at 
>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>  at 
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>  at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>  at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>>
>> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier wrote:
>>
>>> This time another error (rerialization instead of serialization):
>>>
>>> com.esotericsoftware.kryo.KryoException: Unable to find class: 
>>> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>>> at 
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>> at 
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>> at 
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>> at 
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>> at 
>>> 

Re: inheritance of Program interface in Program.java in org.apache.flink.api.common

2016-05-20 Thread 윤형덕
then this.program of the PackagedProgram object is null. [1]
run() in CliFrontend.java invokes executeProgramBlocking(program, client,
userParallelism) [2],
and this method invokes client.runBlocking(program, parallelism) [3],
and this method invokes the overloaded runBlocking(prog.getPlanWithJars(), parallelism,
prog.getSavepointPath()) [4].
prog.getPlanWithJars() invokes getPlan() of PackagedProgram.java [5],
and getPlan() invokes createPlanFromProgram(this.program, this.args) [6].
createPlanFromProgram's source is the following:
 
private static Plan createPlanFromProgram(Program program, String[] options)
        throws ProgramInvocationException {
    try {
        return program.getPlan(options);
    } catch (Throwable t) {
        throw new ProgramInvocationException(
            "Error while calling the program: " + t.getMessage(), t);
    }
}
 
As we checked, this.program is null, so if we invoke program.getPlan(options),
an exception should happen.
But when I ran the program, the exception didn't occur.
I'd appreciate it if you could explain this.
 
[1]
else if (hasMainMethod(mainClass)) {
this.program = null;
 
[2]
if (options.getDetachedMode() || (yarnCluster != null &&
yarnCluster.isDetached())) {
 exitCode = executeProgramDetached(program, client, userParallelism);
} else {
 exitCode = executeProgramBlocking(program, client, userParallelism);
}
 
[3]
protected int executeProgramBlocking(PackagedProgram program, Client client, 
int parallelism) {
  LOG.info("Starting execution of program");
  JobSubmissionResult result;
  try {
 result = client.runBlocking(program, parallelism);
  }
 
[4]
public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) 
throws ProgramInvocationException {
  Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
  if (prog.isUsingProgramEntryPoint()) {
return runBlocking(prog.getPlanWithJars(), parallelism, 
prog.getSavepointPath());
  }
 
[5]
public JobWithJars getPlanWithJars() throws ProgramInvocationException {
  if (isUsingProgramEntryPoint()) {
 return new JobWithJars(getPlan(), getAllLibraries(), classpaths, 
userCodeClassLoader);
 
[6]
private Plan getPlan() throws ProgramInvocationException {
  if (this.plan == null) {
 Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
 this.plan = createPlanFromProgram(this.program, this.args);
  }
  return this.plan;
}
 
 
-Original Message-
From: "Ufuk Celebi"u...@apache.org 
To: user@flink.apache.org; "윤형덕"ynoo...@naver.com; 
Cc: 
Sent: 2016-05-20 (금) 19:30:01
Subject: Re: inheritance of Program interface in Program.java in 
org.apache.flink.api.common
 

On Thu, May 19, 2016 at 4:46 PM, 윤형덕 ynoo...@naver.com wrote:

how can this.mainClass, which doesn't override the getPlan method (the
abstract method of the Program interface, Program.class) and only has a static main
method, be instantiated as a Program?


This is only called if the class is actually a subclass of Program. That's why
there is the `isAssignable` check. Otherwise, we check that there is a
main method (else if (hasMainMethod(mainClass))). If this is not the case, a
ProgramInvocationException is thrown.
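
If I read the 1.0 code correctly, this also explains why no exception occurs in the scenario described above: PackagedProgram.isUsingProgramEntryPoint() just checks whether this.program is non-null, so for a class that only has a static main method the runBlocking(prog.getPlanWithJars(), ...) branch, and therefore getPlan()/createPlanFromProgram, is never reached; the main method is invoked reflectively instead. A simplified, self-contained sketch of that resolution (an assumption condensed from the snippets quoted in this thread, not the actual PackagedProgram source):

import org.apache.flink.api.common.Program;

class EntryPointSketch {
    private final Program program;   // stays null for main()-style programs

    EntryPointSketch(Class<?> mainClass) throws Exception {
        if (Program.class.isAssignableFrom(mainClass)) {
            // Program-style: getPlan(options) will be called later via createPlanFromProgram.
            this.program = (Program) mainClass.newInstance();
        } else if (hasMainMethod(mainClass)) {
            // main()-style: program stays null, so isUsingProgramEntryPoint() is false,
            // the getPlanWithJars()/getPlan() branch is never taken and no exception occurs.
            this.program = null;
        } else {
            throw new IllegalArgumentException("Neither a Program nor a main(String[]) method.");
        }
    }

    boolean isUsingProgramEntryPoint() {
        return program != null;
    }

    private static boolean hasMainMethod(Class<?> clazz) {
        try {
            // Simplified: a real check would also verify that the method is public and static.
            clazz.getMethod("main", String[].class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
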




Logging Exceptions

2016-05-20 Thread David Kim
Hello!

Using flink 1.0.2, I noticed that exceptions thrown during a flink program
would show up on the flink dashboard in the 'Exceptions' tab. That's great!

However, I don't think flink currently logs this same exception. I was
hoping there would be an equivalent `log.error` call so that third party
logging frameworks can also act upon such errors.

If this is currently the known behavior, would it be troublesome to also make
a `log.error` call around the code that is responsible for sending the
exception to the dashboard?

If there's a misconfiguration on my end, let me know!

Thanks!
David
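
Until Flink logs the failure itself, one user-side workaround (just a sketch, not an official recommendation) is to catch the failure around execute() and log it through your own logging framework; for a blocking job the failure cause should also surface through execute():

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingJob {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ... build the topology here ...
        try {
            env.execute("my job");
        } catch (Exception e) {
            // Now visible to third-party logging frameworks as well.
            LOG.error("Flink job failed", e);
            throw e;
        }
    }
}
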


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Fabian Hueske
The problem seems to occur quite often.
Did you update your Flink version recently? If so, could you try to
downgrade and see if the problem disappears.

Is it otherwise possible that it is caused by faulty hardware?

2016-05-20 18:05 GMT+02:00 Flavio Pompermaier :

> This time (Europd instead of Europe):
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map (Key 
> Extractor)' , caused an error: Error obtaining the sorted input: Thread 
> 'SortMerger spilling thread' terminated due to an exception: The datetime 
> zone id 'Europd/Rome' is not recognised
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: The 
> datetime zone id 'Europd/Rome' is not recognised
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>   ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: The datetime zone id 'Europd/Rome' is not 
> recognised
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: java.lang.IllegalArgumentException: The datetime zone id 
> 'Europd/Rome' is not recognised
>   at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>   at 
> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>   at 
> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>   at 
> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>   at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
>
>
> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier 
> wrote:
>
>> This time another error (rerialization instead of serialization):
>>
>> com.esotericsoftware.kryo.KryoException: Unable to find class: 
>> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>>  at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>  at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>  at 
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>  at 
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>  at 
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>  at 
>> 

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Flavio Pompermaier
This time (Europd instead of Europe):

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
(GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) ->
Map (Key Extractor)' , caused an error: Error obtaining the sorted
input: Thread 'SortMerger spilling thread' terminated due to an
exception: The datetime zone id 'Europd/Rome' is not recognised
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted
input: Thread 'SortMerger spilling thread' terminated due to an
exception: The datetime zone id 'Europd/Rome' is not recognised
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: The datetime zone id 'Europd/Rome' is
not recognised
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.IllegalArgumentException: The datetime zone id
'Europd/Rome' is not recognised
at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
at 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
at 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
at 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
at 
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)



On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier 
wrote:

> This time another error (rerialization instead of serialization):
>
> com.esotericsoftware.kryo.KryoException: Unable to find class: 
> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>   at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>   at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>   at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>   at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>   at 
> 

Re: Run jar job in local cluster

2016-05-20 Thread rafal green
Nope - thanks for advice ;) I will try that tomorrow
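
For reference, the suggestion quoted below amounts to raising the log level so the Elasticsearch client reports why it cannot connect. In a Flink distribution that is usually done in conf/log4j.properties, along these lines (file name and logger names are assumptions about a default setup):

# conf/log4j.properties (sketch)
log4j.rootLogger=DEBUG, file
# the Elasticsearch client internals are usually the interesting part here
log4j.logger.org.elasticsearch=DEBUG
log4j.logger.org.apache.flink.streaming.connectors.elasticsearch2=DEBUG
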

2016-05-20 11:24 GMT+02:00 Ufuk Celebi :

> I would suggest to set the log level to DEBUG and check the logs why
> the client can not connect to your elastic search cluster. Did you try
> that?
>
> On Mon, May 9, 2016 at 3:54 PM, rafal green 
> wrote:
> > Dear Sir or Madam,
> >
> > Can you tell me why I have a problem with Elasticsearch in a local cluster?
> >
> > I analysed this example:
> >
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
> >
> > My Flink and Elasticsearch configs are the defaults (I only changed node.name
> > to "node-1").
> >
> > This example runs in my IntelliJ IDEA 15, but on the local cluster I have a
> > problem. Of course WordCount and SocketTextStreamWordCount work fine.
> >
> >
> > I spent 2 days trying to find a solution (with uncle Google ;) ) but it's not
> > easy.
> >
> > val config = new java.util.HashMap[String, String]
> > config.put("bulk.flush.max.actions", "1")
> > config.put("cluster.name", "elasticsearch")
> > config.put("path.home",
> >
> "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2")
> >
> > val transports = new util.ArrayList[InetSocketAddress]
> > transports.add(new
> > InetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))
> >
> >
> >
> > Error output:
> >
> > java.lang.RuntimeException: Client is not connected to any Elasticsearch
> > nodes!
> > at
> >
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > 05/08/2016 22:57:02 Job execution switched to status FAILING.
> > java.lang.RuntimeException: Client is not connected to any Elasticsearch
> > nodes!
> > at
> >
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> > at java.lang.Thread.run(Thread.java:745)
> > 05/08/2016 22:57:02 Job execution switched to status FAILED.
> >
> > 
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program
> > execution failed: Job execution failed.
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
> > at
> >
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
> > at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
> > at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> > at
> >
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
> > at
> >
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> > execution failed.
> > at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
> > at
> >
> 

[RichFlattMapfunction] Configuration File

2016-05-20 Thread simon peyer
Hi folks

I'm extending a RichFlatMapFunction in order to use state on a keyed stream.
Concerning this I have two questions:

1. I have a var state_item: ValueState[Option[String]] as a local variable in
this class, initialized with state_item = getRuntimeContext.getState(new
ValueStateDescriptor...) in the open function.
Is the field state_item different for every key?

In other words, if I have the keys val1 and val2, will these get two different
states?


2. The open function has an override def open(conf: Configuration) signature
that takes a Configuration.
Is there a way to pass a custom configuration in there?

Thanks Simon
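
A sketch in Java (the question is Scala, but the semantics are the same) of how such keyed state behaves: the ValueState obtained in open() is automatically scoped to the key of the record currently being processed, so keys val1 and val2 see two independent values. For the second question, the Configuration passed to open() is, as far as I know, not something the DataStream API lets you populate yourself; the usual way to hand custom settings to the function is through its constructor, as below.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch: remembers the previous value seen for the current key.
public class LastValuePerKey extends RichFlatMapFunction<Tuple2<String, String>, String> {

    private final String prefix;                    // "custom configuration" passed via the constructor
    private transient ValueState<String> lastValue; // one logical value per key at runtime

    public LastValuePerKey(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void open(Configuration parameters) {
        // One descriptor, but the runtime keeps a separate value for every key of the keyed stream.
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-value", String.class, null));
    }

    @Override
    public void flatMap(Tuple2<String, String> in, Collector<String> out) throws Exception {
        String previous = lastValue.value();        // the value stored for the current key only
        lastValue.update(in.f1);
        if (previous != null) {
            out.collect(prefix + in.f0 + ": " + previous + " -> " + in.f1);
        }
    }
}

// usage sketch: stream.keyBy(0).flatMap(new LastValuePerKey("change for "))
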

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Flavio Pompermaier
This time another error (rerialization instead of serialization):

com.esotericsoftware.kryo.KryoException: Unable to find class:
it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at 
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
it.okkam.flink.entitons.rerialization.pojo.EntitonQuadPojo
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 20 more



On Fri, May 20, 2016 at 12:48 PM, Flavio Pompermaier 
wrote:

> Hi Ufuk,
> my records could be quite large Pojos (I think some MB).
> The only thing I do to configure Kryo is:
>
>  env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class );
>
> Best,
> Flavio
>
> On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi  wrote:
>
>> @Stefano: the records are serialized anyway for batch jobs. The
>> spilling deserializer is only relevant if single records are very
>> large. How large are your records? In any case, I don't expect this to
>> be the problem.
>>
>> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and
>> ttil instead of util) look like some kind of data corruption and would
>> need further investigation. The other failure you reported might be
>> related to this. As a starting point, how do you configure the Kryo
>> serializer?
>>
>> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier
>>  wrote:
>> > Today I've got this other strange error.. Obviously I don't have a
>> > VdhicleEvent class, but a VehicleEvent class :(
>> >
>> > java.lang.RuntimeException: Cannot instantiate class.
>> >   at
>> >
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>> >   at
>> >
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> >   at
>> >
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> >   at
>> >
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> >   at
>> >
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> >   at
>> >
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> >   at
>> >
>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>> >   at
>> 

Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type

2016-05-20 Thread Flavio Pompermaier
Ok..great to hear that!

Thanks to all for the support

On Fri, May 20, 2016 at 3:53 PM, Ufuk Celebi  wrote:

> On Fri, May 20, 2016 at 3:27 PM, Aljoscha Krettek 
> wrote:
> > I think it might just be a warning. When using Kryo it is in the end a
> > GenericTypeInformation but the TypeAnalyzer might still initially try to
> > analyze it as a POJO.
>
> Yes, that's correct.
>
> To get back to the initial question: it's safe to ignore it then. ;)
>


Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type

2016-05-20 Thread Ufuk Celebi
On Fri, May 20, 2016 at 3:27 PM, Aljoscha Krettek  wrote:
> I think it might just be a warning. When using Kryo it is in the end a
> GenericTypeInformation but the TypeAnalyzer might still initially try to
> analyze it as a POJO.

Yes, that's correct.

To get back to the initial question: it's safe to ignore it then. ;)


Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type

2016-05-20 Thread Flavio Pompermaier
Indeed you can test this problem with:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.joda.time.DateTime;

import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;

public class DateTimeError {

public static void main(String[] args) throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class );
env.fromElements(DateTime.now(), DateTime.now()).print();
}
}


The very first line of the output is:

2016-05-20 15:39:02 INFO  TypeExtractor:1672 - class org.joda.time.DateTime
is not a valid POJO type

So the program actually runs successfully but there's this annoying LOG
statement :(

On Fri, May 20, 2016 at 3:27 PM, Aljoscha Krettek 
wrote:

> I think it might just be a warning. When using Kryo it is in the end a
> GenericTypeInformation but the TypeAnalyzer might still initially try to
> analyze it as a POJO.
>
> On Fri, 20 May 2016 at 12:51 Ufuk Celebi  wrote:
>
>> I tried to reproduce this and if you set up the dependency
>>
>> <dependency>
>>    <groupId>de.javakaffee</groupId>
>>    <artifactId>kryo-serializers</artifactId>
>>    <version>0.28</version>
>> </dependency>
>>
>> and register the Kryo type as suggested you should not see any log
>> messages (e.g. the type should be treated as a generic type and not a
>> pojo type).
>>
>> Does the program successfully execute for you?
>>
>> – Ufuk
>>
>>
>> On Thu, May 19, 2016 at 5:49 PM, Flavio Pompermaier
>>  wrote:
>> > Hi to all,
>> >
>> > I'm using Flink 1.0.2 and testing the job I discovered that I have a
>> lot of
>> > log with this error:
>> >
>> > TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO
>> type
>> >
>> > initially I thought I had forgotten to properly migrate my code from 0.10.x to
>> 1.0.x
>> > as stated in [1], but then I checked my code and I correctly build the
>> > ExecutionEnvironment as:
>> >
>> > env = ExecutionEnvironment.createLocalEnvironment(c);
>> > env.registerTypeWithKryoSerializer(DateTime.class,
>> > JodaDateTimeSerializer.class );
>> >
>> > So, is this log message to ignore or do I have a problem? Should it be put to
>> debug or
>> > not printed at all?
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
>> >
>> > Best,
>> > Flavio
>>
>


Re: flink async snapshots

2016-05-20 Thread Aljoscha Krettek
That's correct. With the fully async option the checkpoints take longer, but
you don't impact ongoing processing of elements. With the semi-async method
snapshots take a shorter time but during the synchronous part no element
processing can happen.

On Fri, 20 May 2016 at 15:04 Abhishek Singh 
wrote:

> Yes. Thanks for explaining.
>
> On Friday, May 20, 2016, Ufuk Celebi  wrote:
>
>> On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh
>>  wrote:
>> > If you can take atomic in-memory copies, then it works (at the cost of
>> > doubling your instantaneous memory). For larger state (say rocks DB),
>> won’t
>> > you have to stop the world (atomic snapshot) and make a copy? Doesn’t
>> that
>> > make it synchronous, instead of background/async?
>>
>> Hey Abhishek,
>>
>> that's correct. There are two variants for RocksDB:
>>
>> - semi-async (default): the snapshot is taken via the RocksDB backup feature
>> into a backup directory (sync). This is then copied to the final
>> checkpoint location (async, e.g. copy to HDFS).
>>
>> - fully-async: the snapshot is taken via the RocksDB snapshot feature (sync,
>> but no full copy and essentially "free"). With this snapshot we
>> iterate over all k/v-pairs and copy them to the final checkpoint
>> location (async, e.g. copy to HDFS).
>>
>> You enable the second variant via:
>> rocksDbBackend.enableFullyAsyncSnapshots();
>>
>> This is only part of the 1.1-SNAPSHOT version though.
>>
>> I'm not too familiar with the performance characteristics of both
>> variants, but maybe Aljoscha can chime in.
>>
>> Does this clarify things for you?
>>
>> – Ufuk
>>
>
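
A minimal sketch of the configuration Ufuk describes, assuming the 1.1-SNAPSHOT RocksDB backend from the flink-contrib module; the checkpoint URI and interval are placeholders:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbAsyncSnapshotsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Completed checkpoints end up under this URI (placeholder path).
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // Fully-async variant: snapshot via the RocksDB snapshot feature, then copy the
        // k/v pairs to the checkpoint location in the background (1.1-SNAPSHOT only, per Ufuk).
        backend.enableFullyAsyncSnapshots();

        env.setStateBackend(backend);
        env.enableCheckpointing(10000); // e.g. every 10 seconds

        // ... define sources, keyed operators and sinks here ...
    }
}
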


Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type

2016-05-20 Thread Aljoscha Krettek
I think it might just be a warning. When using Kryo it is in the end a
GenericTypeInformation but the TypeAnalyzer might still initially try to
analyze it as a POJO.

On Fri, 20 May 2016 at 12:51 Ufuk Celebi  wrote:

> I tried to reproduce this and if you set up the dependency
>
> <dependency>
>    <groupId>de.javakaffee</groupId>
>    <artifactId>kryo-serializers</artifactId>
>    <version>0.28</version>
> </dependency>
>
> and register the Kryo type as suggested you should not see any log
> messages (e.g. the type should be treated as a generic type and not a
> pojo type).
>
> Does the program successfully execute for you?
>
> – Ufuk
>
>
> On Thu, May 19, 2016 at 5:49 PM, Flavio Pompermaier
>  wrote:
> > Hi to all,
> >
> > I'm using Flink 1.0.2 and testing the job I discovered that I have a lot
> of
> > log with this error:
> >
> > TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO
> type
> >
> > initially I thought I had forgotten to properly migrate my code from 0.10.x to
> 1.0.x
> > as stated in [1], but then I checked my code and I correctly build the
> > ExecutionEnvironment as:
> >
> > env = ExecutionEnvironment.createLocalEnvironment(c);
> > env.registerTypeWithKryoSerializer(DateTime.class,
> > JodaDateTimeSerializer.class );
> >
> > So, is this log message to ignore or do I have a problem? Should it be put to debug
> or
> > not printed at all?
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
> >
> > Best,
> > Flavio
>


Re: join stream with last available element of other stream

2016-05-20 Thread Ufuk Celebi
Aljoscha answered this in the other thread you started for this
("'Last One' Window")

On Fri, May 20, 2016 at 12:43 PM, Artem Bogachev
 wrote:
> Hi,
>
> I’ve faced a problem trying to model our platform using Flink Streams.
>
> Let me describe our model:
>
> // Stream of data, ex. stocks: (AAPL, 100.0), (GZMP, 100.0) etc.
> val realData: DataStream[(K, V)] =  env.addSource(…)
>
> // Stream of forecasts (same format) based on some window aggregates
> val forecastedData: DataStream[(K, V)] =
> realData.keyBy(1).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new
> Forecaster(…))
>
> I would like to construct a stream of errors, whose values are just the differences
> between the realData stream and the latest available forecast for this key in
> the forecastedData stream.
>
> // I suppose this solution does not guarantee that all realData values will
> have corresponding forecast
> val errors: DataStream[(K, V)] =
> realData.join(forecastedData).where(0).equal(0)…
>
> Could you give an advice on how to implement such pattern? Do I have to
> write custom windows?
>
> Artem


Re: custom sources

2016-05-20 Thread Ufuk Celebi
On Fri, May 20, 2016 at 3:12 PM, Abhishek Singh
 wrote:
> Thanks. I am still in theory/evaluation mode. Will try to code this up to
> see if checkpoint will become an issue. I do have a high rate of ingest and
> lots of in flight data. Hopefully flink back pressure keeps this nicely
> bounded.

Sure! Feel free to post any questions that you have during your evaluation.

If you are interested in how back pressure is propagated you can have
a look at this blog post:
http://data-artisans.com/how-flink-handles-backpressure/


Re: custom sources

2016-05-20 Thread Abhishek Singh
Thanks. I am still in theory/evaluation mode. Will try to code this up to
see if checkpoint will become an issue. I do have a high rate of ingest and
lots of in flight data. Hopefully flink back pressure keeps this
nicely bounded.

I doubt it will be a problem for me - because even Spark is writing
all in-flight data to disk - because all partitioning goes through disk and is
inline - i.e. sync. Flink's disk usage is write-only and for the failure case only.
Looks pretty compelling so far.

On Friday, May 20, 2016, Ufuk Celebi  wrote:

> On Thu, May 19, 2016 at 7:48 PM, Abhishek R. Singh
> > wrote:
> > There seems to be some relationship between watermarks, triggers and
> > checkpoints that is somehow not being leveraged.
>
> Checkpointing is independent of this, yes. Did the state size become a
> problem for your use case? There are various users running Flink with
> very large state sizes without any issues. The recommended state
> backend for these use cases is the RocksDB backend.
>
> The barriers are triggered at the sources and flow with the data
> (
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
> ).
> Everything in-flight after the barrier is not relevant for the
> checkpoint. We are only interested in a consistent state snapshot.
>


Re: flink async snapshots

2016-05-20 Thread Abhishek Singh
Yes. Thanks for explaining.

On Friday, May 20, 2016, Ufuk Celebi  wrote:

> On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh
> > wrote:
> > If you can take atomic in-memory copies, then it works (at the cost of
> > doubling your instantaneous memory). For larger state (say rocks DB),
> won’t
> > you have to stop the world (atomic snapshot) and make a copy? Doesn’t
> that
> > make it synchronous, instead of background/async?
>
> Hey Abhishek,
>
> that's correct. There are two variants for RocksDB:
>
> - semi-async (default): the snapshot is taken via the RocksDB backup feature
> into a backup directory (sync). This is then copied to the final
> checkpoint location (async, e.g. copy to HDFS).
>
> - fully-async: the snapshot is taken via the RocksDB snapshot feature (sync,
> but no full copy and essentially "free"). With this snapshot we
> iterate over all k/v-pairs and copy them to the final checkpoint
> location (async, e.g. copy to HDFS).
>
> You enable the second variant via:
> rocksDbBackend.enableFullyAsyncSnapshots();
>
> This is only part of the 1.1-SNAPSHOT version though.
>
> I'm not too familiar with the performance characteristics of both
> variants, but maybe Aljoscha can chime in.
>
> Does this clarify things for you?
>
> – Ufuk
>


Re: unsubscribe

2016-05-20 Thread Ufuk Celebi
You have to send this to

user-unsubscr...@flink.apache.org

You've sent this to user@f.a.o instead.

On Fri, May 20, 2016 at 1:01 PM, Christophe Salperwyck
 wrote:
>


unsubscribe

2016-05-20 Thread Christophe Salperwyck



Re: custom sources

2016-05-20 Thread Ufuk Celebi
On Thu, May 19, 2016 at 7:48 PM, Abhishek R. Singh
 wrote:
> There seems to be some relationship between watermarks, triggers and
> checkpoints that is somehow not being leveraged.

Checkpointing is independent of this, yes. Did the state size become a
problem for your use case? There are various users running Flink with
very large state sizes without any issues. The recommended state
backend for these use cases is the RocksDB backend.

The barriers are triggered at the sources and flow with the data
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html).
Everything in-flight after the barrier is not relevant for the
checkpoint. We are only interested in a consistent state snapshot.


Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type

2016-05-20 Thread Ufuk Celebi
I tried to reproduce this and if you set up the dependency

<dependency>
   <groupId>de.javakaffee</groupId>
   <artifactId>kryo-serializers</artifactId>
   <version>0.28</version>
</dependency>

and register the Kryo type as suggested you should not see any log
messages (e.g. the type should be treated as a generic type and not a
pojo type).

Does the program successfully execute for you?

– Ufuk


On Thu, May 19, 2016 at 5:49 PM, Flavio Pompermaier
 wrote:
> Hi to all,
>
> I'm using Flink 1.0.2 and testing the job I discovered that I have a lot of
> log with this error:
>
> TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type
>
> initially I thought I had forgotten to properly migrate my code from 0.10.x to 1.0.x
> as stated in [1], but then I checked my code and I correctly build the
> ExecutionEnvironment as:
>
> env = ExecutionEnvironment.createLocalEnvironment(c);
> env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class );
>
> So, is this log message to ignore or do I have a problem? Should it be put to debug or
> not printed at all?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
>
> Best,
> Flavio


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Flavio Pompermaier
Hi Ufuk,
my records could be quite large Pojos (I think some MB).
The only thing I do to configure Kryo is:

 env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class );

Best,
Flavio

On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi  wrote:

> @Stefano: the records are serialized anyway for batch jobs. The
> spilling deserializer is only relevant if single records are very
> large. How large are your records? In any case, I don't expect this to
> be the problem.
>
> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and
> ttil instead of util) look like some kind of data corruption and would
> need further investigation. The other failure you reported might be
> related to this. As a starting point, how do you configure the Kryo
> serializer?
>
> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier
>  wrote:
> > Today I've got this other strange error.. Obviously I don't have a
> > VdhicleEvent class, but a VehicleEvent class :(
> >
> > java.lang.RuntimeException: Cannot instantiate class.
> >   at
> >
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> >   at
> >
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >   at
> >
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >   at
> >
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >   at
> >
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >   at
> >
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >   at
> >
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >   at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >   at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > it.okkam.flink.test.model.pojo.VdhicleEvent
> >   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >   at java.lang.Class.forName0(Native Method)
> >   at java.lang.Class.forName(Class.java:348)
> >   at
> >
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> >   ... 10 more
> >
> >
> > Thanks in advance,
> > Flavio
> >
> >
> > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli 
> > wrote:
> >>
> >> Hi Flavio, Till,
> >>
> >> do you think this can be possibly related to the serialization problem
> >> caused by 'the management' of Kryo serializer buffer when spilling on
> disk?
> >> We are definitely going beyond what is managed in memory with this task.
> >>
> >> saluti,
> >> Stefano
> >>
> >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier :
> >>>
> >>> That exception showed just once, but the following happens randomly
> (if I
> >>> re-run the job after stopping and restarting the cluster it doesn't
> show up
> >>> usually):
> >>>
> >>> Caused by: java.io.IOException: Serializer consumed more bytes than the
> >>> record had. This indicates broken serialization. If you are using
> custom
> >>> serialization types (Value or Writable), check their serialization
> methods.
> >>> If you are using a Kryo-serialized type, check the corresponding Kryo
> >>> serializer.
> >>> at
> >>>
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> >>> at
> >>>
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>> at
> >>>
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>> at
> >>>
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>> at
> >>>
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >>> at
> >>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >>> at
> >>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> >>> at java.util.ArrayList.elementData(ArrayList.java:418)
> >>> at java.util.ArrayList.get(ArrayList.java:431)
> >>> at
> 

join stream with last available element of other stream

2016-05-20 Thread Artem Bogachev
Hi,

I’ve faced a problem trying to model our platform using Flink Streams.

Let me describe our model:

// Stream of data, ex. stocks: (AAPL, 100.0), (GZMP, 100.0) etc.
val realData: DataStream[(K, V)] =  env.addSource(…)

// Stream of forecasts (same format) based on some window aggregates
val forecastedData: DataStream[(K, V)] = 
realData.keyBy(1).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new 
Forecaster(…))

I would like to construct a stream of errors, whose values are just the differences
between the realData stream and the latest available forecast for this key in
the forecastedData stream.

// I suppose this solution does not guarantee that all realData values will 
have corresponding forecast
val errors: DataStream[(K, V)] = 
realData.join(forecastedData).where(0).equal(0)…

Could you give an advice on how to implement such pattern? Do I have to write 
custom windows?

Artem
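
One common pattern for this (a sketch in Java, not necessarily the solution suggested in the "'Last One' Window" thread): connect the two streams, key both sides by the instrument, keep the latest forecast in keyed state, and emit the difference for every real data point. Real values that arrive before any forecast for their key are simply dropped here, which matches the caveat in the question.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ForecastErrorSketch {

    // Emits (key, real - latestForecast) for every real data point.
    public static class ErrorFunction extends
            RichCoFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {

        private transient ValueState<Double> latestForecast;

        @Override
        public void open(Configuration parameters) {
            latestForecast = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("latest-forecast", Double.class, null));
        }

        @Override
        public void flatMap1(Tuple2<String, Double> real, Collector<Tuple2<String, Double>> out) throws Exception {
            Double forecast = latestForecast.value();
            if (forecast != null) {
                out.collect(Tuple2.of(real.f0, real.f1 - forecast));
            }
        }

        @Override
        public void flatMap2(Tuple2<String, Double> forecast, Collector<Tuple2<String, Double>> out) throws Exception {
            latestForecast.update(forecast.f1); // remember the most recent forecast for this key
        }
    }

    public static DataStream<Tuple2<String, Double>> errors(
            DataStream<Tuple2<String, Double>> realData,
            DataStream<Tuple2<String, Double>> forecastedData) {
        return realData
                .connect(forecastedData)
                .keyBy(0, 0)                 // key both inputs by the instrument field
                .flatMap(new ErrorFunction());
    }
}
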

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Ufuk Celebi
@Stefano: the records are serialized anyway for batch jobs. The
spilling deserializer is only relevant if single records are very
large. How large are your records? In any case, I don't expect this to
be the problem.

@Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and
ttil instead of util) look like some kind of data corruption and would
need further investigation. The other failure you reported might be
related to this. As a starting point, how do you configure the Kryo
serializer?

On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier
 wrote:
> Today I've got this other strange error.. Obviously I don't have a
> VdhicleEvent class, but a VehicleEvent class :(
>
> java.lang.RuntimeException: Cannot instantiate class.
>   at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>   at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>   at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>   at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>   at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>   at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>   at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flink.test.model.pojo.VdhicleEvent
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>   ... 10 more
>
>
> Thanks in advance,
> Flavio
>
>
> On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli 
> wrote:
>>
>> Hi Flavio, Till,
>>
>> do you think this can be possibly related to the serialization problem
>> caused by 'the management' of Kryo serializer buffer when spilling on disk?
>> We are definitely going beyond what is managed in memory with this task.
>>
>> saluti,
>> Stefano
>>
>> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier :
>>>
>>> That exception showed just once, but the following happens randomly (if I
>>> re-run the job after stopping and restarting the cluster it doesn't show up
>>> usually):
>>>
>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at
>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>> at
>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>> at
>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>> at
>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>> at
>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>> at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>> at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>> at java.util.ArrayList.get(ArrayList.java:431)
>>> at
>>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>> at
>>> 

Re: inheritance of Program interface in Program.java in org.apache.flink.api.common

2016-05-20 Thread Ufuk Celebi
On Thu, May 19, 2016 at 4:46 PM, 윤형덕  wrote:
>
> how can this.mainClass, which doesn't override the getPlan method (the
abstract method of the Program interface, Program.class) and only has a static
main method, be instantiated as a Program?


This is only called if the class is actually a subclass of Program. That's
why there is the `isAssignable` check. Otherwise, we check that there is a
main method (else if (hasMainMethod(mainClass))). If this is not the case, a
ProgramInvocationException is thrown.


Re: hot deployment of stream processing(event at a time) jobs

2016-05-20 Thread Ufuk Celebi
I think you are looking for the savepoints feature:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html

The general idea is to trigger a savepoint, start the second job from
this savepoint (reading from the same topic), and then eventually
cancel the first job. Depending on the sink, the second job needs to
produce its result to a different end point though.

Does this help?

– Ufuk
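
A sketch of that sequence with the CLI (command names as in the 1.0 savepoint docs; job IDs, paths and jar names are placeholders):

# 1) take a savepoint of the running job
bin/flink savepoint <oldJobId>
#    ... prints the savepoint path on success

# 2) start the new job version from that savepoint (it reads from the same topic)
bin/flink run -s <savepointPath> new-version-fat.jar

# 3) once the new job is healthy, cancel the old one
bin/flink cancel <oldJobId>
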

On Thu, May 19, 2016 at 8:18 AM, Igor Berman  wrote:
> Hi,
> I have a simple job that consumes events from a Kafka topic and processes events
> with filter/flat-map only (i.e. no aggregation, no windows, no private state).
>
> The most important constraint in my setup is to continue processing no
> matter what (i.e. stopping for a few seconds to cancel the job and restart it with
> a new version is not an option because it would take a few seconds).
>
> I was thinking about the Blue/Green deployment concept and would start a new job
> with the new fat jar while the old job is still running, and then eventually cancel the old
> job.
>
> How will Flink handle such a scenario? What will happen regarding the semantics of
> event processing during the transition time?
>
> I know that core Kafka has a consumer re-balancing mechanism, but I'm not too
> familiar with it.
>
> Any thoughts will be highly appreciated
>
> thanks in advance
> Igor
>
>
>


Re: flink-kafka-connector offset management

2016-05-20 Thread Ufuk Celebi
Hey Arun!

How did you configure your Kafka source? If the offset has been
committed and you configured the source to read from the latest
offset, the message should not be re-processed.

– Ufuk
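
For context, a typical configuration of the 0.8 consumer looks roughly like this (a sketch; addresses, group id and topic name are placeholders). The committed offsets for group.id and the auto.offset.reset setting together determine where a restarted job starts reading:

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
        props.setProperty("group.id", "my-consumer-group");       // offsets are committed under this group
        props.setProperty("auto.offset.reset", "largest");        // only used when no committed offset exists

        env.addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka consumer sketch");
    }
}
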


On Fri, May 13, 2016 at 2:19 PM, Arun Balan  wrote:
> Hi, I am trying to use the flink-kafka-connector and I notice that every
> time I restart my application it re-reads the last message on the kafka
> topic. So if the latest offset on the topic is 10, then when the application
> is restarted, kafka-connector will re-read message 10. Why is this the
> behavior? I would assume that the last message has already been read and
> offset committed. I require that messages that are already processed from
> the topic not be reprocessed. Any insight would be helpful.
>
> Thanks
> Arun Balan


Re: Java heap space error

2016-05-20 Thread Flavio Pompermaier
Indeed I can confirm that I resolved this problem by reducing the number of
slots per Task Manager (and thus increasing the memory available to each
task)!
However, from time to time I get serialization issues that I don't know
where they come from... it looks like the PojoSerializer has some issue
somewhere.

Thanks anyway Ufuk!
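
For anyone finding this later, the knobs we ended up touching live in
flink-conf.yaml; roughly (example values only, not recommendations):

  taskmanager.numberOfTaskSlots: 4   # fewer slots -> more memory per slot
  taskmanager.heap.mb: 4096          # overall TaskManager heap
  taskmanager.memory.fraction: 0.5   # share of the heap used as managed memory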

On Fri, May 20, 2016 at 12:04 PM, Ufuk Celebi  wrote:

> The job is running out of heap memory, probably because a user
> function needs a lot of it (the parquet thrift sink?).
>
> You can try to work around it by reducing the amount of managed memory
> in order to leave more heap space available.
>
> On Thu, May 12, 2016 at 6:55 PM, Flavio Pompermaier
>  wrote:
> > Hi to all,
> > running a job that writes parquet-thrift files I had this exception (in a
> > Task Manager):
> >
> > io.netty.channel.nio.NioEventLoop -
> Unexpected
> > exception in the selector loop.
> > java.lang.OutOfMemoryError: Java heap space
> > 2016-05-12 18:49:11,302 WARN
> > org.jboss.netty.channel.socket.nio.AbstractNioSelector-
> Unexpected
> > exception in the selector loop.
> > java.lang.OutOfMemoryError: Java heap space
> > 2016-05-12 18:49:11,302 ERROR
> > org.apache.flink.runtime.io.disk.iomanager.IOManager  - The
> handler
> > of the request-complete-callback threw an exception: Java heap space
> > java.lang.OutOfMemoryError: Java heap space
> > 2016-05-12 18:49:11,303 ERROR
> > org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O
> reading
> > thread encountered an error: segment has been freed
> > java.lang.IllegalStateException: segment has been freed
> > at
> >
> org.apache.flink.core.memory.HeapMemorySegment.wrap(HeapMemorySegment.java:85)
> > at
> >
> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest.read(AsynchronousFileIOChannel.java:310)
> > at
> >
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:396)
> > 2016-05-12 18:49:11,303 ERROR
> > org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O
> reading
> > thread encountered an error: segment has been freed
> >
> >
> > Any idea of what could be the cause?
> >
> > Best,
> > Flavio
>


Re: Java heap space error

2016-05-20 Thread Ufuk Celebi
The job is running out of heap memory, probably because a user
function needs a lot of it (the parquet thrift sink?).

You can try to work around it by reducing the amount of managed memory
in order to leave more heap space available.

On Thu, May 12, 2016 at 6:55 PM, Flavio Pompermaier
 wrote:
> Hi to all,
> running a job that writes parquet-thrift files I had this exception (in a
> Task Manager):
>
> io.netty.channel.nio.NioEventLoop - Unexpected
> exception in the selector loop.
> java.lang.OutOfMemoryError: Java heap space
> 2016-05-12 18:49:11,302 WARN
> org.jboss.netty.channel.socket.nio.AbstractNioSelector- Unexpected
> exception in the selector loop.
> java.lang.OutOfMemoryError: Java heap space
> 2016-05-12 18:49:11,302 ERROR
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - The handler
> of the request-complete-callback threw an exception: Java heap space
> java.lang.OutOfMemoryError: Java heap space
> 2016-05-12 18:49:11,303 ERROR
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O reading
> thread encountered an error: segment has been freed
> java.lang.IllegalStateException: segment has been freed
> at
> org.apache.flink.core.memory.HeapMemorySegment.wrap(HeapMemorySegment.java:85)
> at
> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest.read(AsynchronousFileIOChannel.java:310)
> at
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:396)
> 2016-05-12 18:49:11,303 ERROR
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O reading
> thread encountered an error: segment has been freed
>
>
> Any idea of what could be the cause?
>
> Best,
> Flavio


Re: Run jar job in local cluster

2016-05-20 Thread Ufuk Celebi
I would suggest setting the log level to DEBUG and checking the logs to see
why the client cannot connect to your Elasticsearch cluster. Did you try
that?
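
For a standalone setup that is usually a one-line change in
conf/log4j.properties (assuming the default logging config shipped with
Flink), e.g.:

  log4j.rootLogger=DEBUG, file

and then restart the cluster and look at the files under log/.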

On Mon, May 9, 2016 at 3:54 PM, rafal green  wrote:
> Dear Sir or Madam,
>
> Can you tell me why I have a problem with Elasticsearch in a local cluster?
>
> I analysed this example:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
>
> My Flink and Elasticsearch configs are the defaults (I only changed
> node.name to "node-1")
>
> This example runs in my IntelliJ IDEA 15, but on the local cluster I have a
> problem. Of course WordCount and SocketTextStreamWordCount work fine.
>
>
> I spent 2 days trying to find a solution (with uncle Google ;) ) but it's
> not easy
>
> val config = new java.util.HashMap[String, String]
> config.put("bulk.flush.max.actions", "1")
> config.put("cluster.name", "elasticsearch")
> config.put("path.home",
> "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2")
>
> val transports = new util.ArrayList[InetSocketAddress]
> transports.add(new
> InetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))
>
>
>
> Error output:
>
> java.lang.RuntimeException: Client is not connected to any Elasticsearch
> nodes!
> at
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> at java.lang.Thread.run(Thread.java:745)
>
> 05/08/2016 22:57:02 Job execution switched to status FAILING.
> java.lang.RuntimeException: Client is not connected to any Elasticsearch
> nodes!
> at
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> at java.lang.Thread.run(Thread.java:745)
> 05/08/2016 22:57:02 Job execution switched to status FAILED.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
> at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
> at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> 

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Flavio Pompermaier
Today I got this other strange error... Obviously I don't have a
VdhicleEvent class, but a VehicleEvent class :(

java.lang.RuntimeException: Cannot instantiate class.
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at 
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
it.okkam.flink.test.model.pojo.VdhicleEvent
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
... 10 more

Thanks in advance,
Flavio


On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli 
wrote:

> Hi Flavio, Till,
>
> do you think this could possibly be related to the serialization problem
> caused by 'the management' of the Kryo serializer buffer when spilling to disk?
> We are definitely going beyond what is managed in memory with this task.
>
> saluti,
> Stefano
>
> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier :
>
>> That exception showed just once, but the following happens randomly (if I
>> re-run the job after stopping and restarting the cluster it usually doesn't
>> show up):
>>
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> at
>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>
>>
>> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> in my last run of a job I received this weird Kryo 

Re: flink async snapshots

2016-05-20 Thread Ufuk Celebi
On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh
 wrote:
> If you can take atomic in-memory copies, then it works (at the cost of
> doubling your instantaneous memory). For larger state (say rocks DB), won’t
> you have to stop the world (atomic snapshot) and make a copy? Doesn’t that
> make it synchronous, instead of background/async?

Hey Abhishek,

that's correct. There are two variants for RocksDB:

- semi-async (default): the snapshot is taken via the RocksDB backup feature
to back up to a directory (sync). This is then copied to the final
checkpoint location (async, e.g. copy to HDFS).

- fully-async: the snapshot is taken via the RocksDB snapshot feature (sync,
but no full copy and essentially "free"). With this snapshot we
iterate over all k/v-pairs and copy them to the final checkpoint
location (async, e.g. copy to HDFS).

You enable the second variant via: rocksDbBackend.enableFullyAsyncSnapshots();

This is only part of the 1.1-SNAPSHOT version though.
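
For completeness, wiring it up looks roughly like this (the checkpoint URI
and the constructor are assumptions on my side; enableFullyAsyncSnapshots()
is the method mentioned above and only exists on 1.1-SNAPSHOT):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FullyAsyncRocksDbSnapshots {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // e.g. checkpoint every 10 seconds

        RocksDBStateBackend rocksDbBackend =
            new RocksDBStateBackend("hdfs:///flink/checkpoints");
        rocksDbBackend.enableFullyAsyncSnapshots(); // 1.1-SNAPSHOT only

        env.setStateBackend(rocksDbBackend);
        // ... define sources/operators/sinks and call env.execute(...)
    }
}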

I'm not too familiar with the performance characteristics of both
variants, but maybe Aljoscha can chime in.

Does this clarify things for you?

– Ufuk