RE: GROUP BY TUMBLE on ROW range

2017-10-18 Thread Stefano Bortoli
Great, thanks for the explanation. I noticed now that the examples are indeed
for the Table API. I believe the over window is sufficient for our purpose right
now; I was just curious.

Best,
Stefano

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, October 17, 2017 9:24 PM
To: Stefano Bortoli 
Cc: user@flink.apache.org
Subject: Re: GROUP BY TUMBLE on ROW range

Hi Stefano,
this is not supported in Flink's SQL and we would need new Group Window 
functions (like TUMBLE) for this.
A TUMBLE_COUNT function would be somewhat similar to SESSION, which also 
requires checks on the sorted neighboring rows to identify the window of a row.
Such a function would first need to be added to Calcite and then integrated 
with Flink.

A tumble count could also be expressed in plain SQL but wouldn't be very 
intuitive. You would have to
- define an over window (maybe partitioned on some key) sorted on time with a 
ROW_NUMBER function that assigns increasing numbers to rows.
- do a group by on the row number modulo the window size.
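For illustration, a minimal sketch of those two steps as a SQL string in Java;
table and column names (T1, someKey, rowtime) are placeholders, and the window
id uses the integer division of the row number by the window size so that
consecutive rows fall into the same bucket. Whether a given Flink version accepts
this as a streaming query is a separate matter.

// Count-based tumbling window of size 2, expressed with an OVER window plus a GROUP BY.
// All identifiers are illustrative, not taken from this thread.
String tumbleCountSql =
    "SELECT someKey, windowId, COUNT(*) AS cnt " +
    "FROM ( " +
    "  SELECT someKey, " +
    "         (ROW_NUMBER() OVER (PARTITION BY someKey ORDER BY rowtime) - 1) / 2 AS windowId " +
    "  FROM T1 " +
    ") t " +
    "GROUP BY someKey, windowId";
// The string would then be handed to the TableEnvironment's sql() method
// (method name as of the Flink version current at the time of this thread).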
Btw. count windows are supported by the Table API.
Best, Fabian


2017-10-17 17:16 GMT+02:00 Stefano Bortoli <stefano.bort...@huawei.com>:
Hi all,
Is there a way to use a tumble window group by with row range in streamSQL?
I mean, something like this:
//  "SELECT COUNT(*) " +
// "FROM T1 " +
//"GROUP BY TUMBLE(rowtime, INTERVAL '2' ROWS PRECEDING )"

However, even looking at the tests and at the “row interval expression
generation”, I could not find any SQL examples. I know it is supported by the
stream APIs, and countWindow is the chosen abstraction.

table
  .window(Tumble over 2.rows on 'long as 'w)
  .groupBy('w)
  .select('int.count)
  .toDataSet[Row]

I fear I am missing something simple. Thanks a lot for the support guys!

Best,
Stefano



GROUP BY TUMBLE on ROW range

2017-10-17 Thread Stefano Bortoli
Hi all,

Is there a way to use a tumble window group by with row range in streamSQL?

I mean, something like this:

//  "SELECT COUNT(*) " +
// "FROM T1 " +
//"GROUP BY TUMBLE(rowtime, INTERVAL '2' ROWS PRECEDING )"

However, even looking at tests and looking at the "row interval expression 
generation" I could not find any examples in SQL. I know it is supported by the 
stream APIs, and countWindow is the chosen abstraction.

table
  .window(Tumble over 2.rows on 'long as 'w)
  .groupBy('w)
  .select('int.count)
  .toDataSet[Row]

I fear I am missing something simple. Thanks a lot for the support guys!

Best,
Stefano


RE: UnilateralSortMerger error (again)

2017-04-21 Thread Stefano Bortoli
In fact, the old problem was the KryoSerializer's missed initialization on the
exception that triggered spilling to disk. This left a dirty serialization
buffer that would eventually break the program. Till worked on it by debugging
the source code that generated the error. Perhaps someone could try the same
this time, if Flavio can make the problem reproducible with a shareable
program and dataset.

Stefano

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Friday, April 21, 2017 10:04 AM
To: user 
Subject: Re: UnilateralSortMerger error (again)

In the past, these errors were most often caused by bugs in the serializers, 
not in the sorter.

What types are you using at that point? The Stack Trace reveals ROW and 
StringValue, any other involved types?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
As suggested by Fabian I set taskmanager.memory.size = 1024 (to force spilling 
to disk) and the job failed almost immediately..

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
I debugged the process a bit by repeating the job on sub-slices of the entire
data (using the id value to filter data with Parquet push-down filters), and all
slices completed successfully :(
So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this 
was somehow a factor of stress but it didn't cause any error.
Then I almost doubled the number of rows to process and finally the error 
showed up again.
It seems somehow related to spilling to disk but I can't really understand 
what's going on :(
This is a summary of my debug attempts:

4 Task managers with 6 GB  and 1 slot each, parallelism = 4

id < 10.000.000.000  => 1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000   => 20.926.903 rows => OK
id >= 99.945.000.000 && id < 99.960.000.000   => 23.888.750  rows => OK
id >= 99.960.000.000 => 32.936.422 rows => OK

4 TM with 8 GB and 4 slot each, parallelism 16

id >= 99.960.000.000 => 32.936.422 rows => OK
id >= 99.945.000.000  => 56.825.172 rows => ERROR

Any help is appreciated..
Best,
Flavio

On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
I could, but only if there's a good probability that it fixes the problem... how
confident are you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
Looking at git log of DataInputDeserializer.java , there has been some recent 
change.

If you have time, maybe try with 1.2.1 RC and see if the error is reproducible ?

Cheers

On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
Hi to all,
I think I'm again on the weird Exception with the 
SpillingAdaptiveSpanningRecordDeserializer...
I'm using Flink 1.2.0 and the error seems to rise when Flink spills to disk but 
the Exception thrown is not very helpful. Any idea?

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
at 
org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
at org.apache.flink.types.StringValue.readString(StringValue.java:747)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at 
org.apache.flink.runti

RE: Flink memory usage

2017-04-20 Thread Stefano Bortoli
I think that if you have a lot of memory available, the GC gets kind of lazy.
In our case, the issue was just the latency caused by the GC, because we were
loading more data than could fit in memory. Hence, optimizing the code gave
us a lot of improvements. FlatMaps are also dangerous, as objects can multiply
beyond expectations, making a co-group extremely costly. :-) A well-placed
distinct() saves a lot of time and memory.
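For example, a minimal DataSet sketch of that well-placed distinct(); the
datasets and field values are made up, and 'left' stands for the output of a
flatMap that may have multiplied records:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DistinctBeforeCoGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> left = env.fromElements(
                new Tuple2<>("a", 1L), new Tuple2<>("a", 1L), new Tuple2<>("b", 2L));
        DataSet<Tuple2<String, Long>> right = env.fromElements(
                new Tuple2<>("a", 10L), new Tuple2<>("b", 20L));

        left.distinct(0, 1)                  // drop duplicates before the expensive coGroup
            .coGroup(right)
            .where(0).equalTo(0)
            .with(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                @Override
                public void coGroup(Iterable<Tuple2<String, Long>> l,
                                    Iterable<Tuple2<String, Long>> r,
                                    Collector<String> out) {
                    for (Tuple2<String, Long> lv : l) {
                        for (Tuple2<String, Long> rv : r) {
                            out.collect(lv.f0 + ": " + lv.f1 + "/" + rv.f1);
                        }
                    }
                }
            })
            .print();
    }
}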

My point is that having worked with scarce resources I learned that almost all 
the time the issue was my code, not the framework.

Good luck.

Stefano

From: Newport, Billy [mailto:billy.newp...@gs.com]
Sent: Thursday, April 20, 2017 4:46 PM
To: Stefano Bortoli ; 'Fabian Hueske' 

Cc: 'user@flink.apache.org' 
Subject: RE: Flink memory usage

Your reuse idea kind of implies that it’s a GC generation rate issue, i.e. it’s 
not collecting fast enough so it’s running out of memory versus heap that’s 
actually anchored, right?


From: Stefano Bortoli [mailto:stefano.bort...@huawei.com]
Sent: Thursday, April 20, 2017 10:33 AM
To: Newport, Billy [Tech]; 'Fabian Hueske'
Cc: 'user@flink.apache.org'
Subject: RE: Flink memory usage

Hi Billy,

The only suggestion I can give is to check your code very carefully for useless
variable allocations, and to foster reuse as much as possible. Don’t create a new
collection on every map execution; rather, clear and reuse the collected output
of the flatMap, and so on. In the past we ran long processes over a lot of data
with little memory without problems, including many more complex co-groups,
joins and so on, without any issue.
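For example, a minimal sketch of that reuse pattern in a flatMap; the types and
the splitting logic are only illustrative:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TokenizingFlatMap extends RichFlatMapFunction<String, String> {

    // Buffer created once per parallel instance and reused for every record.
    private transient List<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        buffer.clear();                      // reuse instead of allocating a new list per record
        for (String token : value.split(",")) {
            buffer.add(token.trim());
        }
        for (String token : buffer) {
            out.collect(token);
        }
    }
}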

My2c. Hope it helps.

Stefano

From: Newport, Billy [mailto:billy.newp...@gs.com]
Sent: Thursday, April 20, 2017 1:31 PM
To: 'Fabian Hueske' <fhue...@gmail.com>
Cc: 'user@flink.apache.org' <user@flink.apache.org>
Subject: RE: Flink memory usage

I don’t think our functions are memory heavy; they typically are cogroups that
merge the records on the left with the records on the right.

We’re currently requiring 720GB of heap to do our processing which frankly 
appears ridiculous to us. Could too much parallelism be causing the problem? 
Looking at:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html

If we are processing 17 “datasets” in a single job and each has an individual
parallelism of 40, is that a (potential) total parallelism of 17*40? Given your
network-buffers calculation of parallelism squared, would that do it, or only
if we explicitly configure it that way:

taskmanager.network.numberOfBuffers: p ^ 2 * t * 4

where p is the maximum parallelism of the job and t is the number of task
managers.
You can process more than one parallel task per TM if you configure more than 
one processing slot per machine ( taskmanager.numberOfTaskSlots). The TM will 
divide its memory among all its slots. So it would be possible to start one TM 
for each machine with 100GB+ memory and 48 slots each.
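For reference, plugging the two possible readings of p into that rule of thumb
with t = 20 task managers gives very different numbers (plain arithmetic on the
figures above, not a recommendation):

p = 40  (per-dataset parallelism): 40^2 * 20 * 4  = 128,000 buffers
p = 680 (17 datasets * 40):        680^2 * 20 * 4 = 36,992,000 buffers

so which value ends up as p makes an enormous difference to numberOfBuffers.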

Our pipeline for each dataset looks like this:

Read Avro file -> FlatMap -> Validate each record with a flatmap
Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the validated Avro file above
Read Parquet -> FlatMap -> Filter Dead Rows
Union the CoGroup output with the dead rows and write the result to a Parquet file.

I don’t understand why this logic couldn’t run with a single task manager and 
just take longer. We’re having a lot of trouble trying to change the tuning to 
reduce the memory burn. We run the above pipeline with parallelism 40 for all 
17 datasets in a single job.

We’re running this config now which is not really justifiable for what we’re 
doing.

20 nodes 2 slots, 40 parallelism 36GB mem = 720GB of heap…

Thanks

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, April 19, 2017 10:52 AM
To: Newport, Billy [Tech]
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Flink memory usage

Hi Billy,
Flink's internal operators are implemented to not allocate heap space 
proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building a 
hash table) the data is serialized into managed memory. If all memory is in 
use, Flink starts spilling to disk. This blog post discusses how Flink uses its 
managed memory [1] (still up to date, even though it's almost 2 years old).
The runtime code should actually be quite stable. Most of the code has

RE: Flink memory usage

2017-04-20 Thread Stefano Bortoli
Hi Billy,

The only suggestion I can give is to check your code very carefully for useless
variable allocations, and to foster reuse as much as possible. Don’t create a new
collection on every map execution; rather, clear and reuse the collected output
of the flatMap, and so on. In the past we ran long processes over a lot of data
with little memory without problems, including many more complex co-groups,
joins and so on, without any issue.

My2c. Hope it helps.

Stefano

From: Newport, Billy [mailto:billy.newp...@gs.com]
Sent: Thursday, April 20, 2017 1:31 PM
To: 'Fabian Hueske' 
Cc: 'user@flink.apache.org' 
Subject: RE: Flink memory usage

I don’t think our functions are memory heavy; they typically are cogroups that
merge the records on the left with the records on the right.

We’re currently requiring 720GB of heap to do our processing which frankly 
appears ridiculous to us. Could too much parallelism be causing the problem? 
Looking at:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html

If we are processing 17 “datasets” in a single job and each has an individual
parallelism of 40, is that a (potential) total parallelism of 17*40? Given your
network-buffers calculation of parallelism squared, would that do it, or only
if we explicitly configure it that way:

taskmanager.network.numberOfBuffers: p ^ 2 * t * 4

where p is the maximum parallelism of the job and t is the number of task
managers.
You can process more than one parallel task per TM if you configure more than 
one processing slot per machine ( taskmanager.numberOfTaskSlots). The TM will 
divide its memory among all its slots. So it would be possible to start one TM 
for each machine with 100GB+ memory and 48 slots each.

Our pipeline for each dataset looks like this:

Read Avro file -> FlatMap -> Validate each record with a flatmap
Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the validated Avro file above
Read Parquet -> FlatMap -> Filter Dead Rows
Union the CoGroup output with the dead rows and write the result to a Parquet file.

I don’t understand why this logic couldn’t run with a single task manager and 
just take longer. We’re having a lot of trouble trying to change the tuning to 
reduce the memory burn. We run the above pipeline with parallelism 40 for all 
17 datasets in a single job.

We’re running this config now which is not really justifiable for what we’re 
doing.

20 nodes 2 slots, 40 parallelism 36GB mem = 720GB of heap…

Thanks

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, April 19, 2017 10:52 AM
To: Newport, Billy [Tech]
Cc: user@flink.apache.org
Subject: Re: Flink memory usage

Hi Billy,
Flink's internal operators are implemented to not allocate heap space 
proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building a 
hash table) the data is serialized into managed memory. If all memory is in 
use, Flink starts spilling to disk. This blog post discusses how Flink uses its 
managed memory [1] (still up to date, even though it's almost 2 years old).
The runtime code should actually be quite stable. Most of the code has been there
for several years (even before Flink was donated to the ASF) and we haven't 
seen many bugs reported for the DataSet runtime. Of course this does not mean 
that the code doesn't contain bugs.

However, Flink does not take care of the user code. For example a 
GroupReduceFunction that collects a lot of data, e.g., in a List on the heap, 
can still kill a program.
I would check if you have user functions that require lots of heap memory.
Also reducing the size of the managed memory to have more heap space available 
might help.
If that doesn't solve the problem, it would be good if you could share some 
details about your job (which operators, which local strategies, how many 
operators) that might help to identify the misbehaving operator.
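For illustration, a minimal sketch of the difference; the types and field names
are made up. The first function materializes the whole group on the heap, the
second aggregates while streaming over it:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Anti-pattern: collects every record of the group into a heap list.
class CollectingReduce implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        List<Tuple2<String, Long>> all = new ArrayList<>();
        for (Tuple2<String, Long> v : values) {
            all.add(v);                        // heap usage grows with the group size
        }
        // ... work on 'all' ...
    }
}

// Heap-friendly variant: keeps only the running aggregate.
class StreamingReduce implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        String key = null;
        long sum = 0L;
        for (Tuple2<String, Long> v : values) {
            key = v.f0;
            sum += v.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }
}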

Thanks, Fabian

[1] 
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html

2017-04-19 16:09 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
How does Flink use memory? We’re seeing cases when running a job on larger
datasets where it throws OOM exceptions during the job. We’re using the DataSet
API. Shouldn’t Flink be streaming from disk to disk? We work around this by using
fewer slots, but it seems unintuitive that I need to change these settings given
Flink != Spark. Why isn’t Flink’s memory usage constant? Why couldn’t I run a
job with a single task and a single slot for any size job successfully, other
than it takes m

Re: JDBC sink in flink

2016-07-05 Thread Stefano Bortoli
As Chesnay said, it is not necessary to use a pool, as the connection is reused
across splits. However, if you had to customize it for some reason, you can
do it starting from the JDBC Input and Output formats.
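For reference, a minimal custom-sink sketch of the lifecycle Chesnay describes in
the quoted thread below (one connection per parallel sink instance, opened once,
reused for every record, closed at the end); the URL, table and SQL are
placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class JdbcSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel instance: create the single connection here.
        connection = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
        statement = connection.prepareStatement("INSERT INTO my_table (k, v) VALUES (?, ?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        // Called for every record: reuse the connection opened above.
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}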

cheers!

2016-07-05 13:27 GMT+02:00 Harikrishnan S :

> Awesome ! Thanks a lot ! I should probably write a blog post somewhere
> explaining this.
>
> On Tue, Jul 5, 2016 at 4:28 PM, Chesnay Schepler 
> wrote:
>
>> They serve a similar purpose.
>>
>> OutputFormats originate from the Batch API, whereas SinkFunctions are a
>> Streaming API concept.
>>
>> You can however use OutputFormats in the Streaming API using the
>> DataStream#writeUsingOutputFormat.
>>
>> Regards,
>> Chesnay
>>
>>
>> On 05.07.2016 12:51, Harikrishnan S wrote:
>>
>> Ah that makes sense. Also what's the difference between a RichOutputFormat
>> and a RichSinkFunction ? Can I use JDBCOutputFormat as a sink in a stream ?
>>
>> On Tue, Jul 5, 2016 at 3:53 PM, Chesnay Schepler 
>> wrote:
>>
>>> Hello,
>>>
>>> an instance of the JDBCOutputFormat will use a single connection to send
>>> all values.
>>>
>>> Essentially
>>> - open(...) is called at the very start to create the connection
>>> - then all invoke/writeRecord calls are executed (using the same
>>> connection)
>>> - then close() is called to clean up.
>>>
>>> The total number of connections made to the database depends on the
>>> parallelism of the Sink, as every parallel instance creates its own
>>> connection.
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>>> On 05.07.2016 12:04, Harikrishnan S wrote:
>>>
>>> The basic idea was that I would create a pool of connections in the
>>> open() method in a custom sink and each invoke() method gets one connection
>>> from the pool and does the upserts needed. I might have misunderstood how
>>> sinks work in flink though.
>>>
>>> On Tue, Jul 5, 2016 at 2:22 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 why do you need a connection pool?
 On 5 Jul 2016 11:41, "Harikrishnan S" < 
 hihari...@gmail.com> wrote:

> Hi,
>
> Are there any examples of implementing a jdbc sink in flink using a
> connection pool ?
>
> Thanks
>
> On Tue, Jul 5, 2016 at 2:00 PM, Harikrishnan S < 
> hihari...@gmail.com> wrote:
>
>> Hi,
>>
>> Are there any examples of implementing a jdbc sink in flink using a
>> connection pool ?
>>
>> Thanks
>>
>> On Tue, Jul 5, 2016 at 1:57 PM, Harikrishnan S <
>> hihari...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Are there any examples of implementing a jdbc sink in flink using a
>>> connection pool ?
>>>
>>> Thanks
>>>
>>
>>
>
>>>
>>>
>>
>>
>


Re: JDBC sink in flink

2016-07-05 Thread Stefano Bortoli
The connection will be managed by the splitManager; there is no need to use a
pool. However, if you had to, you should probably look into the
establishConnection() method of the JDBCInputFormat.



2016-07-05 10:52 GMT+02:00 Flavio Pompermaier :

> why do you need a connection pool?
> On 5 Jul 2016 11:41, "Harikrishnan S"  wrote:
>
>> Hi,
>>
>> Are there any examples of implementing a jdbc sink in flink using a
>> connection pool ?
>>
>> Thanks
>>
>> On Tue, Jul 5, 2016 at 2:00 PM, Harikrishnan S 
>> wrote:
>>
>>> Hi,
>>>
>>> Are there any examples of implementing a jdbc sink in flink using a
>>> connection pool ?
>>>
>>> Thanks
>>>
>>> On Tue, Jul 5, 2016 at 1:57 PM, Harikrishnan S 
>>> wrote:
>>>
 Hi,

 Are there any examples of implementing a jdbc sink in flink using a
 connection pool ?

 Thanks

>>>
>>>
>>


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-24 Thread Stefano Bortoli
Till mentioned that 'spilling on disk' was managed through an exception catch.
The last serialization error was related to bad management of the Kryo buffer,
which was not cleaned after spilling during exception handling. Is it possible
that we are dealing with an issue similar to that one, but caused by another
exception that is handled differently?

saluti,
Stefano


2016-05-23 18:44 GMT+02:00 Flavio Pompermaier :

> You can try with this:
>
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.joda.time.DateTime;
>
> import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
>
> public class DateTimeError {
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> //env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class);
> env.fromElements(DateTime.now(), DateTime.now()).print();
> }
> }
>
> Without the commented row you get:
>
> Exception in thread "main" java.lang.NullPointerException
> at
> org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
> at
> org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
> at
> org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:722)
> at
> org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:535)
> at
> org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:671)
> at
> org.joda.time.base.AbstractInstant.toString(AbstractInstant.java:424)
> at
> org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:314)
> at java.lang.String.valueOf(String.java:2994)
> at java.io.PrintStream.println(PrintStream.java:821)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1607)
>
> Thanks for the support,
> Flavio
>
> On Mon, May 23, 2016 at 4:17 PM, Maximilian Michels 
> wrote:
>
>> What error do you get when you don't register the Kryo serializer?
>>
>> On Mon, May 23, 2016 at 11:57 AM, Flavio Pompermaier
>>  wrote:
>> > With this last settings I was able to terminate the job the second time
>> I
>> > retried to run it, without restarting the cluster..
>> > If I don't register the serializer for DateTime the job doesn't start
>> at all
>> > (from Flink 1.x you have to register it [1]).
>> > I can't understand what's wrong :(
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
>> >
>> > Best,
>> > Flavio
>>
>
>
>
> --
>
> Flavio Pompermaier
>
> Development Department
> OKKAM Srl - www.okkam.it
>
> Phone: +(39) 0461 283 702
> Fax: + (39) 0461 186 6433
> Email: pomperma...@okkam.it
> Headquarters: Trento (Italy), via G.B. Trener 8
> Registered office: Trento (Italy), via Segantini 23
>
> Confidentially notice. This e-mail transmission may contain legally
> privileged and/or confidential information. Please do not read it if you
> are not the intended recipient(S). Any use, distribution, reproduction or
> disclosure by any other person is strictly prohibited. If you have received
> this e-mail in error, please notify the sender and destroy the original
> transmission and its attachments without reading or saving it in any manner.
>
>


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-16 Thread Stefano Bortoli
Hi Flavio, Till,

do you think this could possibly be related to the serialization problem
caused by 'the management' of the Kryo serializer buffer when spilling to disk?
We are definitely going beyond what can be managed in memory with this task.

saluti,
Stefano

2016-05-16 9:44 GMT+02:00 Flavio Pompermaier :

> That exception showed just once, but the following happens randomly (if I
> re-run the job after stopping and restartign the cluster it doesn't show up
> usually):
>
> Caused by: java.io.IOException: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>
>
> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier  > wrote:
>
>> Hi to all,
>> in my last run of a job I received this weird Kryo Exception in one of
>> the TaskManager...obviously this class in not mentioned anywhere, neither
>> in my project nor in flink...
>> Any help is appreciated!
>>
>> Best,
>> Flavio
>>
>> INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce
>> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at
>> main(Jsonizer.java:128)) (4/9) switched to FAILED with exception.
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at
>> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted
>> input: Thread 'SortMerger spilling thread' terminated due to an exception:
>> Unable to find class: java.ttil.HashSet
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception: Unable
>> to find class: java.ttil.HashSet
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: Unable to find class: java.ttil.HashSet
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
>> java.ttil.HashSet
>>  

Re: Requesting the next InputSplit failed

2016-04-29 Thread Stefano Bortoli
We could successfully run the job without issues. Thanks a lot everyone for
the support.

FYI: with Flink we completed in 3h28m the job that was planned to run for
15 days 24/7 relying on our legacy customer approach. :-)
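For reference, the settings suggested in the quoted thread below map to
flink-conf.yaml entries along these lines (values are only illustrative; key
names as of the Flink version we were running):

jobmanager.heap.mb: 2048
akka.ask.timeout: 100 s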

saluti,
Stefano

2016-04-28 14:50 GMT+02:00 Fabian Hueske :

> Yes, assigning more than 0.5GB to a JM is a good idea. 3GB is maybe a bit
> too much, 2GB should be enough.
> Increasing the timeout should not hurt either.
>
> 2016-04-28 14:14 GMT+02:00 Flavio Pompermaier :
>
>> So what do you suggest to try for the next run?
>> I was going to increase the Job Manager heap to 3 GB and maybe change
>> some gc setting.
>> Do you think I should increase also the akka timeout or other things?
>>
>> On Thu, Apr 28, 2016 at 2:06 PM, Fabian Hueske  wrote:
>>
>>> Hmm, 113k splits is quite a lot.
>>> However, the IF uses the DefaultInputSplitAssigner which is very
>>> lightweight and should handle a large number of splits well.
>>>
>>>
>>>
>>> 2016-04-28 13:50 GMT+02:00 Flavio Pompermaier :
>>>
>>>> We generate 113k splits because we can't query more than 100k or
>>>> records per split (and we have to manage 11 billions of records). We tried
>>>> to run the job only once, before running it the 2nd time we would like to
>>>> understand which parameter to tune in order to (try to at least to) avoid
>>>> such an error.
>>>>
>>>> Of course I pasted the wrong TM heap size...that is indeed 3Gb (
>>>> taskmanager.heap.mb:512)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Thu, Apr 28, 2016 at 1:29 PM, Fabian Hueske 
>>>> wrote:
>>>>
>>>>> Is the problem reproducible?
>>>>> Maybe the SplitAssigner gets stuck somehow, but I've never observed
>>>>> something like that.
>>>>>
>>>>> How many splits do you generate?
>>>>>
>>>>> I guess it is not related, but 512MB for a TM is not a lot on machines
>>>>> with 16GB RAM.
>>>>>
>>>>> 2016-04-28 12:12 GMT+02:00 Flavio Pompermaier :
>>>>>
>>>>>> When does this usually happens? Is it because the JobManager has too
>>>>>> few resources (of some type)?
>>>>>>
>>>>>> Our current configuration of the cluster has 4 machines (with 4 CPUs
>>>>>> and 16 GB of RAM) and one machine has both a JobManager and a TaskManger
>>>>>> (the other 3 just a TM).
>>>>>>
>>>>>> Our flink-conf.yml on every machine has the following params:
>>>>>>
>>>>>>- jobmanager.heap.mb:512
>>>>>>- taskmanager.heap.mb:512
>>>>>>- taskmanager.numberOfTaskSlots:6
>>>>>>- prallelism.default:24
>>>>>>- env.java.home=/usr/lib/jvm/java-8-oracle/
>>>>>>- taskmanager.network.numberOfBuffers:16384
>>>>>>
>>>>>> The job just read a window of max 100k elements and then writes a
>>>>>> Tuple5 into a CSV on the jobmanger fs with parallelism 1 (in order to
>>>>>> produce a single file). The job dies after 40 minutes and hundreds of
>>>>>> millions of records read.
>>>>>>
>>>>>> Do you see anything sospicious?
>>>>>>
>>>>>> Thanks for the support,
>>>>>> Flavio
>>>>>>
>>>>>> On Thu, Apr 28, 2016 at 11:54 AM, Fabian Hueske 
>>>>>> wrote:
>>>>>>
>>>>>>> I checked the input format from your PR, but didn't see anything
>>>>>>> suspicious.
>>>>>>>
>>>>>>> It is definitely OK if the processing of an input split tasks more
>>>>>>> than 10 seconds. That should not be the cause.
>>>>>>> It rather looks like the DataSourceTask fails to request a new split
>>>>>>> from the JobManager.
>>>>>>>
>>>>>>> 2016-04-28 9:37 GMT+02:00 Stefano Bortoli :
>>>>>>>
>>>>>>>> Digging the logs, we found this:
>>>>>>>>
>>>>>>>> WARN  Remoting - Tried to associate with unreachable remote address
>>>>>>>> [akka.tcp://flink@127.0.0.1:34984]. Address is now gated for 5000
>>>>>>>> ms, all messages to this address 

Re: Requesting the next InputSplit failed

2016-04-28 Thread Stefano Bortoli
Digging the logs, we found this:

WARN  Remoting - Tried to associate with unreachable remote address
[akka.tcp://flink@127.0.0.1:34984]. Address is now gated for 5000 ms, all
messages to this address will be delivered to dead letters. Reason:
Connessione rifiutata: /127.0.0.1:34984

however, it is not clear why it should refuse a connection to itself after
40 minutes of running. We'll try to figure out possible environment issues. It's
a fresh installation, therefore we may have left out some configuration.

saluti,
Stefano

2016-04-28 9:22 GMT+02:00 Stefano Bortoli :

> I had this type of exception when trying to build and test Flink on a
> "small machine". I worked around the test increasing the timeout for Akka.
>
>
> https://github.com/stefanobortoli/flink/blob/FLINK-1827/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
>
> it happened only on my machine (a VirtualBox I use for development), but
> not on Flavio's. Is it possible that on load situations the JobManager
> slows down a bit too much?
>
> saluti,
> Stefano
>
> 2016-04-27 17:50 GMT+02:00 Flavio Pompermaier :
>
>> A precursor of the modified connector (since we started a long time ago).
>> However the idea is the same, I compute the inputSplits and then I get the
>> data split by split (similarly to what it happens in FLINK-3750 -
>> https://github.com/apache/flink/pull/1941 )
>>
>> Best,
>> Flavio
>>
>> On Wed, Apr 27, 2016 at 5:38 PM, Chesnay Schepler 
>> wrote:
>>
>>> Are you using your modified connector or the currently available one?
>>>
>>>
>>> On 27.04.2016 17:35, Flavio Pompermaier wrote:
>>>
>>> Hi to all,
>>> I'm running a Flink Job on a JDBC datasource and I obtain the following
>>> exception:
>>>
>>> java.lang.RuntimeException: Requesting the next InputSplit failed.
>>> at
>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:91)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:342)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:137)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>> after [1 milliseconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>> at
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:107)
>>> at scala.concurrent.Await.result(package.scala)
>>> at
>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:71)
>>> ... 4 more
>>>
>>> What can be the cause? Is it because the whole DataSource reading has
>>> cannot take more than 1 milliseconds?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>>
>>
>>
>


Re: Requesting the next InputSplit failed

2016-04-28 Thread Stefano Bortoli
I had this type of exception when trying to build and test Flink on a
"small machine". I worked around it in the test by increasing the Akka timeout.

https://github.com/stefanobortoli/flink/blob/FLINK-1827/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java

it happened only on my machine (a VirtualBox I use for development), but
not on Flavio's. Is it possible that on load situations the JobManager
slows down a bit too much?

saluti,
Stefano

2016-04-27 17:50 GMT+02:00 Flavio Pompermaier :

> A precursor of the modified connector (since we started a long time ago).
> However the idea is the same, I compute the inputSplits and then I get the
> data split by split (similarly to what it happens in FLINK-3750 -
> https://github.com/apache/flink/pull/1941 )
>
> Best,
> Flavio
>
> On Wed, Apr 27, 2016 at 5:38 PM, Chesnay Schepler 
> wrote:
>
>> Are you using your modified connector or the currently available one?
>>
>>
>> On 27.04.2016 17:35, Flavio Pompermaier wrote:
>>
>> Hi to all,
>> I'm running a Flink Job on a JDBC datasource and I obtain the following
>> exception:
>>
>> java.lang.RuntimeException: Requesting the next InputSplit failed.
>> at
>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:91)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:342)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:137)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [1 milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at scala.concurrent.Await.result(package.scala)
>> at
>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:71)
>> ... 4 more
>>
>> What can be the cause? Is it because the whole DataSource reading has
>> cannot take more than 1 milliseconds?
>>
>> Best,
>> Flavio
>>
>>
>>
>
>


Re: threads, parallelism and task managers

2016-04-13 Thread Stefano Bortoli
Sounds like you are damn right! Thanks for the insight; dumb of us for not
checking this before.
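For reference, a minimal sketch of that sizing point, assuming the pool is built
on commons-pool2's GenericObjectPool (the connection factory and the Derby URL
below are placeholders): the default maxTotal of GenericObjectPoolConfig is 8,
which matches the 8 connections we observed, and raising maxTotal lifts the cap.

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class PoolSizingSketch {

    // Placeholder factory: creates plain JDBC connections for the pool.
    static class ConnectionFactory extends BasePooledObjectFactory<Connection> {
        @Override
        public Connection create() throws Exception {
            return DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
        }

        @Override
        public PooledObject<Connection> wrap(Connection conn) {
            return new DefaultPooledObject<>(conn);
        }
    }

    public static void main(String[] args) {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        // The default maxTotal is 8; raise it to the intended source parallelism.
        config.setMaxTotal(16);
        config.setMaxIdle(16);
        GenericObjectPool<Connection> pool =
                new GenericObjectPool<>(new ConnectionFactory(), config);
        // borrowObject()/returnObject() now allow up to 16 concurrent connections.
    }
}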

saluti,
Stefano

2016-04-13 11:05 GMT+02:00 Stephan Ewen :

> Sounds actually not like a Flink issue. I would look into the commons pool
> docs.
> Maybe they size their pools by default with the number of cores, so the
> pool has only 8 threads, and other requests are queued?
>
> On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier  > wrote:
>
>> Any feedback about our JDBC InputFormat issue..?
>>
>> On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier > > wrote:
>>
>>> We've finally created a running example (For Flink 0.10.2) of our
>>> improved JDBC imputformat that you can run from an IDE (it creates an
>>> in-memory derby database with 1000 rows and batch of 10) at
>>> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
>>> The first time you run the program you have to comment the following
>>> line:
>>>
>>> stmt.executeUpdate("Drop Table users ");
>>>
>>> In your pom declare the following dependencies:
>>>
>>> <dependency>
>>>   <groupId>org.apache.derby</groupId>
>>>   <artifactId>derby</artifactId>
>>>   <version>10.10.1.1</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.commons</groupId>
>>>   <artifactId>commons-pool2</artifactId>
>>>   <version>2.4.2</version>
>>> </dependency>
>>>
>>> On my laptop I have 8 cores, and if I set parallelism to 16 I expect to see
>>> 16 calls to the connection pool (i.e. 'CREATING NEW CONNECTION!'), while I
>>> see only 8 (up to my maximum number of cores). The number of created tasks,
>>> instead, is correct (16).
>>>
>>> I hope this could help in understanding where the problem is!
>>>
>>> Best and thank in advance,
>>> Flavio
>>>
>>> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli 
>>> wrote:
>>>
>>>> Hi Ufuk,
>>>>
>>>> here is our preliminary input format implementation:
>>>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>>>
>>>> if you need a running project, I will have to create a test one cause I
>>>> cannot share the current configuration.
>>>>
>>>> thanks a lot in advance!
>>>>
>>>>
>>>>
>>>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi :
>>>>
>>>>> Do you have the code somewhere online? Maybe someone can have a quick
>>>>> look over it later. I'm pretty sure that is indeed a problem with the
>>>>> custom input format.
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli 
>>>>> wrote:
>>>>> > Perhaps there is a misunderstanding on my side over the parallelism
>>>>> and
>>>>> > split management given a data source.
>>>>> >
>>>>> > We started from the current JDBCInputFormat to make it multi-thread.
>>>>> Then,
>>>>> > given a space of keys, we create the splits based on a fetchsize set
>>>>> as a
>>>>> > parameter. In the open, we get a connection from the pool, and
>>>>> execute a
>>>>> > query using the split interval. This sets the 'resultSet', and then
>>>>> the
>>>>> > DatasourceTask iterates between reachedEnd, next and close. On
>>>>> close, the
>>>>> > connection is returned to the pool. We set parallelism to 32, and we
>>>>> would
>>>>> > expect 32 connection opened but the connections opened are just 8.
>>>>> >
>>>>> > We tried to make an example with the textinputformat, but being a
>>>>> > delimitedinpurformat, the open is called sequentially when
>>>>> statistics are
>>>>> > built, and then the processing is executed in parallel just after
>>>>> all the
>>>>> > open are executed. This is not feasible in our case, because there
>>>>> would be
>>>>> > millions of queries before the statistics are collected.
>>>>> >
>>>>> > Perhaps we are doing something wrong, still to figure out what. :-/
>>>>> >
>>>>> > thanks a lot for your help.
>>>>> >
>>>>> > saluti,
>>>>> > Stefano
>>>>> >
>>>>> >
>>>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli 

Joda DateTimeSerializer

2016-04-08 Thread Stefano Bortoli
Hi to all,
we've just upgraded to Flink 1.0.0 and we had some problems with Joda
DateTime serialization.
The problem was caused by FLINK-3305, which removed the JavaKaffee dependency.
We had to re-add that dependency in our application and then register the
DateTime serializer in the environment:

env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class );

and in the pom.xml of course (checking compatibility with Flink's Kryo
version that is 2.24.0):

   
<dependency>
  <groupId>de.javakaffee</groupId>
  <artifactId>kryo-serializers</artifactId>
  <version>0.28</version>
</dependency>

We didn't see a mention of this problem in the migration guide; I think it
should be added.

Best,
Stefano


Re: threads, parallelism and task managers

2016-03-30 Thread Stefano Bortoli
Hi Ufuk,

here is our preliminary input format implementation:
https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119

if you need a running project, I will have to create a test one cause I
cannot share the current configuration.

thanks a lot in advance!



2016-03-30 10:13 GMT+02:00 Ufuk Celebi :

> Do you have the code somewhere online? Maybe someone can have a quick
> look over it later. I'm pretty sure that is indeed a problem with the
> custom input format.
>
> – Ufuk
>
> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli 
> wrote:
> > Perhaps there is a misunderstanding on my side over the parallelism and
> > split management given a data source.
> >
> > We started from the current JDBCInputFormat to make it multi-thread.
> Then,
> > given a space of keys, we create the splits based on a fetchsize set as a
> > parameter. In the open, we get a connection from the pool, and execute a
> > query using the split interval. This sets the 'resultSet', and then the
> > DatasourceTask iterates between reachedEnd, next and close. On close, the
> > connection is returned to the pool. We set parallelism to 32, and we
> would
> > expect 32 connection opened but the connections opened are just 8.
> >
> > We tried to make an example with the textinputformat, but being a
> > delimitedinpurformat, the open is called sequentially when statistics are
> > built, and then the processing is executed in parallel just after all the
> > open are executed. This is not feasible in our case, because there would
> be
> > millions of queries before the statistics are collected.
> >
> > Perhaps we are doing something wrong, still to figure out what. :-/
> >
> > thanks a lot for your help.
> >
> > saluti,
> > Stefano
> >
> >
> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli :
> >>
> >> That is exactly my point. I should have 32 threads running, but I have
> >> only 8. 32 Task are created, but only only 8 are run concurrently.
> Flavio
> >> and I will try to make a simple program to produce the problem. If we
> solve
> >> our issues on the way, we'll let you know.
> >>
> >> thanks a lot anyway.
> >>
> >> saluti,
> >> Stefano
> >>
> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann :
> >>>
> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
> >>> futures and their callbacks. But as Ufuk said, each task will spawn
> it’s own
> >>> thread and if you set the parallelism to 32 then you should have 32
> threads
> >>> running.
> >>>
> >>>
> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli  >
> >>> wrote:
> >>>>
> >>>> In fact, I don't use it. I just had to crawl back the runtime
> >>>> implementation to get to the point where parallelism was switching
> from 32
> >>>> to 8.
> >>>>
> >>>> saluti,
> >>>> Stefano
> >>>>
> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann :
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> for what do you use the ExecutionContext? That should actually be
> >>>>> something which you shouldn’t be concerned with since it is only used
> >>>>> internally by the runtime.
> >>>>>
> >>>>> Cheers,
> >>>>> Till
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
> s.bort...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> Well, in theory yes. Each task has a thread, but only a number is
> run
> >>>>>> in parallel (the job of the scheduler).  Parallelism is set in the
> >>>>>> environment. However, whereas the parallelism parameter is set and
> read
> >>>>>> correctly, when it comes to actual starting of the threads, the
> number is
> >>>>>> fix to 8. We run a debugger to get to the point where the thread was
> >>>>>> started. As Flavio mentioned, the ExecutionContext has the
> parallelims set
> >>>>>> to 8. We have a pool of connections to a RDBS and il logs the
> creation of
> >>>>>> just 8 connections although parallelism is much higher.
> >>>>>>
> >>>>>> My question is whether this is a bug (or a feature) of the
> >&g

Re: threads, parallelism and task managers

2016-03-29 Thread Stefano Bortoli
Perhaps there is a misunderstanding on my side over the parallelism and
split management given a data source.

We started from the current JDBCInputFormat to make it multi-thread. Then,
given a space of keys, we create the splits based on a fetchsize set as a
parameter. In the open, we get a connection from the pool, and execute a
query using the split interval. This sets the 'resultSet', and then the
DatasourceTask iterates between reachedEnd, next and close. On close, the
connection is returned to the pool. We set parallelism to 32, and we would
expect 32 connections to be opened, but only 8 are.
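For reference, a minimal sketch of that structure; the connection-pool helper
(ConnectionPool), table and column names are hypothetical placeholders, not our
actual code:

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

public class RangeJdbcInputFormat extends RichInputFormat<Tuple2<Long, String>, GenericInputSplit> {

    private final long minKey;
    private final long maxKey;
    private final long fetchSize;

    private transient Connection connection;   // borrowed from the pool in open(), returned in close()
    private transient PreparedStatement statement;
    private transient ResultSet resultSet;
    private transient boolean endReached;

    public RangeJdbcInputFormat(long minKey, long maxKey, long fetchSize) {
        this.minKey = minKey;
        this.maxKey = maxKey;
        this.fetchSize = fetchSize;
    }

    @Override public void configure(Configuration parameters) { }

    @Override public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return cachedStatistics; }

    @Override
    public GenericInputSplit[] createInputSplits(int minNumSplits) {
        // One split per key range of size 'fetchSize'.
        int numSplits = (int) ((maxKey - minKey) / fetchSize) + 1;
        GenericInputSplit[] splits = new GenericInputSplit[numSplits];
        for (int i = 0; i < numSplits; i++) {
            splits[i] = new GenericInputSplit(i, numSplits);
        }
        return splits;
    }

    @Override public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
        return new DefaultInputSplitAssigner(splits);
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        try {
            connection = ConnectionPool.borrow();    // hypothetical pool helper
            long lo = minKey + split.getSplitNumber() * fetchSize;
            long hi = Math.min(lo + fetchSize, maxKey);
            statement = connection.prepareStatement(
                    "SELECT id, payload FROM my_table WHERE id >= ? AND id < ?");
            statement.setLong(1, lo);
            statement.setLong(2, hi);
            resultSet = statement.executeQuery();
            endReached = false;
        } catch (Exception e) {
            throw new IOException("Could not open split " + split.getSplitNumber(), e);
        }
    }

    @Override public boolean reachedEnd() { return endReached; }

    @Override
    public Tuple2<Long, String> nextRecord(Tuple2<Long, String> reuse) throws IOException {
        try {
            if (!resultSet.next()) {
                endReached = true;
                return null;
            }
            reuse.f0 = resultSet.getLong(1);
            reuse.f1 = resultSet.getString(2);
            return reuse;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            if (resultSet != null) resultSet.close();
            if (statement != null) statement.close();
        } catch (Exception ignored) {
            // best effort
        }
        ConnectionPool.giveBack(connection);         // hypothetical: return the connection to the pool
    }
}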

We tried to make an example with the TextInputFormat, but since it is a
DelimitedInputFormat, open is called sequentially while the statistics are
built, and the processing is executed in parallel only after all the opens
have run. This is not feasible in our case, because there would be
millions of queries before the statistics are collected.

Perhaps we are doing something wrong, still to figure out what. :-/

thanks a lot for your help.

saluti,
Stefano


2016-03-29 13:30 GMT+02:00 Stefano Bortoli :

> That is exactly my point. I should have 32 threads running, but I have
> only 8. 32 Task are created, but only only 8 are run concurrently. Flavio
> and I will try to make a simple program to produce the problem. If we solve
> our issues on the way, we'll let you know.
>
> thanks a lot anyway.
>
> saluti,
> Stefano
>
> 2016-03-29 12:44 GMT+02:00 Till Rohrmann :
>
>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
>> futures and their callbacks. But as Ufuk said, each task will spawn it’s
>> own thread and if you set the parallelism to 32 then you should have 32
>> threads running.
>> ​
>>
>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli 
>> wrote:
>>
>>> In fact, I don't use it. I just had to crawl back the runtime
>>> implementation to get to the point where parallelism was switching from 32
>>> to 8.
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann :
>>>
>>>> Hi,
>>>>
>>>> for what do you use the ExecutionContext? That should actually be
>>>> something which you shouldn’t be concerned with since it is only used
>>>> internally by the runtime.
>>>>
>>>> Cheers,
>>>> Till
>>>> ​
>>>>
>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli 
>>>> wrote:
>>>>
>>>>> Well, in theory yes. Each task has a thread, but only a number is run
>>>>> in parallel (the job of the scheduler).  Parallelism is set in the
>>>>> environment. However, whereas the parallelism parameter is set and read
>>>>> correctly, when it comes to actual starting of the threads, the number is
>>>>> fix to 8. We run a debugger to get to the point where the thread was
>>>>> started. As Flavio mentioned, the ExecutionContext has the parallelims set
>>>>> to 8. We have a pool of connections to a RDBS and il logs the creation of
>>>>> just 8 connections although parallelism is much higher.
>>>>>
>>>>> My question is whether this is a bug (or a feature) of the
>>>>> LocalMiniCluster. :-) I am not scala expert, but I see some variable
>>>>> assignment in setting up of the MiniCluster, involving parallelism and
>>>>> 'default values'. Default values in terms of parallelism are based on the
>>>>> number of cores.
>>>>>
>>>>> thanks a lot for the support!
>>>>>
>>>>> saluti,
>>>>> Stefano
>>>>>
>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi :
>>>>>
>>>>>> Hey Stefano,
>>>>>>
>>>>>> this should work by setting the parallelism on the environment, e.g.
>>>>>>
>>>>>> env.setParallelism(32)
>>>>>>
>>>>>> Is this what you are doing?
>>>>>>
>>>>>> The task threads are not part of a pool, but each submitted task
>>>>>> creates its own Thread.
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>>  wrote:
>>>>>> > Any help here? I think that the problem is that the JobManager
>>>>>> creates the
>>>>>> > executionContext of the scheduler 

Re: threads, parallelism and task managers

2016-03-29 Thread Stefano Bortoli
That is exactly my point. I should have 32 threads running, but I have only
8. 32 tasks are created, but only 8 run concurrently. Flavio and I
will try to make a simple program that reproduces the problem. If we solve our
issues on the way, we'll let you know.

thanks a lot anyway.

saluti,
Stefano

2016-03-29 12:44 GMT+02:00 Till Rohrmann :

> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
> futures and their callbacks. But as Ufuk said, each task will spawn it’s
> own thread and if you set the parallelism to 32 then you should have 32
> threads running.
> ​
>
> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli 
> wrote:
>
>> In fact, I don't use it. I just had to crawl back the runtime
>> implementation to get to the point where parallelism was switching from 32
>> to 8.
>>
>> saluti,
>> Stefano
>>
>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann :
>>
>>> Hi,
>>>
>>> for what do you use the ExecutionContext? That should actually be
>>> something which you shouldn’t be concerned with since it is only used
>>> internally by the runtime.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli 
>>> wrote:
>>>
>>>> Well, in theory yes. Each task has a thread, but only a number is run
>>>> in parallel (the job of the scheduler).  Parallelism is set in the
>>>> environment. However, whereas the parallelism parameter is set and read
>>>> correctly, when it comes to actual starting of the threads, the number is
>>>> fix to 8. We run a debugger to get to the point where the thread was
>>>> started. As Flavio mentioned, the ExecutionContext has the parallelims set
>>>> to 8. We have a pool of connections to a RDBS and il logs the creation of
>>>> just 8 connections although parallelism is much higher.
>>>>
>>>> My question is whether this is a bug (or a feature) of the
>>>> LocalMiniCluster. :-) I am not scala expert, but I see some variable
>>>> assignment in setting up of the MiniCluster, involving parallelism and
>>>> 'default values'. Default values in terms of parallelism are based on the
>>>> number of cores.
>>>>
>>>> thanks a lot for the support!
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi :
>>>>
>>>>> Hey Stefano,
>>>>>
>>>>> this should work by setting the parallelism on the environment, e.g.
>>>>>
>>>>> env.setParallelism(32)
>>>>>
>>>>> Is this what you are doing?
>>>>>
>>>>> The task threads are not part of a pool, but each submitted task
>>>>> creates its own Thread.
>>>>>
>>>>> – Ufuk
>>>>>
>>>>>
>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>  wrote:
>>>>> > Any help here? I think that the problem is that the JobManager
>>>>> creates the
>>>>> > executionContext of the scheduler with
>>>>> >
>>>>> >val executionContext = ExecutionContext.fromExecutor(new
>>>>> > ForkJoinPool())
>>>>> >
>>>>> > and thus the number of concurrently running threads is limited to
>>>>> the number
>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>>>> > What do you think?
>>>>> >
>>>>> >
>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli <
>>>>> s.bort...@gmail.com>
>>>>> > wrote:
>>>>> >>
>>>>> >> Hi guys,
>>>>> >>
>>>>> >> I am trying to test a job that should run a number of tasks to read
>>>>> from a
>>>>> >> RDBMS using an improved JDBC connector. The connection and the
>>>>> reading run
>>>>> >> smoothly, but I cannot seem to be able to move above the limit of 8
>>>>> >> concurrent threads running. 8 is of course the number of cores of my
>>>>> >> machine.
>>>>> >>
>>>>> >> I have tried working around configurations and settings, but the
>>>>> Executor
>>>>> >> within the ExecutionContext keeps on having a parallelism of 8.
>>>>> Although, of
>>>>> >> course, the parallelism of the execution environment is much higher
>>>>> (in fact
>>>>> >> I have many more tasks to be allocated).
>>>>> >>
>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration
>>>>> that may
>>>>> >> just override/neglect my wish for higher degree of parallelism. Is
>>>>> there a
>>>>> >> way for me to work around this issue?
>>>>> >>
>>>>> >> please let me know. Thanks a lot for you help! :-)
>>>>> >>
>>>>> >> saluti,
>>>>> >> Stefano
>>>>> >
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>


Re: threads, parallelism and task managers

2016-03-29 Thread Stefano Bortoli
In fact, I don't use it. I just had to crawl back the runtime
implementation to get to the point where parallelism was switching from 32
to 8.

saluti,
Stefano

2016-03-29 12:24 GMT+02:00 Till Rohrmann :

> Hi,
>
> for what do you use the ExecutionContext? That should actually be
> something which you shouldn’t be concerned with since it is only used
> internally by the runtime.
>
> Cheers,
> Till
> ​
>
> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli 
> wrote:
>
>> Well, in theory yes. Each task has a thread, but only a number is run in
>> parallel (the job of the scheduler).  Parallelism is set in the
>> environment. However, whereas the parallelism parameter is set and read
>> correctly, when it comes to actual starting of the threads, the number is
>> fix to 8. We run a debugger to get to the point where the thread was
>> started. As Flavio mentioned, the ExecutionContext has the parallelims set
>> to 8. We have a pool of connections to a RDBS and il logs the creation of
>> just 8 connections although parallelism is much higher.
>>
>> My question is whether this is a bug (or a feature) of the
>> LocalMiniCluster. :-) I am not scala expert, but I see some variable
>> assignment in setting up of the MiniCluster, involving parallelism and
>> 'default values'. Default values in terms of parallelism are based on the
>> number of cores.
>>
>> thanks a lot for the support!
>>
>> saluti,
>> Stefano
>>
>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi :
>>
>>> Hey Stefano,
>>>
>>> this should work by setting the parallelism on the environment, e.g.
>>>
>>> env.setParallelism(32)
>>>
>>> Is this what you are doing?
>>>
>>> The task threads are not part of a pool, but each submitted task
>>> creates its own Thread.
>>>
>>> – Ufuk
>>>
>>>
>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>  wrote:
>>> > Any help here? I think that the problem is that the JobManager creates
>>> the
>>> > executionContext of the scheduler with
>>> >
>>> >val executionContext = ExecutionContext.fromExecutor(new
>>> > ForkJoinPool())
>>> >
>>> > and thus the number of concurrently running threads is limited to the
>>> number
>>> > of cores (using the default constructor of the ForkJoinPool).
>>> > What do you think?
>>> >
>>> >
>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli 
>>> > wrote:
>>> >>
>>> >> Hi guys,
>>> >>
>>> >> I am trying to test a job that should run a number of tasks to read
>>> from a
>>> >> RDBMS using an improved JDBC connector. The connection and the
>>> reading run
>>> >> smoothly, but I cannot seem to be able to move above the limit of 8
>>> >> concurrent threads running. 8 is of course the number of cores of my
>>> >> machine.
>>> >>
>>> >> I have tried working around configurations and settings, but the
>>> Executor
>>> >> within the ExecutionContext keeps on having a parallelism of 8.
>>> Although, of
>>> >> course, the parallelism of the execution environment is much higher
>>> (in fact
>>> >> I have many more tasks to be allocated).
>>> >>
>>> >> I feel it may be an issue of the LocalMiniCluster configuration that
>>> may
>>> >> just override/neglect my wish for higher degree of parallelism. Is
>>> there a
>>> >> way for me to work around this issue?
>>> >>
>>> >> please let me know. Thanks a lot for you help! :-)
>>> >>
>>> >> saluti,
>>> >> Stefano
>>> >
>>> >
>>> >
>>>
>>
>>
>


Re: threads, parallelism and task managers

2016-03-29 Thread Stefano Bortoli
Well, in theory yes. Each task has a thread, but only a certain number run in
parallel (that is the scheduler's job). Parallelism is set in the
environment. However, whereas the parallelism parameter is set and read
correctly, when it comes to actually starting the threads, the number is
fixed at 8. We ran a debugger to get to the point where the thread was
started. As Flavio mentioned, the ExecutionContext has the parallelism set
to 8. We have a pool of connections to an RDBMS, and it logs the creation of
just 8 connections although the parallelism is much higher.

My question is whether this is a bug (or a feature) of the
LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
assignments in the setup of the MiniCluster involving parallelism and
'default values', and the default values in terms of parallelism are based on the
number of cores.
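
Just to make that last point concrete with plain JDK code (this is only an
illustration, not Flink code): the no-argument ForkJoinPool constructor sizes
the pool to the number of available processors, so anything scheduled on it is
capped at the core count unless an explicit parallelism is passed.

import java.util.concurrent.ForkJoinPool;

public class PoolParallelismCheck {
    public static void main(String[] args) {
        // no-arg constructor: parallelism == Runtime.getRuntime().availableProcessors()
        ForkJoinPool defaultPool = new ForkJoinPool();
        // explicit parallelism, independent of the number of cores
        ForkJoinPool widerPool = new ForkJoinPool(32);
        System.out.println(defaultPool.getParallelism() + " vs " + widerPool.getParallelism());
    }
}

On this machine the first number is 8, which matches exactly what I see in the job.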

thanks a lot for the support!

saluti,
Stefano

2016-03-29 11:51 GMT+02:00 Ufuk Celebi :

> Hey Stefano,
>
> this should work by setting the parallelism on the environment, e.g.
>
> env.setParallelism(32)
>
> Is this what you are doing?
>
> The task threads are not part of a pool, but each submitted task
> creates its own Thread.
>
> – Ufuk
>
>
> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>  wrote:
> > Any help here? I think that the problem is that the JobManager creates
> the
> > executionContext of the scheduler with
> >
> >val executionContext = ExecutionContext.fromExecutor(new
> > ForkJoinPool())
> >
> > and thus the number of concurrently running threads is limited to the
> number
> > of cores (using the default constructor of the ForkJoinPool).
> > What do you think?
> >
> >
> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli 
> > wrote:
> >>
> >> Hi guys,
> >>
> >> I am trying to test a job that should run a number of tasks to read
> from a
> >> RDBMS using an improved JDBC connector. The connection and the reading
> run
> >> smoothly, but I cannot seem to be able to move above the limit of 8
> >> concurrent threads running. 8 is of course the number of cores of my
> >> machine.
> >>
> >> I have tried working around configurations and settings, but the
> Executor
> >> within the ExecutionContext keeps on having a parallelism of 8.
> Although, of
> >> course, the parallelism of the execution environment is much higher (in
> fact
> >> I have many more tasks to be allocated).
> >>
> >> I feel it may be an issue of the LocalMiniCluster configuration that may
> >> just override/neglect my wish for higher degree of parallelism. Is
> there a
> >> way for me to work around this issue?
> >>
> >> please let me know. Thanks a lot for you help! :-)
> >>
> >> saluti,
> >> Stefano
> >
> >
> >
>


threads, parallelism and task managers

2016-03-23 Thread Stefano Bortoli
Hi guys,

I am trying to test a job that should run a number of tasks to read from an
RDBMS using an improved JDBC connector. The connection and the reading run
smoothly, but I cannot seem to be able to move above the limit of 8
concurrently running threads. 8 is, of course, the number of cores of my
machine.

I have tried tweaking configurations and settings, but the Executor
within the ExecutionContext keeps having a parallelism of 8, although,
of course, the parallelism of the execution environment is much higher (in
fact I have many more tasks to be allocated).

I feel it may be an issue of the LocalMiniCluster configuration, which may
just override/ignore my request for a higher degree of parallelism. Is there a
way for me to work around this issue?

Please let me know. Thanks a lot for your help! :-)

saluti,
Stefano


Re: Oracle 11g number serialization: classcast problem

2016-03-23 Thread Stefano Bortoli
Thanks for the clarification.

case java.sql.Types.DECIMAL:
    reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
    break;

This causes both a NullPointerException on null values and a Double
ClassCastException when the tuple is serialized.

For the moment, because we mostly have a 'reading problem', we modified the
input format to read the values as strings and output them as CSV.

case java.sql.Types.NUMERIC:
    if (resultSet.getBigDecimal(pos + 1) == null)
        reuse.setField("", pos);
    else
        reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos);
    break;
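
A null-safe variant that keeps the field numeric (defaulting to 0, since we
cannot store null in a Tuple anyway) could look roughly like this. It is just
a sketch, not something we have committed:

// sketch only: avoid the NPE and keep a Double so the TupleTypeInfo can stay numeric
private static double readNumeric(java.sql.ResultSet resultSet, int pos) throws java.sql.SQLException {
    java.math.BigDecimal value = resultSet.getBigDecimal(pos + 1);
    return value == null ? 0d : value.doubleValue();
}

// and in the switch:
// case java.sql.Types.DECIMAL:
// case java.sql.Types.NUMERIC:
//     reuse.setField(readNumeric(resultSet, pos), pos);
//     break;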

Another problem is that the reading is sequential and does not allow for
splits. Our idea is to enable the execution of Sqoop scripts using
Flink. We are testing it on an Oracle table of 11 billion records, but we
have not got through a complete run yet. We are just at the first prototype
level, so there is surely some work to do. :-) When we get to a satisfying
working version, we'll share the contribution.

saluti,
Stefano



2016-03-23 10:38 GMT+01:00 Chesnay Schepler :

> On 23.03.2016 10:04, Stefano Bortoli wrote:
>
> I had a look at the JDBC input format, and it does indeed interpret
> BIGDECIMAL and NUMERIC values as double.
>
> This sounds more like a bug actually. Feel free to open a JIRA for this.
>
> The status of the JDBCInputFormat is not adequate for real world use case,
> as for example does not deal with NULL values.
>
> This was already reported in FLINK-3471. To clarify, for NULL fields the
> format fails only if the type is either DECIMAL, NUMERIC, DATE, TIME,
> TIMESTAMP, or SQLXML. Other types should default to 0, empty string or
> false; which actually isn't intended behavior, but caused by JDBC itself.
>
> Defaulting to some value seems the only way to deal with this issue, since
> we can't store null in a Tuple.
>
> I wasn't sure what value DATE, TIME, TIMESTAMP and SQLXML should default
> to, as such i didn't change them yet. I also just dislike the fact that a
> straight copy from A to B will not produce the same table.
>
> However, with little effort we fixed few stuff and now we are getting to
> something usable. We are actually trying to do something a-la sqoop,
> therefore given a boundary query, we create the splits, and then assign it
> to the input format to read the database with configurable parallelism. We
> are still working on it. If we get to something stable and working, we'll
> gladly share it.
>
> saluti,
> Stefano
>
> 2016-03-22 15:46 GMT+01:00 Chesnay Schepler :
>
>> The JDBC formats don't make any assumption as to what DB backend is used.
>>
>> A JDBC float in general is returned as a double, since that was the
>> recommended mapping i found when i wrote the formats.
>>
>> Is the INT returned as a double as well?
>>
>> Note: The (runtime) output type is in no way connected to the TypeInfo
>> you pass when constructing the format.
>>
>>
>> On 21.03.2016 14:16, Stefano Bortoli wrote:
>>
>>> Hi squirrels,
>>>
>>> I working on a flink job connecting to a Oracle DB. I started from the
>>> JDBC example for Derby, and used the TupleTypeInfo to configure the fields
>>> of the tuple as it is read.
>>>
>>> The record of the example has 2 INT, 1 FLOAT and 2 VARCHAR. Apparently,
>>> using Oracle, all the numbers are read as Double, causing a ClassCast
>>> exception. Of course I can fix it by changing the TupleTypeInfo, but I
>>> wonder whether there is some assumption for Oracle and Numbers.
>>>
>>> Thanks a lot for your support!
>>>
>>> saluti,
>>> Stefano
>>>
>>
>>
>
>


Re: Oracle 11g number serialization: classcast problem

2016-03-23 Thread Stefano Bortoli
I had a look at the JDBC input format, and it does indeed interpret
BIGDECIMAL and NUMERIC values as double. The status of the JDBCInputFormat
is not adequate for real-world use cases, as for example it does not deal with
NULL values.

However, with little effort we fixed a few things and now we are getting to
something usable. We are actually trying to do something a la Sqoop:
given a boundary query, we create the splits, and then assign them
to the input format to read the database with configurable parallelism. We
are still working on it. If we get to something stable and working, we'll
gladly share it.
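
To give an idea of the split generation we have in mind (just a sketch; the
id-based strategy, the method and the names are illustrative, not the actual
code):

// run a boundary query first (e.g. SELECT MIN(id), MAX(id) FROM t), then cut
// the id range into numSplits slices, one query per split / parallel reader
public static java.util.List<String> buildSplitQueries(String table, String idColumn,
                                                       long min, long max, int numSplits) {
    java.util.List<String> queries = new java.util.ArrayList<String>();
    long width = Math.max(1L, (max - min + 1) / numSplits);
    for (long lo = min; lo <= max; lo += width) {
        long hi = Math.min(lo + width - 1, max);
        queries.add("SELECT * FROM " + table
                + " WHERE " + idColumn + " BETWEEN " + lo + " AND " + hi);
    }
    return queries;
}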

saluti,
Stefano

2016-03-22 15:46 GMT+01:00 Chesnay Schepler :

> The JDBC formats don't make any assumption as to what DB backend is used.
>
> A JDBC float in general is returned as a double, since that was the
> recommended mapping i found when i wrote the formats.
>
> Is the INT returned as a double as well?
>
> Note: The (runtime) output type is in no way connected to the TypeInfo you
> pass when constructing the format.
>
>
> On 21.03.2016 14:16, Stefano Bortoli wrote:
>
>> Hi squirrels,
>>
>> I working on a flink job connecting to a Oracle DB. I started from the
>> JDBC example for Derby, and used the TupleTypeInfo to configure the fields
>> of the tuple as it is read.
>>
>> The record of the example has 2 INT, 1 FLOAT and 2 VARCHAR. Apparently,
>> using Oracle, all the numbers are read as Double, causing a ClassCast
>> exception. Of course I can fix it by changing the TupleTypeInfo, but I
>> wonder whether there is some assumption for Oracle and Numbers.
>>
>> Thanks a lot for your support!
>>
>> saluti,
>> Stefano
>>
>
>


Oracle 11g number serialization: classcast problem

2016-03-21 Thread Stefano Bortoli
Hi squirrels,

I am working on a Flink job connecting to an Oracle DB. I started from the JDBC
example for Derby, and used the TupleTypeInfo to configure the fields of
the tuple as it is read.

The record of the example has 2 INT, 1 FLOAT and 2 VARCHAR. Apparently,
using Oracle, all the numbers are read as Double, causing a ClassCastException.
Of course I can fix it by changing the TupleTypeInfo, but I
wonder whether there is some assumption about Oracle and numbers.
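
For concreteness, the setup looks roughly like this (adapted from the Derby
example; imports are omitted, and the Oracle driver class, URL and query below
are placeholders, not my real ones):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple5<Integer, String, String, Double, Integer>> rows = env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("oracle.jdbc.OracleDriver")
                .setDBUrl("jdbc:oracle:thin:@//host:1521/service")
                .setQuery("select id, title, author, price, qty from books")
                .finish(),
        // with Oracle NUMBER columns, the INT_TYPE_INFO fields are where the
        // ClassCastException shows up for me unless I change them
        new TupleTypeInfo<Tuple5<Integer, String, String, Double, Integer>>(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO));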

Thanks a lot for your support!

saluti,
Stefano


Re: Error running an hadoop job from web interface

2015-10-23 Thread Stefano Bortoli
What I normally do is:

java -cp MYUBERJAR.jar my.package.mainclass

Does that make sense?

2015-10-23 17:22 GMT+02:00 Flavio Pompermaier :

> could you write ne the command please?I'm not in the office right now..
> On 23 Oct 2015 17:10, "Maximilian Michels"  wrote:
>
>> Could you try submitting the job from the command-line and see if it
>> works?
>>
>> Thanks,
>> Max
>>
>> On Fri, Oct 23, 2015 at 4:42 PM, Flavio Pompermaier  wrote:
>>
>>> 0.10-snapshot
>>> On 23 Oct 2015 16:09, "Maximilian Michels"  wrote:
>>>
 Hi Flavio,

 Which version of Flink are you using?

 Cheers,
 Max

 On Fri, Oct 23, 2015 at 2:45 PM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi to all,
> I'm trying to run a job from the web interface but I get this error:
>
> java.lang.RuntimeException: java.io.FileNotFoundException: JAR entry 
> core-site.xml not found in /tmp/webclient-jobs/EntitonsJsonizer.jar
>   at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2334)
>   at 
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2187)
>   at 
> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2104)
>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:853)
>   at 
> org.apache.hadoop.mapred.JobConf.checkAndWarnDeprecation(JobConf.java:2088)
>   at org.apache.hadoop.mapred.JobConf.(JobConf.java:446)
>   at org.apache.hadoop.mapreduce.Job.getInstance(Job.java:175)
>   at org.apache.hadoop.mapreduce.Job.getInstance(Job.java:156)
>   at 
> it.okkam.flink.entitons.io.utils.ParquetThriftEntitons.readEntitons(ParquetThriftEntitons.java:42)
>   at 
> it.okkam.flink.entitons.io.utils.ParquetThriftEntitons.readEntitonsWithId(ParquetThriftEntitons.java:73)
>   at 
> org.okkam.entitons.EntitonsJsonizer.readAtomQuads(EntitonsJsonizer.java:235)
>   at org.okkam.entitons.EntitonsJsonizer.main(EntitonsJsonizer.java:119)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at 
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:220)
>   at org.apache.flink.client.CliFrontend.info(CliFrontend.java:412)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>   at 
> org.apache.flink.client.web.JobSubmissionServlet.doGet(JobSubmissionServlet.java:171)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:734)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:847)
>   at 
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:532)
>   at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:453)
>   at 
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:227)
>   at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:965)
>   at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:388)
>   at 
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:187)
>   at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:901)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117)
>   at 
> org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47)
>   at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:113)
>   at org.eclipse.jetty.server.Server.handle(Server.java:352)
>   at 
> org.eclipse.jetty.server.HttpConnection.handleRequest(HttpConnection.java:596)
>   at 
> org.eclipse.jetty.server.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:1048)
>   at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:549)
>   at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:211)
>   at 
> org.eclipse.jetty.server.HttpConnection.handle(HttpConnection.java:425)
>   at 
> org.eclipse.jetty.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:489)
>   at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:436)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: J

Re: kryo exception due to race condition

2015-10-07 Thread Stefano Bortoli
Perhaps we can put our hands on it during Flink Forward. :-D I have updated
the ticket description after finding out that the issue is triggered by
performing a join right after the cross. See you in Berlin!

saluti,
Stefano

2015-10-06 9:39 GMT+02:00 Till Rohrmann :

> Hi Stefano,
>
> we'll definitely look into it once Flink Forward is over and we've
> finished the current release work. Thanks for reporting the issue.
>
> Cheers,
> Till
>
> On Tue, Oct 6, 2015 at 9:21 AM, Stefano Bortoli  wrote:
>
>> Hi guys, I could manage to complete the process crossing byte arrays I
>> deserialize within the group function. However, I think this workaround is
>> feasible just with relatively simple processes. Any idea/plan about to fix
>> the serialization problem?
>>
>> saluti,
>> Stefano
>>
>> Stefano Bortoli, PhD
>>
>> *ENS Technical Director *___
>> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>>
>> *Email:* bort...@okkam.it
>>
>> *Phone nr: +39 0461 1823913 <%2B39%200461%201823913> *
>>
>> *Headquarters:* Trento (Italy), Via Trener 8
>> *Registered office:* Trento (Italy), via Segantini 23
>>
>>
>> 2015-10-02 12:05 GMT+02:00 Stefano Bortoli :
>>
>>> I don't know whether it is the same issue, but after switching from my
>>> POJOs to BSONObject I have got a race condition issue with kryo
>>> serialization.
>>> I could complete the process using the byte[], but at this point I
>>> actually need the POJO. I truly believe it is related to the reuse of the
>>> Kryo instance, which is not thread safe.
>>>
>>>
>>> --
>>> 2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26
>>> Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched
>>> to FAILED
>>> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
>>> at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>>> at java.util.ArrayList.get(ArrayList.java:411)
>>> at
>>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>> at
>>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>> at
>>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>> at
>>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>> at
>>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>> at
>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>> at
>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli :
>>>
>>>> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen :
>>>>
>>>>> This looks to me like a bug where type registrations are not properly
>>>>> forwarded to all Serializers.
>>>>>
>>>>> Can you open a JIRA tick

Re: kryo exception due to race condition

2015-10-06 Thread Stefano Bortoli
Hi guys, I managed to complete the process by crossing byte arrays that I
deserialize within the group function. However, I think this workaround is
feasible only for relatively simple processes. Any idea/plan about fixing
the serialization problem?
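
For reference, the shape of the workaround is roughly the following (just a
sketch: MyPojo, its fields and score() are made up, the POJO is assumed to
implement Serializable, and imports plus the rest of the job are omitted;
whether the deserialization happens in the cross function or in the group
function that follows does not change the idea):

public static class ToBytes implements MapFunction<MyPojo, byte[]> {
    @Override
    public byte[] map(MyPojo value) {
        // plain Java serialization, so Kryo never touches the POJO
        return SerializationUtils.serialize(value);
    }
}

DataSet<byte[]> leftBytes = left.map(new ToBytes());
DataSet<byte[]> rightBytes = right.map(new ToBytes());

DataSet<Tuple2<String, Double>> result = leftBytes.cross(rightBytes)
        .with(new CrossFunction<byte[], byte[], Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> cross(byte[] l, byte[] r) {
                // deserialize only here, inside the user function
                MyPojo a = (MyPojo) SerializationUtils.deserialize(l);
                MyPojo b = (MyPojo) SerializationUtils.deserialize(r);
                return new Tuple2<String, Double>(a.getId(), score(a, b));
            }
        });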

saluti,
Stefano

Stefano Bortoli, PhD

ENS Technical Director
OKKAM Srl - www.okkam.it

Email: bort...@okkam.it

Phone nr: +39 0461 1823913

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23


2015-10-02 12:05 GMT+02:00 Stefano Bortoli :

> I don't know whether it is the same issue, but after switching from my
> POJOs to BSONObject I have got a race condition issue with kryo
> serialization.
> I could complete the process using the byte[], but at this point I
> actually need the POJO. I truly believe it is related to the reuse of the
> Kryo instance, which is not thread safe.
>
>
> --
> 2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26
> Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched
> to FAILED
> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
> at java.util.ArrayList.rangeCheck(ArrayList.java:635)
> at java.util.ArrayList.get(ArrayList.java:411)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
> at
> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
> at
> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
> at
> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)
>
> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli :
>
>> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>>
>> saluti,
>> Stefano
>>
>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen :
>>
>>> This looks to me like a bug where type registrations are not properly
>>> forwarded to all Serializers.
>>>
>>> Can you open a JIRA ticket for this?
>>>
>>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli 
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I hit a Kryo exception while running a process 'crossing' POJOs
>>>> datasets. I am using the 0.10-milestone-1.
>>>> Checking the serializer:
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>>
>>>> I have noticed that the Kryo instance is reused along serialization
>>>> calls (e.g. line 187).  However, Kryo is not threadsafe, and therefore I
>>>> think it may cause the problem due to possible race condition. We had these
>>>> types of issues solved with a KryoFactory implementing a pool. Perhaps it
>>>> should just a matter of calling the
>>>>
>>>> what should I do? Open a ticket?
>>>>
>>>> Thanks a lot guys for the great job!
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> -
>>>> com.esotericsoftware.kryo.KryoException: Encou

Re: data flow example on cluster

2015-10-02 Thread Stefano Bortoli
I had problems running a Flink job with Maven; probably there is some
classloading issue. What worked for me was to run a simple java command with
the uberjar. So I build the jar using Maven, and then run it this way:

java -Xmx2g -cp target/youruberjar.jar yourclass arg1 arg2

hope it helps,
Stefano

2015-10-02 12:21 GMT+02:00 Lydia Ickler :

> Hi,
>
> I did not create anything by myself.
> I just downloaded the files from here:
> https://github.com/tillrohrmann/flink-perf
>
> And then executed mvn clean install -DskipTests
>
> Then I opened the project within IntelliJ and there it works fine.
> Then I exported it to the cluster that runs with 0.10-SNAPSHOT.
>
>
> Am 02.10.2015 um 12:15 schrieb Stephan Ewen :
>
> @Lydia  Did you create your POM files for your job with an 0.8.x
> quickstart?
>
> Can you try to simply re-create your project's POM files with a new
> quickstart?
>
> I think that the POMS between 0.8-incubating-SNAPSHOT and 0.10-SNAPSHOT
> may not be quite compatible any more...
>
> On Fri, Oct 2, 2015 at 12:07 PM, Robert Metzger 
> wrote:
>
>> Are you relying on a feature only available in 0.10-SNAPSHOT?
>> Otherwise, I would recommend to use the latest stable release (0.9.1) for
>> your flink job and on the cluster.
>>
>> On Fri, Oct 2, 2015 at 11:55 AM, Lydia Ickler 
>> wrote:
>>
>>> Hi,
>>>
>>> but inside the pom of flunk-job is the flink version set to 0.8
>>>
>>> 0.8-incubating-SNAPSHOT
>>>
>>> how can I change it to the newest?
>>> 0.10-SNAPSHOT
>>> Is not working
>>>
>>> Am 02.10.2015 um 11:48 schrieb Robert Metzger :
>>>
>>> I think there is a version mismatch between the Flink version you've
>>> used to compile your job and the Flink version installed on the cluster.
>>>
>>> Maven automagically pulls newer 0.10-SNAPSHOT versions every time you're
>>> building your job.
>>>
>>> On Fri, Oct 2, 2015 at 11:45 AM, Lydia Ickler 
>>> wrote:
>>>
 Hi Till,
 I want to execute your Matrix Completion program „ALSJoin“.

 Locally it works perfect.
 Now I want to execute it on the cluster with:

 run -c com.github.projectflink.als.ALSJoin -cp
 /tmp/icklerly/flink-jobs-0.1-SNAPSHOT.jar 0 2 0.001 10 1 1

 but I get the following error:
 java.lang.NoSuchMethodError:
 org.apache.flink.api.scala.typeutils.CaseClassTypeInfo.(Ljava/lang/Class;Lscala/collection/Seq;Lscala/collection/Seq;)V

 I guess something like the flink-scala-0.10-SNAPSHOT.jar is missing.
 How can I add that to the path?

 Best regards,
 Lydia


>>>
>>>
>>
>
>


Re: kryo exception due to race condition

2015-10-02 Thread Stefano Bortoli
I don't know whether it is the same issue, but after switching from my
POJOs to BSONObject I got a race condition issue with Kryo
serialization.
I could complete the process using the byte[], but at this point I actually
need the POJO. I truly believe it is related to the reuse of the Kryo
instance, which is not thread-safe.

--
2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26
Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched
to FAILED
java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
at
org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
at
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
at
org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)

2015-10-02 9:46 GMT+02:00 Stefano Bortoli :

> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>
> saluti,
> Stefano
>
> 2015-10-01 18:50 GMT+02:00 Stephan Ewen :
>
>> This looks to me like a bug where type registrations are not properly
>> forwarded to all Serializers.
>>
>> Can you open a JIRA ticket for this?
>>
>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I hit a Kryo exception while running a process 'crossing' POJOs
>>> datasets. I am using the 0.10-milestone-1.
>>> Checking the serializer:
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>
>>> I have noticed that the Kryo instance is reused along serialization
>>> calls (e.g. line 187).  However, Kryo is not threadsafe, and therefore I
>>> think it may cause the problem due to possible race condition. We had these
>>> types of issues solved with a KryoFactory implementing a pool. Perhaps it
>>> should just a matter of calling the
>>>
>>> what should I do? Open a ticket?
>>>
>>> Thanks a lot guys for the great job!
>>>
>>> saluti,
>>> Stefano
>>>
>>> -
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>>> ID: 114
>>> at
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>> at
>>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>> at
>>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>> at
>>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>> at
>>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>> at
>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>> at
>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>
>>
>


Re: kryo exception due to race condition

2015-10-02 Thread Stefano Bortoli
here it is: https://issues.apache.org/jira/browse/FLINK-2800

saluti,
Stefano

2015-10-01 18:50 GMT+02:00 Stephan Ewen :

> This looks to me like a bug where type registrations are not properly
> forwarded to all Serializers.
>
> Can you open a JIRA ticket for this?
>
> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli 
> wrote:
>
>> Hi guys,
>>
>> I hit a Kryo exception while running a process 'crossing' POJOs datasets.
>> I am using the 0.10-milestone-1.
>> Checking the serializer:
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>
>> I have noticed that the Kryo instance is reused along serialization calls
>> (e.g. line 187).  However, Kryo is not threadsafe, and therefore I think it
>> may cause the problem due to possible race condition. We had these types of
>> issues solved with a KryoFactory implementing a pool. Perhaps it should
>> just a matter of calling the
>>
>> what should I do? Open a ticket?
>>
>> Thanks a lot guys for the great job!
>>
>> saluti,
>> Stefano
>>
>> -
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID: 114
>> at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>> at
>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>> at
>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>> at
>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>> at
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>> at
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>> at java.lang.Thread.run(Thread.java:745)
>>
>
>


kryo exception due to race condition

2015-10-01 Thread Stefano Bortoli
Hi guys,

I hit a Kryo exception while running a process 'crossing' POJO datasets. I
am using the 0.10-milestone-1.
Checking the serializer:
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)

I have noticed that the Kryo instance is reused across serialization calls
(e.g. line 187). However, Kryo is not thread-safe, and therefore I think it
may cause the problem due to a possible race condition. We have solved these
types of issues before with a KryoFactory implementing a pool. Perhaps it is
just a matter of calling the

What should I do? Open a ticket?
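
To make the pooling idea concrete, something like this per-thread factory is
what I have in mind (just a sketch, not our actual KryoFactory):

import com.esotericsoftware.kryo.Kryo;

// one Kryo instance per thread instead of a single shared, non-thread-safe one
public final class KryoPerThread {

    private static final ThreadLocal<Kryo> KRYO = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            // register here the same classes/serializers the shared instance gets
            return kryo;
        }
    };

    private KryoPerThread() {
    }

    public static Kryo get() {
        return KRYO.get();
    }
}

The serializer would then call KryoPerThread.get() instead of reusing the
shared instance across threads.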

Thanks a lot guys for the great job!

saluti,
Stefano

-
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
114
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
at
org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
at
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
at
org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)


Re: starting flink job from bash script with maven

2015-07-24 Thread Stefano Bortoli
It seems there is a problem with the Maven class loading. I have created
the uberjar and then executed it with a traditional 'java -cp uberjar.jar args',
and it worked with no problems. It could be interesting to investigate the
reason, as maven exec is very convenient. However, with the uberjar the
classpath problems are eased, so I can live with it.

thanks a lot for your support.

saluti,
Stefano

2015-07-24 11:17 GMT+02:00 Stefano Bortoli :

> HI Stephan,
>
> I think I may have found a possible root of the problem. I do not build
> the fat jar, I simply execute the main with maven exec:java with default
> install and compile. No uberjar created shading. I will try that and
> report. The fact that it runs in eclipse so easily makes it confusing
> somehow.
>
> saluti,
> Stefano
>
> 2015-07-24 11:09 GMT+02:00 Stephan Ewen :
>
>> Hi!
>>
>> There is probably something going wrong in MongoOutputFormat or 
>> MongoHadoop2OutputFormat.
>> Something fails, but Java swallows the problem during Serialization.
>>
>> It may be a classloading issue that gets not reported. Are the
>> MongoOutputFormat and the MongoHadoop2OutputFormat both in the fat jar?
>> If not, try putting them in there.
>>
>> The last check we could to (to validate the Flink Serialization
>> utilities) is the code pasted below. If that does not cause the error, it
>> is probably the issue described above.
>>
>> Greetings,
>> Stephan
>>
>>
>> --
>>
>> UserCodeObjectWrapper userCode = new
>> UserCodeObjectWrapper(new MongoHadoop2OutputFormat<>(new
>> MongoOutputFormat<>(), Job.getInstance()));
>> Configuration cfg = new Configuration();
>> TaskConfig taskConfig = new TaskConfig(cfg);
>> taskConfig.setStubWrapper(userCode);
>> taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());
>>
>>
>>
>> On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli 
>> wrote:
>>
>>> I have implemented this test without any exception:
>>>
>>> package org.tagcloud.persistence.batch.test;
>>>
>>> import java.io.IOException;
>>>
>>> import org.apache.commons.lang.SerializationUtils;
>>> import org.apache.hadoop.mapreduce.Job;
>>> import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;
>>>
>>> import com.mongodb.hadoop.MongoOutputFormat;
>>>
>>> public class MongoHadoopSerializationTest {
>>>
>>> public static void main(String[] args) {
>>> Job job;
>>> try {
>>> job = Job.getInstance();
>>> SerializationUtils.clone(new MongoHadoop2OutputFormat<>(new
>>> MongoOutputFormat<>(), job));
>>> } catch (IOException e) {
>>> e.printStackTrace();
>>> }
>>>
>>> }
>>>
>>> }
>>>
>>> 2015-07-24 10:01 GMT+02:00 Stephan Ewen :
>>>
>>>> Hi!
>>>>
>>>> The user code object (the output format here) has a corrupt
>>>> serialization routine.
>>>>
>>>> We use default Java Serialization for these objects. Either the 
>>>> MongoHadoopOutputFormat
>>>> cannot be serialized and swallows an exception, or it overrides the
>>>> readObject() / writeObject() methods (from Java Serialization) in an
>>>> inconsistent way.
>>>>
>>>> To figure that out, can you try whether you can manually serialize the
>>>> MongoHadoopOutputFormat?
>>>>
>>>> Can you try and call "SerializationUtils.clone(new
>>>> MongoHadoopOutputFormat)", for example at the beginning of your main
>>>> method? The SerializationUtils are part of Apache Commons and are probably
>>>> in your class path anyways.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli 
>>>> wrote:
>>>>
>>>>> Hi guys!
>>>>>
>>>>> I could program a data maintenance job using Flink on MongoDB. The job
>>>>> runs smoothly if I start it from eclipse. However, when I try to run it
>>>>> using a bash script invoking a maven exec:java I have a serialization
>>>>> exception:
>>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot
>>>>> initialize task 'DataSink
>>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
>>>>> 

Re: starting flink job from bash script with maven

2015-07-24 Thread Stefano Bortoli
Hi Stephan,

I think I may have found a possible root of the problem. I do not build the
fat jar; I simply execute the main with maven exec:java after the default
install and compile, so no shaded uberjar is created. I will try that and
report back. The fact that it runs in Eclipse so easily makes it confusing
somehow.

saluti,
Stefano

2015-07-24 11:09 GMT+02:00 Stephan Ewen :

> Hi!
>
> There is probably something going wrong in MongoOutputFormat or 
> MongoHadoop2OutputFormat.
> Something fails, but Java swallows the problem during Serialization.
>
> It may be a classloading issue that gets not reported. Are the
> MongoOutputFormat and the MongoHadoop2OutputFormat both in the fat jar?
> If not, try putting them in there.
>
> The last check we could to (to validate the Flink Serialization utilities)
> is the code pasted below. If that does not cause the error, it is probably
> the issue described above.
>
> Greetings,
> Stephan
>
>
> --
>
> UserCodeObjectWrapper userCode = new
> UserCodeObjectWrapper(new MongoHadoop2OutputFormat<>(new
> MongoOutputFormat<>(), Job.getInstance()));
> Configuration cfg = new Configuration();
> TaskConfig taskConfig = new TaskConfig(cfg);
> taskConfig.setStubWrapper(userCode);
> taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());
>
>
>
> On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli 
> wrote:
>
>> I have implemented this test without any exception:
>>
>> package org.tagcloud.persistence.batch.test;
>>
>> import java.io.IOException;
>>
>> import org.apache.commons.lang.SerializationUtils;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;
>>
>> import com.mongodb.hadoop.MongoOutputFormat;
>>
>> public class MongoHadoopSerializationTest {
>>
>> public static void main(String[] args) {
>> Job job;
>> try {
>> job = Job.getInstance();
>> SerializationUtils.clone(new MongoHadoop2OutputFormat<>(new
>> MongoOutputFormat<>(), job));
>> } catch (IOException e) {
>> e.printStackTrace();
>> }
>>
>> }
>>
>> }
>>
>> 2015-07-24 10:01 GMT+02:00 Stephan Ewen :
>>
>>> Hi!
>>>
>>> The user code object (the output format here) has a corrupt
>>> serialization routine.
>>>
>>> We use default Java Serialization for these objects. Either the 
>>> MongoHadoopOutputFormat
>>> cannot be serialized and swallows an exception, or it overrides the
>>> readObject() / writeObject() methods (from Java Serialization) in an
>>> inconsistent way.
>>>
>>> To figure that out, can you try whether you can manually serialize the
>>> MongoHadoopOutputFormat?
>>>
>>> Can you try and call "SerializationUtils.clone(new
>>> MongoHadoopOutputFormat)", for example at the beginning of your main
>>> method? The SerializationUtils are part of Apache Commons and are probably
>>> in your class path anyways.
>>>
>>> Stephan
>>>
>>>
>>> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli 
>>> wrote:
>>>
>>>> Hi guys!
>>>>
>>>> I could program a data maintenance job using Flink on MongoDB. The job
>>>> runs smoothly if I start it from eclipse. However, when I try to run it
>>>> using a bash script invoking a maven exec:java I have a serialization
>>>> exception:
>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot
>>>> initialize task 'DataSink
>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
>>>> Deserializing the OutputFormat
>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)
>>>> failed: Could not read the user code wrapper: unexpected block data
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>
>>>> attached the complete stack trace. I thought it was a matter of
>>>> serializable classes, so I have made all my classes serializable.. still I
>>>> have the same error. Perhaps it is not possible to do these things with
>>>> Flink.
>>>>

Re: starting flink job from bash script with maven

2015-07-24 Thread Stefano Bortoli
I have implemented this test, and it runs without any exception:

package org.tagcloud.persistence.batch.test;

import java.io.IOException;

import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.mapreduce.Job;
import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;

import com.mongodb.hadoop.MongoOutputFormat;

public class MongoHadoopSerializationTest {

    public static void main(String[] args) {
        Job job;
        try {
            job = Job.getInstance();
            SerializationUtils.clone(new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), job));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2015-07-24 10:01 GMT+02:00 Stephan Ewen :

> Hi!
>
> The user code object (the output format here) has a corrupt serialization
> routine.
>
> We use default Java Serialization for these objects. Either the 
> MongoHadoopOutputFormat
> cannot be serialized and swallows an exception, or it overrides the
> readObject() / writeObject() methods (from Java Serialization) in an
> inconsistent way.
>
> To figure that out, can you try whether you can manually serialize the
> MongoHadoopOutputFormat?
>
> Can you try and call "SerializationUtils.clone(new MongoHadoopOutputFormat
> )", for example at the beginning of your main method? The
> SerializationUtils are part of Apache Commons and are probably in your
> class path anyways.
>
> Stephan
>
>
> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli  wrote:
>
>> Hi guys!
>>
>> I could program a data maintenance job using Flink on MongoDB. The job
>> runs smoothly if I start it from eclipse. However, when I try to run it
>> using a bash script invoking a maven exec:java I have a serialization
>> exception:
>> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
>> task 'DataSink
>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
>> Deserializing the OutputFormat
>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)
>> failed: Could not read the user code wrapper: unexpected block data
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>
>> attached the complete stack trace. I thought it was a matter of
>> serializable classes, so I have made all my classes serializable.. still I
>> have the same error. Perhaps it is not possible to do these things with
>> Flink.
>>
>> any intuition? is it doable?
>>
>> thanks a lot for your support. :-)
>>
>> saluti,
>>
>> Stefano Bortoli, PhD
>>
>> *ENS Technical Director *___
>> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>>
>> *Email:* bort...@okkam.it
>>
>> *Phone nr: +39 0461 1823912 <%2B39%200461%201823912> *
>>
>> *Headquarters:* Trento (Italy), Via Trener 8
>> *Registered office:* Trento (Italy), via Segantini 23
>>
>>
>>
>


starting flink job from bash script with maven

2015-07-24 Thread Stefano Bortoli
Hi guys!

I was able to program a data maintenance job using Flink on MongoDB. The job
runs smoothly if I start it from Eclipse. However, when I try to run it using a
bash script invoking maven exec:java, I get a serialization exception:
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
task 'DataSink
(org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
Deserializing the OutputFormat
(org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891) failed:
Could not read the user code wrapper: unexpected block data
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)

Attached is the complete stack trace. I thought it was a matter of
serializable classes, so I have made all my classes serializable... but I still
have the same error. Perhaps it is not possible to do these things with
Flink.

Any intuition? Is it doable?

thanks a lot for your support. :-)

saluti,

Stefano Bortoli, PhD

ENS Technical Director
OKKAM Srl - www.okkam.it

Email: bort...@okkam.it

Phone nr: +39 0461 1823912

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23

okkam@okkamVM:~/git/flink-batch-processor/flink-batch-processor/scripts$ 
./mongo_quality_analysis.sh ./../ mongodb://localhost:27017/tagcloud.entitonstmp
moving to folder  ./../  to analyze quality of MongoDB collection  
mongodb://localhost:27017/tagcloud.entitonstmp
[INFO] Scanning for projects...
[INFO] 
[INFO] 
[INFO] Building Apache Flink Batch Processor 1.1-SNAPSHOT
[INFO] 
[INFO] 
[INFO] --- maven-clean-plugin:2.3:clean (default-clean) @ flink-batch-processor 
---
[INFO] Deleting file set: 
/home/okkam/git/flink-batch-processor/flink-batch-processor/target (included: 
[**], excluded: [])
[INFO] 
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ 
flink-batch-processor ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ 
flink-batch-processor ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 15 source files to 
/home/okkam/git/flink-batch-processor/flink-batch-processor/target/classes
[WARNING] 
/home/okkam/git/flink-batch-processor/flink-batch-processor/src/main/java/org/tagcloud/persistence/batch/quality/operator/QualityCompletenessAnalyzerFlatMap.java:
 Some input files use unchecked or unsafe operations.
[WARNING] 
/home/okkam/git/flink-batch-processor/flink-batch-processor/src/main/java/org/tagcloud/persistence/batch/quality/operator/QualityCompletenessAnalyzerFlatMap.java:
 Recompile with -Xlint:unchecked for details.
[INFO] 
[INFO] --- maven-resources-plugin:2.3:testResources (default-testResources) @ 
flink-batch-processor ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ 
flink-batch-processor ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ 
flink-batch-processor ---
[INFO] Tests are skipped.
[INFO] 
[INFO] --- maven-jar-plugin:2.2:jar (default-jar) @ flink-batch-processor ---
[INFO] Building jar: 
/home/okkam/git/flink-batch-processor/flink-batch-processor/target/flink-batch-processor-1.1-SNAPSHOT.jar
[INFO] 
[INFO] --- maven-install-plugin:2.3:install (default-install) @ 
flink-batch-processor ---
[INFO] Installing 
/home/okkam/git/flink-batch-processor/flink-batch-processor/target/flink-batch-processor-1.1-SNAPSHOT.jar
 to 
/home/okkam/.m2/repository/org/tagcloud/persistence/flink-batch-processor/1.1-SNAPSHOT/flink-batch-processor-1.1-SNAPSHOT.jar
[INFO] Installing 
/home/okkam/git/flink-batch-processor/flink-batch-processor/pom.xml to 
/home/okkam/.m2/repository/org/tagcloud/persistence/flink-batch-processor/1.1-SNAPSHOT/flink-batch-processor-1.1-SNAPSHOT.pom
[INFO] ---

Re: MongoOutputFormat does not write back to collection

2015-07-23 Thread Stefano Bortoli
Yes it does. :-) I have implemented it with Hadoop1 and Hadoop2.
Essentially, I have extended the HadoopOutputFormat, reusing part of the code
of the HadoopOutputFormatBase, and set the MongoOutputCommitter to replace
the FileOutputCommitter.
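
In a nutshell, the change amounts to something like this (a simplified sketch,
not the exact code: field names such as mapreduceOutputFormat are approximate,
and the real class adapts open() and finalizeGlobal() in the same spirit):

@Override
public void close() throws IOException {
    try {
        this.recordWriter.close(this.context);
        // ask the wrapped Mongo format for its committer instead of
        // hard-coding a FileOutputCommitter
        OutputCommitter committer = this.mapreduceOutputFormat.getOutputCommitter(this.context);
        if (committer.needsTaskCommit(this.context)) {
            committer.commitTask(this.context);
        }
    } catch (InterruptedException e) {
        throw new IOException("Interrupted while committing the task", e);
    }
}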

saluti,
Stefano



Stefano Bortoli, PhD

ENS Technical Director
OKKAM Srl - www.okkam.it

Email: bort...@okkam.it

Phone nr: +39 0461 1823912

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23


2015-07-23 13:31 GMT+02:00 Stephan Ewen :

> Does this make the MongoHadoopOutputFormat work for you?
>
> On Thu, Jul 23, 2015 at 12:44 PM, Stefano Bortoli 
> wrote:
>
>> https://issues.apache.org/jira/browse/FLINK-2394?filter=-2
>>
>> Meanwhile, I have implemented the MongoHadoopOutputFormat overriding
>> open, close and globalFinalize methods.
>>
>> saluti,
>> Stefano
>>
>> 2015-07-22 17:11 GMT+02:00 Stephan Ewen :
>>
>>> Thank's for reporting this, Stefano!
>>>
>>> Seems like the HadoopOutputFormat wrapper is pretty much specialized on
>>> File Output Formats.
>>>
>>> Can you open an issue for that? Someone will need to look into this...
>>>
>>> On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli 
>>> wrote:
>>>
>>>> In fact, on close() of the HadoopOutputFormat the fileOutputCommitter
>>>> returns false on if
>>>> (this.fileOutputCommitter.needsTaskCommit(this.context)) returns false.
>>>>
>>>> i/**
>>>>  * commit the task by moving the output file out from the temporary
>>>> directory.
>>>>  * @throws java.io.IOException
>>>>  */
>>>> @Override
>>>> public void close() throws IOException {
>>>> this.recordWriter.close(new HadoopDummyReporter());
>>>>
>>>> if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>>>> this.fileOutputCommitter.commitTask(this.context);
>>>> }
>>>> }
>>>>
>>>>
>>>> Also, both the close and the finalize global use a FileOutputCommitter,
>>>> and never the MongoOutputCommitter
>>>>
>>>> @Override
>>>> public void finalizeGlobal(int parallelism) throws IOException {
>>>>
>>>> try {
>>>> JobContext jobContext =
>>>> HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>>>> FileOutputCommitter fileOutputCommitter = new
>>>> FileOutputCommitter();
>>>>
>>>> // finalize HDFS output format
>>>> fileOutputCommitter.commitJob(jobContext);
>>>> } catch (Exception e) {
>>>> throw new RuntimeException(e);
>>>> }
>>>> }
>>>>
>>>> anyone can have a look into that?
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli :
>>>>
>>>>> Debugging, it seem the commitTask method of the MongoOutputCommitter
>>>>> is never called. Is it possible that this 'bulk' approach of mongo-hadoop
>>>>> 1.4 does not fit the task execution method of Flink?
>>>>>
>>>>> any idea? thanks a lot in advance.
>>>>>
>>>>> saluti,
>>>>> Stefano
>>>>>
>>>>> Stefano Bortoli, PhD
>>>>>
>>>>> *ENS Technical Director *
>>>>> ___
>>>>> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>>>>>
>>>>> *Email:* bort...@okkam.it
>>>>>
>>>>> *Phone nr: +39 0461 1823912 <%2B39%200461%201823912> *
>>>>>
>>>>> *Headquarters:* Trento (Italy), Via Trener 8
>>>>> *Registered office:* Trento (Italy), via Segantini 23
>>>>>

Re: MongoOutputFormat does not write back to collection

2015-07-23 Thread Stefano Bortoli
https://issues.apache.org/jira/browse/FLINK-2394?filter=-2

Meanwhile, I have implemented the MongoHadoopOutputFormat overriding the open,
close and finalizeGlobal methods.

saluti,
Stefano

2015-07-22 17:11 GMT+02:00 Stephan Ewen :

> Thank's for reporting this, Stefano!
>
> Seems like the HadoopOutputFormat wrapper is pretty much specialized on
> File Output Formats.
>
> Can you open an issue for that? Someone will need to look into this...
>
> On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli 
> wrote:
>
>> In fact, on close() of the HadoopOutputFormat the fileOutputCommitter
>> returns false on if
>> (this.fileOutputCommitter.needsTaskCommit(this.context)) returns false.
>>
>> i/**
>>  * commit the task by moving the output file out from the temporary
>> directory.
>>  * @throws java.io.IOException
>>  */
>> @Override
>> public void close() throws IOException {
>> this.recordWriter.close(new HadoopDummyReporter());
>>
>> if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>> this.fileOutputCommitter.commitTask(this.context);
>> }
>> }
>>
>>
>> Also, both the close and the finalize global use a FileOutputCommitter,
>> and never the MongoOutputCommitter
>>
>> @Override
>> public void finalizeGlobal(int parallelism) throws IOException {
>>
>> try {
>> JobContext jobContext =
>> HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>> FileOutputCommitter fileOutputCommitter = new
>> FileOutputCommitter();
>>
>> // finalize HDFS output format
>> fileOutputCommitter.commitJob(jobContext);
>> } catch (Exception e) {
>> throw new RuntimeException(e);
>> }
>> }
>>
>> Could anyone have a look into that?
>>
>> saluti,
>> Stefano
>>
>> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli :
>>
>>> Debugging, it seems the commitTask method of the MongoOutputCommitter is
>>> never called. Is it possible that this 'bulk' approach of mongo-hadoop 1.4
>>> does not fit the task execution method of Flink?
>>>
>>> any idea? thanks a lot in advance.
>>>
>>> saluti,
>>> Stefano
>>>
>>>
>>> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli :
>>>
>>>> Hi,
>>>>
>>>> I am trying to analyze and update a MongoDB collection with Apache
>>>> Flink 0.9.0 and Mongo Hadoop 1.4.0 Hadoop 2.6.0.
>>>>
>>>> The process is fairly simple, and the MongoInputFormat works smoothly,
>>>> however it does not write back to the collection. The process works,
>>>> because the writeAsText works as expected. I am quite puzzled because
>>>> debugging I can see it writes in some temporary directory.
>>>>
>>>> The mapred.output.uri seem to serve just to output a file named
>>>> _SUCCESS, and if I do not set it fails with
>>>> java.lang.IllegalArgumentException: Can not create a Path from a null
>>>> string
>>>> at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>>> at org.apache.hadoop.fs.Path.(Path.java:135)
>>>> at
>>>> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>>> at
>>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.l

Re: MongoOutputFormat does not write back to collection

2015-07-22 Thread Stefano Bortoli
A simple solution would be to:

1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat
that takes the OutputCommitter as a parameter
2 - change the outputCommitter field of HadoopOutputFormatBase to be a
generic OutputCommitter
3 - remove the hard-coded assignment of the committer to FileOutputCommitter()
in open() and finalizeGlobal(), or keep it only as the default when no
committer is passed in (see the sketch below)
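
In code, the shape of the change could be something like the following
standalone illustration (not the actual Flink classes; the default could
also be taken from the wrapped Hadoop OutputFormat via getOutputCommitter()):

import java.io.IOException;

import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustration of points 1-3: the committer is a plain OutputCommitter that
// can be injected through the constructor, and the hard-coded
// FileOutputCommitter default is replaced by whatever the wrapped
// OutputFormat returns.
class CommitterAwareWrapper<K, V> {

    private final OutputFormat<K, V> wrappedFormat;
    private OutputCommitter outputCommitter;          // point 2: generic type

    CommitterAwareWrapper(OutputFormat<K, V> wrappedFormat) {
        this(wrappedFormat, null);                    // no committer injected
    }

    CommitterAwareWrapper(OutputFormat<K, V> wrappedFormat,
                          OutputCommitter committer) {
        this.wrappedFormat = wrappedFormat;           // point 1: injectable
        this.outputCommitter = committer;
    }

    OutputCommitter committer(TaskAttemptContext context) throws IOException {
        if (outputCommitter == null) {                // point 3: default fallback
            try {
                outputCommitter = wrappedFormat.getOutputCommitter(context);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
        return outputCommitter;
    }
}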

saluti,
Stefano

2015-07-22 16:48 GMT+02:00 Stefano Bortoli :

> In fact, on close() of the HadoopOutputFormat the fileOutputCommitter
> returns false on if
> (this.fileOutputCommitter.needsTaskCommit(this.context)) returns false.
>
> /**
>  * commit the task by moving the output file out from the temporary
> directory.
>  * @throws java.io.IOException
>  */
> @Override
> public void close() throws IOException {
> this.recordWriter.close(new HadoopDummyReporter());
>
> if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
> this.fileOutputCommitter.commitTask(this.context);
> }
> }
>
>
> Also, both the close and the finalize global use a FileOutputCommitter,
> and never the MongoOutputCommitter
>
> @Override
> public void finalizeGlobal(int parallelism) throws IOException {
>
> try {
> JobContext jobContext =
> HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
> FileOutputCommitter fileOutputCommitter = new
> FileOutputCommitter();
>
> // finalize HDFS output format
> fileOutputCommitter.commitJob(jobContext);
> } catch (Exception e) {
> throw new RuntimeException(e);
>     }
>     }
>
> Could anyone have a look into that?
>
> saluti,
> Stefano
>
> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli :
>
>> Debugging, it seems the commitTask method of the MongoOutputCommitter is
>> never called. Is it possible that this 'bulk' approach of mongo-hadoop 1.4
>> does not fit the task execution method of Flink?
>>
>> any idea? thanks a lot in advance.
>>
>> saluti,
>> Stefano
>>
>>
>> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli :
>>
>>> Hi,
>>>
>>> I am trying to analyze and update a MongoDB collection with Apache Flink
>>> 0.9.0 and Mongo Hadoop 1.4.0 Hadoop 2.6.0.
>>>
>>> The process is fairly simple, and the MongoInputFormat works smoothly,
>>> however it does not write back to the collection. The process works,
>>> because the writeAsText works as expected. I am quite puzzled because
>>> debugging I can see it writes in some temporary directory.
>>>
>>> The mapred.output.uri seem to serve just to output a file named
>>> _SUCCESS, and if I do not set it fails with
>>> java.lang.IllegalArgumentException: Can not create a Path from a null
>>> string
>>> at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>> at org.apache.hadoop.fs.Path.(Path.java:135)
>>> at
>>> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>> at
>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Anyone experienced something similar? any hints where to look at? Thanks
>>> a lot in advance!
>>>
>>> saluti,
>>> Stefano
>>>
>>> 
>>> Configuration conf = new Configuration();
>>> conf.set("mapred.output.dir", "/tmp/");
>>>

Re: MongoOutputFormat does not write back to collection

2015-07-22 Thread Stefano Bortoli
In fact, in close() of the HadoopOutputFormat the task is never committed,
because this.fileOutputCommitter.needsTaskCommit(this.context) returns
false:

/**
 * commit the task by moving the output file out from the temporary directory.
 * @throws java.io.IOException
 */
@Override
public void close() throws IOException {
    this.recordWriter.close(new HadoopDummyReporter());

    if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
        this.fileOutputCommitter.commitTask(this.context);
    }
}


Also, both close() and finalizeGlobal() use a FileOutputCommitter, and never
the MongoOutputCommitter:

@Override
public void finalizeGlobal(int parallelism) throws IOException {

    try {
        JobContext jobContext =
                HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
        FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();

        // finalize HDFS output format
        fileOutputCommitter.commitJob(jobContext);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
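
For comparison, a version that respects the committer of the wrapped
OutputFormat would look roughly like this (hypothetical: the field names
mapreduceOutputFormat and jobConf are assumptions about the base class, not
the actual Flink code):

@Override
public void finalizeGlobal(int parallelism) throws IOException {

    try {
        JobContext jobContext =
                HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
        // ask the wrapped OutputFormat for its committer instead of
        // instantiating a FileOutputCommitter; a dummy task context is
        // enough for committing the job
        TaskAttemptContext cleanupContext = new TaskAttemptContextImpl(
                this.jobConf, new TaskAttemptID());
        OutputCommitter committer =
                this.mapreduceOutputFormat.getOutputCommitter(cleanupContext);
        committer.commitJob(jobContext);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}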

Could anyone have a look into that?

saluti,
Stefano

2015-07-22 15:53 GMT+02:00 Stefano Bortoli :

> Debugging, it seems the commitTask method of the MongoOutputCommitter is
> never called. Is it possible that this 'bulk' approach of mongo-hadoop 1.4
> does not fit the task execution method of Flink?
>
> any idea? thanks a lot in advance.
>
> saluti,
> Stefano
>
>
> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli :
>
>> Hi,
>>
>> I am trying to analyze and update a MongoDB collection with Apache Flink
>> 0.9.0 and Mongo Hadoop 1.4.0 Hadoop 2.6.0.
>>
>> The process is fairly simple, and the MongoInputFormat works smoothly,
>> however it does not write back to the collection. The process works,
>> because the writeAsText works as expected. I am quite puzzled because
>> debugging I can see it writes in some temporary directory.
>>
>> The mapred.output.uri seem to serve just to output a file named
>> _SUCCESS, and if I do not set it fails with
>> java.lang.IllegalArgumentException: Can not create a Path from a null
>> string
>> at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>> at org.apache.hadoop.fs.Path.(Path.java:135)
>> at
>> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>> at
>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Anyone experienced something similar? any hints where to look at? Thanks
>> a lot in advance!
>>
>> saluti,
>> Stefano
>>
>> 
>> Configuration conf = new Configuration();
>> conf.set("mapred.output.dir", "/tmp/");
>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY,
>> collectionsUri);
>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY,
>> collectionsUri);
>>
>> Job job = Job.getInstance(conf);
>>
>> // create a MongodbInputFormat, using a Hadoop input format
>> wrapper
>> InputFormat  mapreduceInputFormat =  new
>> MyMongoInputFormat();
>> HadoopInputFormat hdIf = new
>> HadoopInputFormat(
>> mapreduceInputFormat, Object.class, BSONObject.class,
>> job);
>> DataSet> fin = input
>> .flatMap(new myFlatMapFunction()).setParallelism(16);
>>
>> MongoConfigUtil.setOutputURI(job.getConfiguration(),
>> collectionsUri);
>>
>> fin.output(new HadoopOutputFormat(
>> new MongoOutputFormat(),
>> job));
>> //fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
>>
>>
>


Re: MongoOutputFormat does not write back to collection

2015-07-22 Thread Stefano Bortoli
Debugging, it seems the commitTask method of the MongoOutputCommitter is
never called. Is it possible that this 'bulk' approach of mongo-hadoop 1.4
does not fit Flink's task execution model?

Any idea? Thanks a lot in advance.

saluti,
Stefano

Stefano Bortoli, PhD
ENS Technical Director
OKKAM Srl - www.okkam.it

Email: bort...@okkam.it
Phone nr: +39 0461 1823912

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentiality notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(s). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.

2015-07-22 14:26 GMT+02:00 Stefano Bortoli :

> Hi,
>
> I am trying to analyze and update a MongoDB collection with Apache Flink
> 0.9.0 and Mongo Hadoop 1.4.0 Hadoop 2.6.0.
>
> The process is fairly simple, and the MongoInputFormat works smoothly,
> however it does not write back to the collection. The process works,
> because the writeAsText works as expected. I am quite puzzled because
> debugging I can see it writes in some temporary directory.
>
> The mapred.output.uri seem to serve just to output a file named  _SUCCESS,
> and if I do not set it fails with
> java.lang.IllegalArgumentException: Can not create a Path from a null
> string
> at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
> at org.apache.hadoop.fs.Path.(Path.java:135)
> at
> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
> at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
> Anyone experienced something similar? any hints where to look at? Thanks a
> lot in advance!
>
> saluti,
> Stefano
>
> 
> Configuration conf = new Configuration();
> conf.set("mapred.output.dir", "/tmp/");
> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY,
> collectionsUri);
> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY,
> collectionsUri);
>
> Job job = Job.getInstance(conf);
>
> // create a MongodbInputFormat, using a Hadoop input format wrapper
> InputFormat  mapreduceInputFormat =  new
> MyMongoInputFormat();
> HadoopInputFormat hdIf = new
> HadoopInputFormat(
> mapreduceInputFormat, Object.class, BSONObject.class,
> job);
> DataSet> fin = input
> .flatMap(new myFlatMapFunction()).setParallelism(16);
>
> MongoConfigUtil.setOutputURI(job.getConfiguration(),
> collectionsUri);
>
> fin.output(new HadoopOutputFormat(
> new MongoOutputFormat(),
> job));
> //fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
>
>


MongoOutputFormat does not write back to collection

2015-07-22 Thread Stefano Bortoli
Hi,

I am trying to analyze and update a MongoDB collection with Apache Flink
0.9.0, Mongo Hadoop 1.4.0 and Hadoop 2.6.0.

The process is fairly simple and the MongoInputFormat works smoothly;
however, nothing is written back to the collection. The pipeline itself
works, because writeAsText produces the expected output. I am quite puzzled
because, while debugging, I can see that it writes to some temporary
directory.

The mapred.output.dir property seems to serve only to produce a file named
_SUCCESS, and if I do not set it, the job fails with:
java.lang.IllegalArgumentException: Can not create a Path from a null string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
at org.apache.hadoop.fs.Path.<init>(Path.java:135)
at
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
at
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

Has anyone experienced something similar? Any hints on where to look?
Thanks a lot in advance!

saluti,
Stefano


Configuration conf = new Configuration();
conf.set("mapred.output.dir", "/tmp/");
conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);

Job job = Job.getInstance(conf);

// create a MongodbInputFormat, using a Hadoop input format wrapper
InputFormat<Object, BSONObject> mapreduceInputFormat = new MyMongoInputFormat();
HadoopInputFormat<Object, BSONObject> hdIf =
        new HadoopInputFormat<Object, BSONObject>(
                mapreduceInputFormat, Object.class, BSONObject.class, job);

DataSet<Tuple2<Object, BSONObject>> fin = input
        .flatMap(new myFlatMapFunction()).setParallelism(16);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);

fin.output(new HadoopOutputFormat<Object, BSONObject>(
        new MongoOutputFormat<Object, BSONObject>(), job));
// fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
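
For completeness, the input DataSet above is created from the wrapped input
format, something like:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);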


Re: Cannot instantiate Mysql connection

2015-06-05 Thread Stefano Bortoli
Hi Robert,

I answer on behalf of Flavio. He told me the driver jar was included.
Smells like a class-loading issue due to 'conflicting' dependencies. Is
that possible?
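
One thing that might be worth trying (just a guess on my side, not a
confirmed fix): register the driver explicitly before asking DriverManager
for a connection, so that the class is loaded through the user-code
classloader. Here user and password stand in for the real credentials:

// Class.forName throws a checked ClassNotFoundException
Class.forName("com.mysql.jdbc.Driver");
Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/mydb?autoReconnect=true", user, password);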

Saluti,
Stefano

2015-06-05 16:24 GMT+02:00 Robert Metzger :

> Hi,
>
> is the MySQL driver part of the Jar file that you've build?
>
> On Fri, Jun 5, 2015 at 4:11 PM, Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>>
>> I'm using a fresh build of flink-0.9-SNAPSHOT and in my flink job I set
>> up a mysql connection.
>> When I run the job from Eclipse everything is fine,
>> while when running the job from the Web UI I get the following exception:
>>
>> java.sql.SQLException: No suitable driver found for
>> jdbc:mysql:/localhost:3306/mydb?autoReconnect=true
>> at java.sql.DriverManager.getConnection(DriverManager.java:596)
>> at java.sql.DriverManager.getConnection(DriverManager.java:215)
>>
>> How can I fix that?
>>
>> Best,
>> Flavio
>>
>
>


Re: How to use Tuple in ListValue?

2015-01-23 Thread Stefano Bortoli
What I did was implement ListValue in a MyListValue class; then you can
do pretty much what you want. :-)
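
A rough sketch of what I mean (type and field names are just illustrative):
wrap the pair in a small Value type and subclass ListValue to fix the
element type.

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.ListValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;

// a pair-of-Values type, since Tuple2 itself does not implement Value
class StringIntPair implements Value {
    StringValue first = new StringValue();
    IntValue second = new IntValue();

    @Override
    public void write(DataOutputView out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        first.read(in);
        second.read(in);
    }
}

// concrete ListValue subclass that pins the element type
class StringIntPairList extends ListValue<StringIntPair> {
}

You can then use StringIntPairList wherever a Value is expected, for
example as a field of a Tuple2 in your DataSet.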

saluti,
Stefano

2015-01-23 11:29 GMT+01:00 Robert Metzger :

> Hi,
>
> I think you can just use a java collection for the Tuple2's. (Starting
> from Flink 0.8.0)
>
> Robert.
>
> On Fri, Jan 23, 2015 at 11:27 AM, Malte Schwarzer  wrote:
>
>> Hello,
>>
>> is it possible to use some how Tuple objects (e.g. Tuple2> Integer>) in a ListValue? Even if Tuple is serializable, it does not
>> implement the Value interface. Therefore ListValue is not working.
>>
>> Is there any work-around? I want to use something like "Tuple2> ListValue>>“ as dataset.
>>
>> Cheers
>> Malte
>>
>
>