Re: [gelly] Spargel model rework

2015-11-11 Thread Stephan Ewen
See: https://issues.apache.org/jira/browse/FLINK-3002

On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen  wrote:

> "Either" an "Optional" types are quite useful.
>
> Let's add them to the core Java API.
>
> On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Thanks Fabian! I'll try that :)
>>
>> On 10 November 2015 at 22:31, Fabian Hueske  wrote:
>>
>> > You could implement a Java Either type (similar to Scala's Either) that
>> > either has a Message or the VertexState and a corresponding
>> TypeInformation
>> > and TypeSerializer that serializes a byte flag to indicate which of the
>> > two types is used.
>> > It might actually make sense to add a generic Either type to the Java API
>> > in general (similar to the Java Tuples, which resemble the Scala Tuples).
>> >
>> > Cheers, Fabian
>> >
>> > 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri > >:
>> >
>> > > Hi,
>> > >
>> > > after running a few experiments, I can confirm that putting the
>> combiner
>> > > after the flatMap is indeed more efficient.
>> > >
>> > > I ran SSSP and Connected Components with Spargel, GSA, and the Pregel
>> > model
>> > > and the results are the following:
>> > >
>> > > - for SSSP, Spargel is always the slowest, GSA is ~1.2x faster and
>> > Pregel
>> > > is ~1.1x faster without combiner, ~1.3x faster with combiner.
>> > > - for Connected Components, Spargel and GSA perform similarly, while
>> > Pregel
>> > > is 1.4-1.6x slower.
>> > >
>> > > To start with, this is much better than I expected :)
>> > > However, there is one main shortcoming in my current implementation that
>> > > negatively impacts performance:
>> > > Since the compute function coGroup needs to output both new vertex
>> values
>> > > and new messages, I emit a wrapping tuple that contains both vertex
>> state
>> > > and messages and then filter them out based on a boolean field. The
>> > problem
>> > > is that since I cannot emit null fields, I emit a dummy message for
>> each
>> > > new vertex state and a dummy vertex state for each new message. That
>> > > essentially means that the intermediate messages result is double in
>> > size,
>> > > if say the vertex values are of the same type as the messages (can be
>> > worse
>> > > if the vertex values are more complex).
>> > > So my question is, is there a way to avoid this redundancy, by either
>> > > emitting null fields or by creating an operator that could emit 2
>> > different
>> > > types of tuples?
>> > >
>> > > Thanks!
>> > > -Vasia.
>> > >
>> > > On 9 November 2015 at 15:20, Fabian Hueske  wrote:
>> > >
>> > > > Hi Vasia,
>> > > >
>> > > > sorry for the late reply.
>> > > > I don't think there is a big difference. In both cases, the
>> > partitioning
>> > > > and sorting happens at the end of the iteration.
>> > > > If the groupReduce is applied before the workset is returned, the
>> > sorting
>> > > > happens on the filtered result (after the flatMap) which might be a
>> > > little
>> > > > bit more efficient (depending on the ratio of messages and solution
>> set
>> > > > updates). Also it does not require that the initial workset is
>> sorted
>> > for
>> > > > the first groupReduce.
>> > > >
>> > > > I would put it at the end.
>> > > >
>> > > > Cheers, Fabian
>> > > >
>> > > > 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <
>> vasilikikala...@gmail.com
>> > >:
>> > > >
>> > > > > @Fabian
>> > > > >
>> > > > > Is there any advantage in putting the reducer-combiner before
>> > updating
>> > > > the
>> > > > > workset vs. after (i.e. right before the join with the solution
>> set)?
>> > > > >
>> > > > > If it helps, here are the plans of these 2 alternatives:
>> > > > >
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
>> > > > >
>> > > > > Thanks a lot for the help!
>> > > > >
>> > > > > -Vasia.
>> > > > >
>> > > > > On 30 October 2015 at 21:28, Fabian Hueske 
>> > wrote:
>> > > > >
>> > > > > > We can of course inject an optional ReduceFunction (or
>> GroupReduce,
>> > > or
>> > > > > > combinable GroupReduce) to reduce the size of the work set.
>> > > > > > I suggested to remove the GroupReduce function, because it did
>> only
>> > > > > collect
>> > > > > > all messages into a single record by emitting the input iterator
>> > > which
>> > > > is
>> > > > > > quite dangerous. Applying a combinable reduce function could
>> > > improve
>> > > > > the
>> > > > > > performance considerably.
>> > > > > >
>> > > > > > The good news is that it would come "for free" because the
>> > necessary
>> > > > > > partitioning and sorting can be reused (given the forwardField
>> > > > > annotations
>> > > > > > are correctly set):
>> > > > > > - The partitioning of the reduce can be 

[jira] [Created] (FLINK-3002) Add an EitherType to the Java API

2015-11-11 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3002:
---

 Summary: Add an EitherType to the Java API
 Key: FLINK-3002
 URL: https://issues.apache.org/jira/browse/FLINK-3002
 Project: Flink
  Issue Type: New Feature
  Components: Java API
Affects Versions: 0.10
Reporter: Stephan Ewen
 Fix For: 1.0


Either types are recurring patterns and should be serialized efficiently, so it 
makes sense to add them to the core Java API.

Since Java does not have such a type as of Java 8, we would need to add our own 
version.

The Scala API already handles the Scala Either type efficiently. I would not 
use the Scala Either type in the Java API, since we are trying to get the 
{{flink-java}} project "Scala free" for people that don't use Scala and do not 
want to worry about Scala version matches and mismatches.
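For illustration, a minimal sketch of what such a type could look like; the names 
and layout are assumptions, not the final API:

{code}
// Minimal sketch only; the actual API, naming and serializer are still to be designed.
public final class Either<L, R> {

    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    public static <L, R> Either<L, R> left(L value) {
        return new Either<L, R>(value, null, true);
    }

    public static <L, R> Either<L, R> right(R value) {
        return new Either<L, R>(null, value, false);
    }

    public boolean isLeft() {
        return isLeft;
    }

    public L left() {
        if (!isLeft) {
            throw new IllegalStateException("Either holds a Right value");
        }
        return left;
    }

    public R right() {
        if (isLeft) {
            throw new IllegalStateException("Either holds a Left value");
        }
        return right;
    }
}
{code}

A matching TypeInformation/TypeSerializer could then write a single byte flag 
followed by the serializer of the active side, as suggested in the Spargel thread.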



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Fixing the ExecutionConfig

2015-11-11 Thread Stephan Ewen
Hi all!

The ExecutionConfig is a bit of a strange thing right now. It looks like it
became the place where everyone just put the stuff they want to somehow
push from the client to runtime, plus a random assortment of config flags.

As a result:

  - The ExecutionConfig is available in batch and streaming, but has a
number of fields that are very streaming specific, like the watermark
interval, etc.

  - Several fields that are purely pre-flight time relevant are in there,
like whether to use the closure cleaner, or whether to force Avro or Kryo
serializers for POJOs.

Any interest in cleaning this up? Because these messy classes simply grow
ever more messy unless we establish a proper definition of what their
concerns and non-concerns are...

Greetings,
Stephan
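
To make the pre-flight vs. runtime distinction concrete, a purely hypothetical
sketch; none of these classes or fields exist in Flink as-is, the field names
only mirror settings mentioned above:

// Hypothetical sketch only, to illustrate the split being discussed.
class PreFlightConfig {                   // only relevant on the client side
    boolean useClosureCleaner = true;
    boolean forceKryoForPojos = false;
    boolean forceAvroForPojos = false;
}

class CommonRuntimeConfig {               // shipped to the TaskManagers
    int parallelism = 1;
    int numberOfExecutionRetries = 0;
}

class StreamingRuntimeConfig extends CommonRuntimeConfig {
    long autoWatermarkIntervalMillis = 0; // streaming-specific setting
}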


Re: [VOTE] Release Apache Flink 0.10.0 (release-0.10.0-rc8)

2015-11-11 Thread Stephan Ewen
+1 from my side

 - Compiled code against Hadoop 2.3.0 and Hadoop 2.6.0
 - Executed all tests
 - Executed manual tests, plus ChaosMonkeyITCase
 - Checked the LICENSE and NOTICE files
 - Tested streaming program with window implementation with custom session
timeout example


On Tue, Nov 10, 2015 at 9:41 PM, Maximilian Michels  wrote:

> Please note that this vote has a slightly shorter voting period of 48
> hours. Only very small changes have been made since the last release
> candidate. Since the community has already done extensive testing of the
> previous release candidates, I'm assuming 48 hours will suffice to vote on
> this release candidate.
>
> -
>
> Please vote on releasing the following candidate as Apache Flink version
> 0.10.0:
>
> The commit to be voted on:
> ab2cca4891f58e31bc3ec8d758d253a6cf84bc71
>
> Branch:
> release-0.10.0-rc8 (see
> https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git)
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~mxm/flink-0.10.0-rc8/
>
> The release artifacts are signed with the key with fingerprint C2909CBF:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1055
>
> -
>
> The vote is open for the next 48 hours and passes if a majority of at least
> three +1 PMC votes are cast.
>
> The vote ends on Thursday November 12, 2015.
>
> [ ] +1 Release this package as Apache Flink 0.10.0
> [ ] -1 Do not release this package because ...
>
> ===
>
> The following commits have been added on top of release-0.10.0-rc7:
>
> c0fe305 [FLINK-2992] Remove use of SerializationUtils
> c098377 [hotfix] Check for null in StreamSource.cancel()
>


Re: Tagging Flink classes with InterfaceAudience and InterfaceStability

2015-11-11 Thread Aljoscha Krettek
+1 for some way of declaring public interfaces as experimental.
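
For illustration, a minimal sketch of the two annotations being discussed; the
retention and target choices here are assumptions:

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks a whole class or interface as stable, public API.
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface PublicInterface {
}

// Marks individual methods (or constructors) of an otherwise public class
// as experimental, i.e. subject to change.
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR })
@Retention(RetentionPolicy.RUNTIME)
@interface PublicExperimental {
}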

> On 10 Nov 2015, at 22:24, Stephan Ewen  wrote:
> 
> I think we need an annotation "@PublicExperimental" anyway.
> 
> We can make this annotation applicable to methods and use it to declare
> methods in an otherwise public class (such as DataSet) as experimental.
> 
> On Tue, Nov 10, 2015 at 10:19 PM, Fabian Hueske  wrote:
> 
>> I am not sure if we always should declare complete classes as
>> @PublicInterface.
>> This does definitely make sense for interfaces and abstract classes such as
>> MapFunction or InputFormat but not necessarily for classes such as DataSet
>> that we might want to extend by methods which should not immediately be
>> considered as stable.
>> 
>> 
>> 2015-11-10 21:36 GMT+01:00 Vasiliki Kalavri :
>> 
>>> Yes, my opinion is that we shouldn't declare the Gelly API frozen yet.
>>> We can reconsider when we're closer to the 1.0 release, but if possible,
>> I
>>> would give it some more time.
>>> 
>>> -V.
>>> 
>>> On 10 November 2015 at 21:06, Stephan Ewen  wrote:
>>> 
 I think no component should be forced to be stable. It should be an
 individual decision for each component, and in some cases even for
 individual classes.
 
 @Vasia If you think Gelly should not be declared interface-frozen, then
 this is a good point to raise and this should definitely be reflected.
 There is no point in declaring certain APIs as frozen when we are not
>> yet
 confident they have converged.
 
 
 
 On Tue, Nov 10, 2015 at 8:39 PM, Vasiliki Kalavri <
 vasilikikala...@gmail.com
> wrote:
 
> Hi Robert,
> 
> thanks for bringing this up!
> 
> I generally like the idea, but I wouldn't rush to annotate the Gelly
> classes yet. Gelly hasn't had that many users and I'm quite sure
>> we'll
 find
> things to improve as it gets more exposure.
> TBH, I think it's quite unfair to force Gelly (also e.g. ML, Table)
>> to
>>> a
> "1.0" status (from an API stability point of view) since they're
>> really
> young compared to the other Flink APIs.
> 
> Cheers,
> Vasia.
> On Nov 10, 2015 8:04 PM, "Robert Metzger" 
>> wrote:
> 
>> Hi everyone,
>> 
>> I would like to bring this discussion back to your attention as we
>>> seem
> to
>> approach the 1.0 release of Flink.
>> 
>> My suggestion back in January was to annotate all classes, but I
>>> think
>> it'll be more feasible to just annotate public classes.
>> So how about adding an annotation @PublicInterface?
>> 
>> For @PublicInterface, I would annotate classes such as: DataSet,
>> DataStream, ExecutionEnvironment, InputFormat, MapFunction,
>>> FileSystems
> but
>> also Gelly for example.
>> 
>> I would not annotate as public components such as ML, Storm
> compatibility,
>> internals from runtime, yarn, optimizer.
>> 
>> 
>> From a tooling perspective, I've looked into different maven
>> plugins
 and
>> java libraries and I found https://github.com/siom79/japicmp to be
>>> the
>> closest to our needs. I actually opened a pull request to the
>> project
 to
>> allow inclusion/exclusion of classes based on annotations. Lets
>> hope
>>> it
>> gets merged.
>> 
>> Does everybody agree with adding just the @PublicInterface
>>> annotation?
>> 
>> Note that I'll add the annotation on a class-level, making the
>> entire
> class
>> either public or private (from a stability point of view). If we
>>> need a
>> more fine-grained annotation, we have to add a second
>>> @PrivateInterface
>> annotation which we'll only apply to certain methods.
>> 
>> The next step is that I'm going to open a pull request with all
>>> classes
>> annotated that I consider public.
>> 
>> 
>> On Fri, Jan 30, 2015 at 7:10 PM, Henry Saputra <
 henry.sapu...@gmail.com>
>> wrote:
>> 
>>> I like the idea. But would love to have different name for the
>>> "LimitedPrivate" to make it easier to distinguish.
>>> How about "Module" or "Package" ?
>>> 
>>> - Henry
>>> 
>>> On Wed, Jan 28, 2015 at 1:29 AM, Robert Metzger <
>>> rmetz...@apache.org
> 
>>> wrote:
 I think in Hadoop they use LimitedPrivate for the different
> components
>> of
 the project.
 For example LimitedPrivate("yarn").
 Here is a very good documentation on the topic:
 
>>> 
>> 
> 
 
>>> 
>> https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/InterfaceClassification.html
 
 On Tue, Jan 27, 2015 at 3:58 PM, Alexander Alexandrov <
 alexander.s.alexand...@gmail.com> wrote:
 
> I don't get the difference between 

[jira] [Created] (FLINK-3001) Add Support for Java 8 Optional type

2015-11-11 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3001:
---

 Summary: Add Support for Java 8 Optional type
 Key: FLINK-3001
 URL: https://issues.apache.org/jira/browse/FLINK-3001
 Project: Flink
  Issue Type: New Feature
  Components: Java API
Affects Versions: 0.10
Reporter: Stephan Ewen
Priority: Minor
 Fix For: 1.0


Using {{Optional}} is a good way to handle nullable fields.
The missing support for null fields in tuples can be easily handled by using 
{{Optional}} for nullable fields and {{T}} directly for non-nullable fields. 
That also retains best serialization efficiency.

Since we cannot always assume the presence of {{Optional}} (only introduced in 
Java 8), the TypeExtractor needs to analyze and create that TypeInfo with 
reflection.

Furthermore, we need to add the OptionalTypeInfo to the flink-java8 project, 
and people need to include flink-java8 in their project if they want to use 
Optional.
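
For illustration, the usage pattern this enables (a sketch; the OptionalTypeInfo 
itself does not exist yet, and the tuple types are made up for the example):

{code}
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch of the intended usage: the nullable field is wrapped in Optional,
// the non-nullable field keeps its type directly.
public class NullableFieldExample {

    public static void main(String[] args) {
        Tuple2<Long, Optional<String>> present = new Tuple2<>(1L, Optional.of("value"));
        Tuple2<Long, Optional<String>> absent = new Tuple2<>(2L, Optional.<String>empty());

        System.out.println(present.f1.orElse("<null>"));  // prints "value"
        System.out.println(absent.f1.orElse("<null>"));   // prints "<null>"
    }
}
{code}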



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [gelly] Spargel model rework

2015-11-11 Thread Vasiliki Kalavri
Thanks Fabian! I'll try that :)

On 10 November 2015 at 22:31, Fabian Hueske  wrote:

> You could implement a Java Either type (similar to Scala's Either) that
> either has a Message or the VertexState and a corresponding TypeInformation
> and TypeSerializer that serializes a byte flag to indicate which of the two
> types is used.
> It might actually make sense to add a generic Either type to the Java API
> in general (similar to the Java Tuples, which resemble the Scala Tuples).
>
> Cheers, Fabian
>
> 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri :
>
> > Hi,
> >
> > after running a few experiments, I can confirm that putting the combiner
> > after the flatMap is indeed more efficient.
> >
> > I ran SSSP and Connected Components with Spargel, GSA, and the Pregel
> model
> > and the results are the following:
> >
> > - for SSSP, Spargel is always the slowest, GSA is ~1.2x faster and
> Pregel
> > is ~1.1x faster without combiner, ~1.3x faster with combiner.
> > - for Connected Components, Spargel and GSA perform similarly, while
> Pregel
> > is 1.4-1.6x slower.
> >
> > To start with, this is much better than I expected :)
> > However, there is one main shortcoming in my current implementation that
> > negatively impacts performance:
> > Since the compute function coGroup needs to output both new vertex values
> > and new messages, I emit a wrapping tuple that contains both vertex state
> > and messages and then filter them out based on a boolean field. The
> problem
> > is that since I cannot emit null fields, I emit a dummy message for each
> > new vertex state and a dummy vertex state for each new message. That
> > essentially means that the intermediate messages result is double in
> size,
> > if say the vertex values are of the same type as the messages (can be
> worse
> > if the vertex values are more complex).
> > So my question is, is there a way to avoid this redundancy, by either
> > emitting null fields or by creating an operator that could emit 2
> different
> > types of tuples?
> >
> > Thanks!
> > -Vasia.
> >
> > On 9 November 2015 at 15:20, Fabian Hueske  wrote:
> >
> > > Hi Vasia,
> > >
> > > sorry for the late reply.
> > > I don't think there is a big difference. In both cases, the
> partitioning
> > > and sorting happens at the end of the iteration.
> > > If the groupReduce is applied before the workset is returned, the
> sorting
> > > happens on the filtered result (after the flatMap) which might be a
> > little
> > > bit more efficient (depending on the ratio of messages and solution set
> > > updates). Also it does not require that the initial workset is sorted
> for
> > > the first groupReduce.
> > >
> > > I would put it at the end.
> > >
> > > Cheers, Fabian
> > >
> > > 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri  >:
> > >
> > > > @Fabian
> > > >
> > > > Is there any advantage in putting the reducer-combiner before
> updating
> > > the
> > > > workset vs. after (i.e. right before the join with the solution set)?
> > > >
> > > > If it helps, here are the plans of these 2 alternatives:
> > > >
> > > >
> > > >
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> > > >
> > > >
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
> > > >
> > > > Thanks a lot for the help!
> > > >
> > > > -Vasia.
> > > >
> > > > On 30 October 2015 at 21:28, Fabian Hueske 
> wrote:
> > > >
> > > > > We can of course inject an optional ReduceFunction (or GroupReduce,
> > or
> > > > > combinable GroupReduce) to reduce the size of the work set.
> > > > > I suggested to remove the GroupReduce function, because it did only
> > > > collect
> > > > > all messages into a single record by emitting the input iterator
> > which
> > > is
> > > > > quite dangerous. Applying a combinable reduce function could
> > improve
> > > > the
> > > > > performance considerably.
> > > > >
> > > > > The good news is that it would come "for free" because the
> necessary
> > > > > partitioning and sorting can be reused (given the forwardField
> > > > annotations
> > > > > are correctly set):
> > > > > - The partitioning of the reduce can be reused for the join with
> the
> > > > > solution set
> > > > > - The sort of the reduce is preserved by the join with the
> in-memory
> > > > > hash-table of the solution set and can be reused for the coGroup.
> > > > >
> > > > > Best,
> > > > > Fabian
> > > > >
> > > > > 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <
> > vasilikikala...@gmail.com
> > > >:
> > > > >
> > > > > > Hi Fabian,
> > > > > >
> > > > > > thanks so much for looking into this so quickly :-)
> > > > > >
> > > > > > One update I have to make is that I tried running a few
> experiments
> > > > with
> > > > > > this on a 6-node cluster. The current implementation gets stuck
> at
> > > > > > "Rebuilding Workset Properties" and never finishes a single
> > 

[jira] [Created] (FLINK-2999) Support connected keyed streams

2015-11-11 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-2999:


 Summary: Support connected keyed streams
 Key: FLINK-2999
 URL: https://issues.apache.org/jira/browse/FLINK-2999
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0
Reporter: Fabian Hueske


It would be nice to add support for connected keyed streams to enable 
key-partitioned state in Co*Functions.

This could be done by simply connecting two keyed Streams or adding a new 
method to connect and key two streams as one operation.

{code}
DataStream s1 = ...
DataStream s2 = ...

// alternative 1
s1
  .keyBy(0)
  .connect(s2.keyBy(1))
  .map(new KeyedCoMap());

// alternative 2
s1
  .connectByKey(s2, 0, 1)
  .map(new KeyedCoMap());

public class KeyedCoMap implements RichCoMapFunction {
  
  OperatorState s;

  public void open() {
s = getRuntimeContext().getKeyValueState("abc", A.class, new A());
  }

  // ...

}
{code}
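
The generic parameters were stripped from the snippet above; a hedged, concrete 
reading of alternative 1 follows (the element types and key positions are 
assumptions; {{A}} is the state class from the snippet):

{code}
DataStream<Tuple2<String, Integer>> s1 = ...
DataStream<Tuple2<Long, String>> s2 = ...

// alternative 1: key both streams on their String field, then connect them
s1
  .keyBy(0)
  .connect(s2.keyBy(1))
  .map(new KeyedCoMap());

public class KeyedCoMap
    implements RichCoMapFunction<Tuple2<String, Integer>, Tuple2<Long, String>, String> {

  OperatorState<A> s;

  public void open() {
    s = getRuntimeContext().getKeyValueState("abc", A.class, new A());
  }

  // ...

}
{code}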



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3003) Add container allocation timeout to YARN CLI

2015-11-11 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-3003:
--

 Summary: Add container allocation timeout to YARN CLI
 Key: FLINK-3003
 URL: https://issues.apache.org/jira/browse/FLINK-3003
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Affects Versions: 0.10
Reporter: Ufuk Celebi
 Fix For: 1.0, 0.10.1


Programs submitted via {{bin/flink run -m yarn-cluster}} start a short-lived 
YARN session before submitting the job. The job is only submitted when all 
resources have been allocated. All allocated containers are "blocked" by the 
job to be submitted, and the cluster is only partially allocated.

If you have multiple submissions like this with partial allocations, you can 
block the whole YARN cluster (e.g. 10 containers in total and two sessions want 
6 containers each and both have allocated 5).

A simple workaround for these situations is to add an allocation timeout after 
which the YARN session fails and releases all the resources.

[Other strategies are possible as well, e.g. wait X amount of time for Y 
containers, but then go with what you have if you don't get all of them.]
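
A self-contained sketch of the simple timeout strategy (the {{ContainerRequestor}} 
interface is only a stand-in for the real YARN client, not an existing class):

{code}
// Hypothetical sketch only; ContainerRequestor stands in for the actual YARN client.
interface ContainerRequestor {
    int requestedContainers();
    int allocatedContainers();
    void shutdownAndReleaseContainers();
}

final class AllocationTimeout {

    static void awaitAllocation(ContainerRequestor yarn, long timeoutMillis)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;

        while (yarn.allocatedContainers() < yarn.requestedContainers()) {
            if (System.currentTimeMillis() > deadline) {
                // Fail the session and give the partially allocated containers back.
                yarn.shutdownAndReleaseContainers();
                throw new IllegalStateException("Requested containers were not "
                        + "allocated within " + timeoutMillis + " ms");
            }
            Thread.sleep(500);
        }
    }
}
{code}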



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Core Memory Error

2015-11-11 Thread Ufuk Celebi
Hey Ali,

thanks for sharing the code. I assume that the custom
ProtocolEvent, ProtocolDetailMap, and Subscriber types are all POJOs. They
should not be a problem. I think this is a bug in Flink 0.9.1.

Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
version and report back?

1) Add
https://repository.apache.org/content/repositories/orgapacheflink-1055 as a
snapshot repository:

<repository>
  <id>apache.snapshots</id>
  <name>Apache Development Snapshot Repository</name>
  <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
  <releases>
    <enabled>false</enabled>
  </releases>
  <snapshots>
    <enabled>true</enabled>
  </snapshots>
</repository>

2) Set the Flink dependency version to 0.10.0

3) Use the Flink binary matching your Hadoop installation from here:
http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you
can go with the Scala 2.10 builds)

Sorry for the inconvenience! The release is about to be finished (the
voting process is already going on).

– Ufuk


On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali  wrote:

> Thanks for the quick reply guys! A lot of interest in this one. I've
> attached the source code. There are other supporting
> modules/classes but the main flink component is in the included zip file.
>
> In answer to Fabian's question: I'm using the 0.9.1 release right off the
> website (flink-0.9.1-bin-hadoop1.tgz).
>
> In answer to Ufuk's question: Yes I'm using custom data types.
>
> Thanks,
> Ali
>
>
>
> On 2015-11-10, 3:01 PM, "Ufuk Celebi"  wrote:
>
> >Thanks for reporting this. Are you using any custom data types?
> >
> >If you can share your code, it would be very helpful in order to debug
> >this.
> >
> >– Ufuk
> >
> >On Tuesday, 10 November 2015, Fabian Hueske  wrote:
> >
> >> I agree with Robert. Looks like a bug in Flink.
> >> Maybe an off-by-one issue (violating index is 32768 and the default
> >>memory
> >> segment size is 32KB).
> >>
> >> Which Flink version are you using?
> >> In case you are using a custom build, can you share the commit ID (is
> >> reported in the first lines of the JobManager log file)?
> >>
> >> Thanks, Fabian
> >>
> >> 2015-11-10 18:29 GMT+01:00 Robert Metzger  >> >:
> >>
> >> > Hi Ali,
> >> >
> >> > this could be a bug in Flink.
> >> > Can you share the code of your program with us to debug the issue?
> >> >
> >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali  >> > wrote:
> >> >
> >> > > Hello,
> >> > >
> >> > > I'm getting this error while running a streaming module on a cluster
> >> of 3
> >> > > nodes:
> >> > >
> >> > >
> >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
> >> > >
> >> > > at
> >> org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa
> >>nningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpann
> >>ingRecordDeserializer.java:214)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa
> >>nningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdapt
> >>iveSpanningRecordDeserializer.java:219)
> >> > >
> >> > > at
> >>org.apache.flink.types.StringValue.readString(StringValue.java:764)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(S
> >>tringSerializer.java:68)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(S
> >>tringSerializer.java:73)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(S
> >>tringSerializer.java:28)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Po
> >>joSerializer.java:499)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de
> >>serialize(StreamRecordSerializer.java:102)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de
> >>serialize(StreamRecordSerializer.java:29)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reu
> >>singDeserializationDelegate.java:57)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa
> >>nningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeser
> >>ializer.java:110)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNe
> >>xtRecord(StreamingAbstractRecordReader.java:80)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> >>org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(S
> >>treamingMutableRecordReader.java:36)
> >> > >
> >> > > at
> >> > >
> >> >
> >>
> 

Re: [VOTE] Release Apache Flink 0.10.0 (release-0.10.0-rc8)

2015-11-11 Thread Aljoscha Krettek
Let’s try this again… :D

+1

I think this one could be it. I did:
- verify the checksums of some of the release artifacts, I assume that
the rest is also OK
- test build for custom Hadoop versions 2.4, 2.5, 2.6
- verify that LICENSE/NOTICE are correct
- verify that licenses of dependencies are compatible
- read the README file
- verify that the start/stop scripts work (multi-cluster mode)
- run the bundled examples with built-in and external data
- verify that the log is free of errors/exceptions
- run fault-tolerant job with Kafka with randomly killing TMs and JM
- check that java/scala quickstarts work (also with IntelliJ)
- run an example against a running cluster with RemoteEnvironment
- run the manual tests in flink-tests

> On 11 Nov 2015, at 09:33, Stephan Ewen  wrote:
> 
> +1 from my side
> 
> - Compiled code against Hadoop 2.3.0 and Hadoop 2.6.0
> - Executed all tests
> - Executed manual tests, plus ChaosMonkeyITCase
> - Checked the LICENSE and NOTICE files
> - Tested streaming program with window implementation with custom session
> timeout example
> 
> 
> On Tue, Nov 10, 2015 at 9:41 PM, Maximilian Michels  wrote:
> 
>> Please note that this vote has a slightly shorter voting period of 48
>> hours. Only very small changes have been made since the last release
>> candidate. Since the community has already done extensive testing of the
>> previous release candidates, I'm assuming 48 hours will suffice to vote on
>> this release candidate.
>> 
>> -
>> 
>> Please vote on releasing the following candidate as Apache Flink version
>> 0.10.0:
>> 
>> The commit to be voted on:
>> ab2cca4891f58e31bc3ec8d758d253a6cf84bc71
>> 
>> Branch:
>> release-0.10.0-rc8 (see
>> https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git)
>> 
>> The release artifacts to be voted on can be found at:
>> http://people.apache.org/~mxm/flink-0.10.0-rc8/
>> 
>> The release artifacts are signed with the key with fingerprint C2909CBF:
>> http://www.apache.org/dist/flink/KEYS
>> 
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>> 
>> -
>> 
>> The vote is open for the next 48 hours and passes if a majority of at least
>> three +1 PMC votes are cast.
>> 
>> The vote ends on Thursday November 12, 2015.
>> 
>> [ ] +1 Release this package as Apache Flink 0.10.0
>> [ ] -1 Do not release this package because ...
>> 
>> ===
>> 
>> The following commits have been added on top of release-0.10.0-rc7:
>> 
>> c0fe305 [FLINK-2992] Remove use of SerializationUtils
>> c098377 [hotfix] Check for null in StreamSource.cancel()
>> 



Re: Core Memory Error

2015-11-11 Thread Fabian Hueske
Hi Ali,

one more thing. Did that error occur once or is it reproducable?

Thanks for your help,
Fabian

2015-11-11 9:50 GMT+01:00 Ufuk Celebi :

> Hey Ali,
>
> thanks for sharing the code. I assume that the custom
> ProtocolEvent, ProtocolDetailMap, Subscriber type are all PoJos. They
> should not be a problem. I think this is a bug in Flink 0.9.1.
>
> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
> version and report back?
>
> 1) Add
> https://repository.apache.org/content/repositories/orgapacheflink-1055 as
> a
> snapshot repository:
>
> <repository>
>   <id>apache.snapshots</id>
>   <name>Apache Development Snapshot Repository</name>
>   <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
>   <releases>
>     <enabled>false</enabled>
>   </releases>
>   <snapshots>
>     <enabled>true</enabled>
>   </snapshots>
> </repository>
>
> 2) Set the Flink dependency version to 0.10.0
>
> 3) Use the Flink binary matching your Hadoop installation from here:
> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you
> can go with the Scala 2.10 builds)
>
> Sorry for the inconvenience! The release is about to be finished (the
> voting process is already going on).
>
> – Ufuk
>
>
> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali  wrote:
>
> > Thanks for the quick reply guys! A lot of interest in this one. I've
> > attached the source code. There are other supporting
> > modules/classes but the main flink component is in the included zip file.
> >
> > In answer to Fabian's question: I'm using the 0.9.1 release right off the
> > website (flink-0.9.1-bin-hadoop1.tgz).
> >
> > In answer to Ufuk's question: Yes I'm using custom data types.
> >
> > Thanks,
> > Ali
> >
> >
> >
> > On 2015-11-10, 3:01 PM, "Ufuk Celebi"  wrote:
> >
> > >Thanks for reporting this. Are you using any custom data types?
> > >
> > >If you can share your code, it would be very helpful in order to debug
> > >this.
> > >
> > >– Ufuk
> > >
> > >On Tuesday, 10 November 2015, Fabian Hueske  wrote:
> > >
> > >> I agree with Robert. Looks like a bug in Flink.
> > >> Maybe an off-by-one issue (violating index is 32768 and the default
> > >>memory
> > >> segment size is 32KB).
> > >>
> > >> Which Flink version are you using?
> > >> In case you are using a custom build, can you share the commit ID (is
> > >> reported in the first lines of the JobManager log file)?
> > >>
> > >> Thanks, Fabian
> > >>
> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger  > >> >:
> > >>
> > >> > Hi Ali,
> > >> >
> > >> > this could be a bug in Flink.
> > >> > Can you share the code of your program with us to debug the issue?
> > >> >
> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali  > >> > wrote:
> > >> >
> > >> > > Hello,
> > >> > >
> > >> > > I'm getting this error while running a streaming module on a
> cluster
> > >> of 3
> > >> > > nodes:
> > >> > >
> > >> > >
> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
> > >> > >
> > >> > > at
> > >> org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa
> >
> >>nningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpann
> > >>ingRecordDeserializer.java:214)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa
> >
> >>nningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdapt
> > >>iveSpanningRecordDeserializer.java:219)
> > >> > >
> > >> > > at
> > >>org.apache.flink.types.StringValue.readString(StringValue.java:764)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(S
> > >>tringSerializer.java:68)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(S
> > >>tringSerializer.java:73)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(S
> > >>tringSerializer.java:28)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Po
> > >>joSerializer.java:499)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> > >>
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de
> > >>serialize(StreamRecordSerializer.java:102)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> > >>
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de
> > >>serialize(StreamRecordSerializer.java:29)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> >>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reu
> > >>singDeserializationDelegate.java:57)
> > >> > >
> > >> > > at
> > >> > >
> > >> >
> > >>
> >
> 

Re: [gelly] Spargel model rework

2015-11-11 Thread Stephan Ewen
"Either" an "Optional" types are quite useful.

Let's add them to the core Java API.

On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <
vasilikikala...@gmail.com> wrote:

> Thanks Fabian! I'll try that :)
>
> On 10 November 2015 at 22:31, Fabian Hueske  wrote:
>
> > You could implement a Java Either type (similar to Scala's Either) that
> > either has a Message or the VertexState and a corresponding
> TypeInformation
> > and TypeSerializer that serializes a byte flag to indicate which of the
> > two types is used.
> > It might actually make sense to add a generic Either type to the Java API
> > in general (similar to the Java Tuples, which resemble the Scala Tuples).
> >
> > Cheers, Fabian
> >
> > 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri :
> >
> > > Hi,
> > >
> > > after running a few experiments, I can confirm that putting the
> combiner
> > > after the flatMap is indeed more efficient.
> > >
> > > I ran SSSP and Connected Components with Spargel, GSA, and the Pregel
> > model
> > > and the results are the following:
> > >
> > > - for SSSP, Spargel is always the slowest, GSA is ~1.2x faster and
> > Pregel
> > > is ~1.1x faster without combiner, ~1.3x faster with combiner.
> > > - for Connected Components, Spargel and GSA perform similarly, while
> > Pregel
> > > is 1.4-1.6x slower.
> > >
> > > To start with, this is much better than I expected :)
> > > However, there is one main shortcoming in my current implementation that
> > > negatively impacts performance:
> > > Since the compute function coGroup needs to output both new vertex
> values
> > > and new messages, I emit a wrapping tuple that contains both vertex
> state
> > > and messages and then filter them out based on a boolean field. The
> > problem
> > > is that since I cannot emit null fields, I emit a dummy message for
> each
> > > new vertex state and a dummy vertex state for each new message. That
> > > essentially means that the intermediate messages result is double in
> > size,
> > > if say the vertex values are of the same type as the messages (can be
> > worse
> > > if the vertex values are more complex).
> > > So my question is, is there a way to avoid this redundancy, by either
> > > emitting null fields or by creating an operator that could emit 2
> > different
> > > types of tuples?
> > >
> > > Thanks!
> > > -Vasia.
> > >
> > > On 9 November 2015 at 15:20, Fabian Hueske  wrote:
> > >
> > > > Hi Vasia,
> > > >
> > > > sorry for the late reply.
> > > > I don't think there is a big difference. In both cases, the
> > partitioning
> > > > and sorting happens at the end of the iteration.
> > > > If the groupReduce is applied before the workset is returned, the
> > sorting
> > > > happens on the filtered result (after the flatMap) which might be a
> > > little
> > > > bit more efficient (depending on the ratio of messages and solution
> set
> > > > updates). Also it does not require that the initial workset is sorted
> > for
> > > > the first groupReduce.
> > > >
> > > > I would put it at the end.
> > > >
> > > > Cheers, Fabian
> > > >
> > > > 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <
> vasilikikala...@gmail.com
> > >:
> > > >
> > > > > @Fabian
> > > > >
> > > > > Is there any advantage in putting the reducer-combiner before
> > updating
> > > > the
> > > > > workset vs. after (i.e. right before the join with the solution
> set)?
> > > > >
> > > > > If it helps, here are the plans of these 2 alternatives:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> > > > >
> > > > >
> > > >
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
> > > > >
> > > > > Thanks a lot for the help!
> > > > >
> > > > > -Vasia.
> > > > >
> > > > > On 30 October 2015 at 21:28, Fabian Hueske 
> > wrote:
> > > > >
> > > > > > We can of course inject an optional ReduceFunction (or
> GroupReduce,
> > > or
> > > > > > combinable GroupReduce) to reduce the size of the work set.
> > > > > > I suggested to remove the GroupReduce function, because it did
> only
> > > > > collect
> > > > > > all messages into a single record by emitting the input iterator
> > > which
> > > > is
> > > > > > quite dangerous. Applying a combinable reduce function could
> > > improve
> > > > > the
> > > > > > performance considerably.
> > > > > >
> > > > > > The good news is that it would come "for free" because the
> > necessary
> > > > > > partitioning and sorting can be reused (given the forwardField
> > > > > annotations
> > > > > > are correctly set):
> > > > > > - The partitioning of the reduce can be reused for the join with
> > the
> > > > > > solution set
> > > > > > - The sort of the reduce is preserved by the join with the
> > in-memory
> > > > > > hash-table of the solution set and can be reused for the coGroup.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > 

[jira] [Created] (FLINK-3000) Add ShutdownHook to YARN CLI to prevent lingering sessions

2015-11-11 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-3000:
--

 Summary: Add ShutdownHook to YARN CLI to prevent lingering sessions
 Key: FLINK-3000
 URL: https://issues.apache.org/jira/browse/FLINK-3000
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Affects Versions: 0.10
Reporter: Ufuk Celebi
 Fix For: 0.10.1


Submitting a job via
{code}
bin/flink run -m yarn-cluster ...
{code}
and terminating the client can lead to lingering YARN sessions allocating 
cluster resources.

This was reported by a user.

{code}
1) One starts a flink job in the yarn mode
2) He sees that containers are not allocated since cluster is busy
3) Presses Ctrl+C
4) An “empty” flink session remains in the cluster although the Flink didn’t 
print that “you can track your application on the X URL”
{code}
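
A minimal sketch of the proposed fix ({{YarnSession}} is only a placeholder here, 
not Flink's actual client class):

{code}
// Hypothetical sketch only; YarnSession stands in for the real YARN client session.
interface YarnSession {
    void shutdownAndReleaseContainers();
}

class ShutdownHookExample {

    static void installShutdownHook(final YarnSession session) {
        // Runs on normal exit and on Ctrl+C, so an interrupted client does not
        // leave a lingering session that keeps allocating cluster resources.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                session.shutdownAndReleaseContainers();
            }
        }));
    }
}
{code}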




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] Release Apache Flink 0.10.0 (release-0.10.0-rc8)

2015-11-11 Thread Henry Saputra
LICENSE file looks good
NOTICE file looks good
Hash files look good for source artifact
Signature file checked for source artifact
No third party executable in source artifact
Source compiled
Tests passed
Ran Word Count locally and on Apache Hadoop YARN 2.6.0 in session mode.

+1

On Tue, Nov 10, 2015 at 12:41 PM, Maximilian Michels  wrote:
> Please note that this vote has a slightly shorter voting period of 48
> hours. Only very small changes have been made since the last release
> candidate. Since the community has already done extensive testing of the
> previous release candidates, I'm assuming 48 hours will suffice to vote on
> this release candidate.
>
> -
>
> Please vote on releasing the following candidate as Apache Flink version
> 0.10.0:
>
> The commit to be voted on:
> ab2cca4891f58e31bc3ec8d758d253a6cf84bc71
>
> Branch:
> release-0.10.0-rc8 (see
> https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git)
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~mxm/flink-0.10.0-rc8/
>
> The release artifacts are signed with the key with fingerprint C2909CBF:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1055
>
> -
>
> The vote is open for the next 48 hours and passes if a majority of at least
> three +1 PMC votes are cast.
>
> The vote ends on Thursday November 12, 2015.
>
> [ ] +1 Release this package as Apache Flink 0.10.0
> [ ] -1 Do not release this package because ...
>
> ===
>
> The following commits have been added on top of release-0.10.0-rc7:
>
> c0fe305 [FLINK-2992] Remove use of SerializationUtils
> c098377 [hotfix] Check for null in StreamSource.cancel()


Re: Streaming statefull operator with hashmap

2015-11-11 Thread Gyula Fóra
Hey,

Yes what you wrote should work. You can alternatively use
TypeExtractor.getForObject(modelMapInit) to extract the type information.

I also like to implement my custom type info for Hashmaps and the other
types and use that.

Cheers,
Gyula

Martin Neumann  ezt írta (időpont: 2015. nov. 11., Sze,
16:30):

> Hej,
>
> What is the correct way of initializing a stateful operator that is using
> a hashmap? modelMapInit.getClass() does not work, and neither does
> HashMap.class. Do I have to implement my own TypeInformation class or is
> there a simpler way?
>
> cheers Martin
>
> private OperatorState> microModelMap;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> HashMap modelMapInit = new HashMap<>();
> this.microModelMap =
> getRuntimeContext().getKeyValueState("microModelMap",
> modelMapInit.getClass() , modelMapInit);
> }
>


Re: Streaming statefull operator with hashmap

2015-11-11 Thread Stephan Ewen
It should suffice to do something like

"getRuntimeContext().getKeyValueState("microModelMap", new
HashMap().getClass(), null);"

Two more comments:

1) Making null the default value and initializing manually is probably more
efficient, because otherwise the empty map would have to be cloned each
time the default value is returned, which adds avoidable overhead.

2) The HashMap type will most likely go through Kryo, so for efficiency,
make sure you register the types "InputType" and "MicroModel" on the
execution environment.
Here you need to do that manually, because they are type erased and
Flink cannot auto-register them.

Greetings,
Stephan
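
Putting both suggestions together, a minimal sketch against the 0.10-era state
API; String/Long stand in for the InputType/MicroModel classes from the original
mail, and the imports and the TypeInformation-taking getKeyValueState overload
are given from memory, so treat them as assumptions:

import java.util.HashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;

public class ModelMapper extends RichMapFunction<String, String> {

    private OperatorState<HashMap<String, Long>> microModelMap;

    @Override
    public void open(Configuration parameters) throws Exception {
        // null default value + lazy initialization, as suggested above, avoids
        // cloning an empty map every time the default value is returned.
        this.microModelMap = getRuntimeContext().getKeyValueState(
                "microModelMap",
                TypeExtractor.getForObject(new HashMap<String, Long>()),
                null);
    }

    @Override
    public String map(String value) throws Exception {
        HashMap<String, Long> map = microModelMap.value();
        if (map == null) {
            map = new HashMap<>();
        }
        map.put(value, System.currentTimeMillis());  // example update
        microModelMap.update(map);
        return value;
    }
}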



On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra  wrote:

> Hey,
>
> Yes what you wrote should work. You can alternatively use
> TypeExtractor.getForObject(modelMapInit) to extract the type information.
>
> I also like to implement my custom type info for Hashmaps and the other
> types and use that.
>
> Cheers,
> Gyula
>
> Martin Neumann  ezt írta (időpont: 2015. nov. 11., Sze,
> 16:30):
>
> > Hej,
> >
> > What is the correct way of initializing a stateful operator that is
> using
> > a hashmap? modelMapInit.getClass() does not work, and neither does
> > HashMap.class. Do I have to implement my own TypeInformation class or is
> > there a simpler way?
> >
> > cheers Martin
> >
> > private OperatorState> microModelMap;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > HashMap modelMapInit = new HashMap<>();
> > this.microModelMap =
> > getRuntimeContext().getKeyValueState("microModelMap",
> > modelMapInit.getClass() , modelMapInit);
> > }
> >
>


Re: Core Memory Error

2015-11-11 Thread Kashmar, Ali
Fabian,

I tried running it again and I noticed there were some more exceptions in
the log. I fixed those and I don’t see the original error but I do see
other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I didn’t
even enable that yet like you suggested). Examples:

1)

10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput
  - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectInt
Map.java:364)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResol
ver.java:47)
at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:95)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(K
ryoSerializer.java:186)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSe
rializer.java:372)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:89)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:29)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(Serialization
Delegate.java:51)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeriali
zer.addRecord(SpanningRecordSerializer.java:76)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWrit
er.java:83)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordW
riter.java:58)
at 
org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.
java:62)
at 
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(Collector
Wrapper.java:40)
at 
org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSourc
e.java:40)
at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
617)
at java.lang.Thread.run(Thread.java:745)



2)
10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput
  - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntM
ap.java:207)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMa
p.java:117)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapRef
erenceResolver.java:23)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:88)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(K
ryoSerializer.java:186)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSe
rializer.java:372)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:89)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:29)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(Serialization
Delegate.java:51)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeriali
zer.addRecord(SpanningRecordSerializer.java:76)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWrit
er.java:83)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordW
riter.java:58)
at 
org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.
java:62)
at 
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(Collector
Wrapper.java:40)
at 
org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSourc
e.java:40)
at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

[DISCUSSION] Type hints versus TypeInfoParser

2015-11-11 Thread Stephan Ewen
Hi all!

We discovered a nice way to give TypeHints in the Google Cloud Dataflow
SDK, in a way that would fit Flink perfectly. I created a JIRA for that:
https://issues.apache.org/jira/browse/FLINK-2788

Since this is more powerful and type safe than the String/Parser way of
giving hints, I was wondering whether we should add this and deprecate the
String variant. If we do that, 1.0 is the time to do that.
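
For context, a minimal sketch of the pattern (the "super type token" trick also
used by Gson and GWT, as noted in the replies); this is not the exact API
proposed in the JIRA:

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public abstract class TypeHint<T> {

    private final Type type;

    protected TypeHint() {
        // The type argument supplied by the anonymous subclass survives erasure
        // and can be read back from the generic superclass.
        ParameterizedType parameterized =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.type = parameterized.getActualTypeArguments()[0];
    }

    public Type getType() {
        return type;
    }

    public static void main(String[] args) {
        // Usage: an anonymous subclass captures the full generic type.
        Type hint = new TypeHint<java.util.Map<String, Long>>() {}.getType();
        System.out.println(hint);  // java.util.Map<java.lang.String, java.lang.Long>
    }
}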

What do you think about this idea?

@Timo Walther Since you worked a lot on types/parser/etc - what is your
take on this?

Greetings,
Stephan


Streaming statefull operator with hashmap

2015-11-11 Thread Martin Neumann
Hej,

What is the correct way of initializing a stateful operator that is using
a hashmap? modelMapInit.getClass() does not work, and neither does
HashMap.class. Do I have to implement my own TypeInformation class or is
there a simpler way?

cheers Martin

private OperatorState> microModelMap;

@Override
public void open(Configuration parameters) throws Exception {
HashMap modelMapInit = new HashMap<>();
this.microModelMap =
getRuntimeContext().getKeyValueState("microModelMap",
modelMapInit.getClass() , modelMapInit);
}


Re: Fixing the ExecutionConfig

2015-11-11 Thread Robert Metzger
I think now (before the 1.0 release) is the right time to clean it up.

Are you suggesting to have two execution configs for batch and streaming?

I'm not sure if we need to distinguish between pre-flight and runtime
options: From a user's perspective, it doesn't matter. For example the
serializer settings are evaluated during pre-flight but they have an impact
during execution.






On Wed, Nov 11, 2015 at 11:59 AM, Stephan Ewen  wrote:

> Hi all!
>
> The ExecutionConfig is a bit of a strange thing right now. It looks like it
> became the place where everyone just put the stuff they want to somehow
> push from the client to runtime, plus a random assortment of config flags.
>
> As a result:
>
>   - The ExecutionConfig is available in batch and streaming, but has a
> number of fields that are very streaming specific, like the watermark
> interval, etc.
>
>   - Several fields that are purely pre-flight time relevant are in there,
> like whether to use the closure cleaner, or whether to force Avro or Kryo
> serializers for POJOs.
>
> Any interest in cleaning this up? Because these messy classes simply grow
> ever more messy unless we establish a proper definition of what their
> concerns and non-concerns are...
>
> Greetings,
> Stephan
>


Re: Core Memory Error

2015-11-11 Thread Fabian Hueske
Hi Ali,

I looked into this issue. The problem seems to be that the deserializer reads
more data than it should.
This might happen because of two reasons:
  1) the meta information of how much data is safe to read is incorrect.
  2) the serializer and deserializer logic are not in sync which can cause
the deserializer to read more data than the serializer wrote.

The first case is less likely: Flink writes the binary length of each
record in front of its serialized representation. This happens whenever
data is sent over the network, regardless of the data type. A bug in this
part would be critical, but it is also less likely because this code path is
exercised very often and no such bug has surfaced yet.

IMO, this looks like an issue of the serialization logic. Looking at your
code, the problem occurs when deserializing ProtocolEvent objects.
Is it possible that you share this class with me?

If it is not possible to share the class, it would be good to know the
field types of the Pojo and the associated TypeInformation.
For that you can run the code in this gist [1] which will recursively print
the field types and their TypeInformation.

As a temporary workaround, you can try to use Kryo to serialize and
deserialize your Pojos as follows:
ExecutionEnvironment env = ...
env.getConfig().enableForceKryo();
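
Since the job in question is a streaming program, the same switch on the
streaming environment would look roughly like this (package names as remembered
from the 0.9/0.10 API; ProtocolEvent below is only a placeholder standing in for
the user's class of that name, and registering it is optional):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForceKryoExample {

    // Placeholder standing in for the user's ProtocolEvent POJO.
    public static class ProtocolEvent {}

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Serialize POJOs with Kryo instead of Flink's PojoSerializer.
        env.getConfig().enableForceKryo();

        // Optionally register the involved classes with Kryo.
        env.getConfig().registerKryoType(ProtocolEvent.class);
    }
}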

Best,
Fabian

[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b

2015-11-11 10:38 GMT+01:00 Fabian Hueske :

> Hi Ali,
>
> one more thing. Did that error occur once or is it reproducable?
>
> Thanks for your help,
> Fabian
>
> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi :
>
>> Hey Ali,
>>
>> thanks for sharing the code. I assume that the custom
>> ProtocolEvent, ProtocolDetailMap, Subscriber type are all PoJos. They
>> should not be a problem. I think this is a bug in Flink 0.9.1.
>>
>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
>> version and report back?
>>
>> 1) Add
>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>> as a
>> snapshot repository:
>>
>> <repository>
>>   <id>apache.snapshots</id>
>>   <name>Apache Development Snapshot Repository</name>
>>   <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
>>   <releases>
>>     <enabled>false</enabled>
>>   </releases>
>>   <snapshots>
>>     <enabled>true</enabled>
>>   </snapshots>
>> </repository>
>>
>> 2) Set the Flink dependency version to 0.10.0
>>
>> 3) Use the Flink binary matching your Hadoop installation from here:
>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you
>> can go with the Scala 2.10 builds)
>>
>> Sorry for the inconvenience! The release is about to be finished (the
>> voting process is already going on).
>>
>> – Ufuk
>>
>>
>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali 
>> wrote:
>>
>> > Thanks for the quick reply guys! A lot of interest in this one. I've
>> > attached the source code. There are other supporting
>> > modules/classes but the main flink component is in the included zip
>> file.
>> >
>> > In answer to Fabian's question: I'm using the 0.9.1 release right off
>> the
>> > website (flink-0.9.1-bin-hadoop1.tgz).
>> >
>> > In answer to Ufuk's question: Yes I'm using custom data types.
>> >
>> > Thanks,
>> > Ali
>> >
>> >
>> >
>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi"  wrote:
>> >
>> > >Thanks for reporting this. Are you using any custom data types?
>> > >
>> > >If you can share your code, it would be very helpful in order to debug
>> > >this.
>> > >
>> > >– Ufuk
>> > >
>> > >On Tuesday, 10 November 2015, Fabian Hueske  wrote:
>> > >
>> > >> I agree with Robert. Looks like a bug in Flink.
>> > >> Maybe an off-by-one issue (violating index is 32768 and the default
>> > >>memory
>> > >> segment size is 32KB).
>> > >>
>> > >> Which Flink version are you using?
>> > >> In case you are using a custom build, can you share the commit ID (is
>> > >> reported in the first lines of the JobManager log file)?
>> > >>
>> > >> Thanks, Fabian
>> > >>
>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger > > >> >:
>> > >>
>> > >> > Hi Ali,
>> > >> >
>> > >> > this could be a bug in Flink.
>> > >> > Can you share the code of your program with us to debug the issue?
>> > >> >
>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali > > >> > wrote:
>> > >> >
>> > >> > > Hello,
>> > >> > >
>> > >> > > I'm getting this error while running a streaming module on a
>> cluster
>> > >> of 3
>> > >> > > nodes:
>> > >> > >
>> > >> > >
>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
>> > >> > >
>> > >> > > at
>> > >>
>> org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
>> > >> > >
>> > >> > > at
>> > >> > >
>> > >> >
>> > >>
>> >
>> >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa
>> >
>> >>nningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpann
>> > >>ingRecordDeserializer.java:214)
>> > >> > >
>> > >> > > at
>> > >> > >
>> > >> >
>> > >>
>> >

[jira] [Created] (FLINK-3005) Commons-collections object deserialization remote command execution vulnerability

2015-11-11 Thread Ted Yu (JIRA)
Ted Yu created FLINK-3005:
-

 Summary: Commons-collections object deserialization remote command 
execution vulnerability
 Key: FLINK-3005
 URL: https://issues.apache.org/jira/browse/FLINK-3005
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


http://foxglovesecurity.com/2015/11/06/what-do-weblogic-websphere-jboss-jenkins-opennms-and-your-application-have-in-common-this-vulnerability/

TL;DR: If you have commons-collections on your classpath and accept and process 
Java object serialization data, then you may have an exploitable remote command 
execution vulnerability.

A brief search in the code base for ObjectInputStream reveals several places where 
the vulnerability exists.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Type hints versus TypeInfoParser

2015-11-11 Thread Aljoscha Krettek
Big +1

Of course, we had the initial talk about it… :D
> On 11 Nov 2015, at 19:33, Kirschnick, Johannes 
>  wrote:
> 
> Hi Stephan,
> 
> looking at the TypeHint, I got reminded on how Gson (a Json Parser) handles 
> this situation of parsing generics.
> 
> See here for an overview
> https://sites.google.com/site/gson/gson-user-guide#TOC-Serializing-and-Deserializing-Generic-Types
> 
> Seems like this method was rediscovered :) And maybe there are some tricks 
> that can be learned from the implementation
> 
> I'm all in favor for "hard" types over string literals.
> 
> Johannes
> 
> P.S.
> Apparently GWT uses the same "trick" to handle generics ...
> 
> -Original Message-
> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On behalf of 
> Stephan Ewen
> Sent: Wednesday, 11 November 2015 15:53
> To: dev@flink.apache.org
> Subject: [DISCUSSION] Type hints versus TypeInfoParser
> 
> Hi all!
> 
> We discovered a nice way to give TypeHints in the Google Cloud Dataflow SDK, 
> in a way that would fit Flink perfectly. I created a JIRA for that:
> https://issues.apache.org/jira/browse/FLINK-2788
> 
> Since this is more powerful and type safe than the String/Parser way of 
> giving hints, I was wondering whether we should add this and deprecate the 
> String variant. If we do that, 1.0 is the time to do that.
> 
> What do you think about this idea?
> 
> @Timo Walther Since you worked a lot on types/parser/etc - what is your take 
> on this?
> 
> Greetings,
> Stephan



AW: [DISCUSSION] Type hints versus TypeInfoParser

2015-11-11 Thread Kirschnick, Johannes
Hi Stephan,

looking at the TypeHint, I got reminded on how Gson (a Json Parser) handles 
this situation of parsing generics.

See here for an overview
https://sites.google.com/site/gson/gson-user-guide#TOC-Serializing-and-Deserializing-Generic-Types

Seems like this method was rediscovered :) And maybe there are some tricks that 
can be learned from the implementation

I'm all in favor for "hard" types over string literals.

Johannes

P.S.
Apparently GWT uses the same "trick" to handle generics ...
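
A minimal, self-contained sketch of the super-type-token trick that Gson's
TypeToken and the TypeHint proposed in FLINK-2788 rely on (the class name
TypeCapture below is purely illustrative, not an existing Flink or Gson API):

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Subclassing "pins" the generic argument into the class file's signature,
// so it survives erasure and can be read back via reflection.
public abstract class TypeCapture<T> {

    private final Type capturedType;

    protected TypeCapture() {
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.capturedType = superType.getActualTypeArguments()[0];
    }

    public Type getCapturedType() {
        return capturedType;
    }

    public static void main(String[] args) {
        // The anonymous subclass carries List<String> in its generic superclass.
        Type t = new TypeCapture<List<String>>() {}.getCapturedType();
        System.out.println(t); // prints: java.util.List<java.lang.String>
    }
}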

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On behalf of 
Stephan Ewen
Sent: Wednesday, 11 November 2015 15:53
To: dev@flink.apache.org
Subject: [DISCUSSION] Type hints versus TypeInfoParser

Hi all!

We discovered a nice way to give TypeHints in the Google Cloud Dataflow SDK, in 
a way that would fit Flink perfectly. I created a JIRA for that:
https://issues.apache.org/jira/browse/FLINK-2788

Since this is more powerful and type safe than the String/Parser way of giving 
hints, I was wondering whether we should add this and deprecate the String 
variant. If we do that, 1.0 is the time to do that.

What do you think about this idea?

@Timo Walther Since you worked a lot on types/parser/etc - what is your take on 
this?

Greetings,
Stephan


Re: neo4j - Flink connector

2015-11-11 Thread Martin Junghanns

Hi,

I am a bit stuck with that dependency problem. Any help would be 
appreciated as I would like to continue working on the formats. Thanks!


Best,
Martin

On 07.11.2015 17:28, Martin Junghanns wrote:

Hi Robert,

Thank you for the hints. I tried to narrow down the error:

Flink version: 0.10-SNAPSHOT
Neo4j version: 2.3.0

I start with two dependencies:
flink-java
flink-gelly

(1) Add neo4j-harness and run basic example from Neo4j [1]
Leads to:

java.lang.ClassNotFoundException:
org.eclipse.jetty.server.ConnectionFactory

(2) I excluded jetty-server from flink-java and flink-gelly
It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:

java.lang.NoSuchMethodError:
org.eclipse.jetty.servlet.ServletContextHandler.

(3) I excluded jetty-servlet from flink-java and flink-gelly
It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:

java.lang.NoSuchMethodError: scala.Predef$.$conforms()

(4) I excluded scala-library from flink-java and flink-gelly
It now uses scala-library:2.11.7 (was 2.10.4)

Now, the basic Neo4j example (without Flink) runs.
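
(For reference, a sketch of how the exclusions described above are typically
declared in the pom; the coordinates follow this thread, and the Jetty and
Scala groupIds in particular are assumptions that may need adjusting:)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-gelly</artifactId>
  <version>0.10-SNAPSHOT</version>
  <exclusions>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-servlet</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
    </exclusion>
  </exclusions>
</dependency>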

Next, I added Flink to the mix and wrote a simple test using
neo4j-harness features, ExecutionEnvironment and my InputFormat.
Leads to:

java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;

 at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
 at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
 at akka.actor.RootActorPath.$div(ActorPath.scala:159)
 at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
 at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
 at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
 at scala.util.Try$.apply(Try.scala:192)
 at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
 at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at scala.util.Success.flatMap(Try.scala:231)
 at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
 at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
 at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
 at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
 at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at scala.collection.immutable.Range.foreach(Range.scala:166)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
 at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
 at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
 at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
 at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
 at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
 at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)


This is where I don't know what to exclude next. It seems that some
components (Akka?) need Scala 2.10.4 while Neo4j (Cypher) depends on Scala
2.11.7.

How can I make use of the maven shade plugin in that case?
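
(For reference, a minimal sketch of how shade-plugin relocation is usually
configured; the pattern below is only a placeholder, and whether relocation can
actually untangle the Scala 2.10/2.11 clash described above is exactly the open
question:)

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- placeholder: move a conflicting package out of the way -->
            <pattern>org.eclipse.jetty</pattern>
            <shadedPattern>shaded.org.eclipse.jetty</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>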

Again, thank you!

Cheers,
Martin

[1]
http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
(testMyExtensionWithFunctionFixture())


On 

Re: [DISCUSSION] Type hints versus TypeInfoParser

2015-11-11 Thread Timo Walther

+1

It's still hacky but we don't have better alternatives.

I'm not 100% sure if we can get rid of the parser. I think it's still a 
nice way for quickly defining the fields of a POJO if the type extractor 
fails to analyze it. But actually I don't know an example where it fails.


Regards,
Timo

On 11.11.2015 19:56, Aljoscha Krettek wrote:

Big +1

Of course, we had the initial talk about it… :D

On 11 Nov 2015, at 19:33, Kirschnick, Johannes 
 wrote:

Hi Stephan,

looking at the TypeHint, I got reminded on how Gson (a Json Parser) handles 
this situation of parsing generics.

See here for an overview
https://sites.google.com/site/gson/gson-user-guide#TOC-Serializing-and-Deserializing-Generic-Types

Seems like this method was rediscovered :) And maybe there are some tricks that 
can be learned from the implementation

I'm all in favor for "hard" types over string literals.

Johannes

P.S.
Apparently GWT uses the same "trick" to handle generics ...

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On behalf of 
Stephan Ewen
Sent: Wednesday, 11 November 2015 15:53
To: dev@flink.apache.org
Subject: [DISCUSSION] Type hints versus TypeInfoParser

Hi all!

We discovered a nice way to give TypeHints in the Google Cloud Dataflow SDK, in 
a way that would fit Flink perfectly. I created a JIRA for that:
https://issues.apache.org/jira/browse/FLINK-2788

Since this is more powerful and type safe than the String/Parser way of giving 
hints, I was wondering whether we should add this and deprecate the String 
variant. If we do that, 1.0 is the time to do that.

What do you think about this idea?

@Timo Walther Since you worked a lot on types/parser/etc - what is your take on 
this?

Greetings,
Stephan




Re: Streaming stateful operator with hashmap

2015-11-11 Thread Martin Neumann
Thanks for the help.

TypeExtractor.getForObject(modelMapInit) did the job. It's possible that it's
an IDE problem that .getClass() did not work; IntelliJ is a bit fiddly with
those things.

1) Making null the default value and initializing manually is probably more
> efficient, because otherwise the empty map would have to be cloned each
> time the default value is returned, which adds avoidable overhead.


What do you mean by initialize manually? Can I do that directly in the open
function, or are we talking about checking for null in the flatMap and
initializing there? In general, the program is supposed to run continuously
once deployed, so I can get away with a slightly slower setup.
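
(For reference, a minimal sketch that puts the null-default / lazy-initialization
pattern from point 1 together with the getKeyValueState call from this thread;
the map's type parameters, the output type, and the TypeInformation overload of
getKeyValueState are assumptions here, not taken verbatim from the thread:)

import java.util.HashMap;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MicroModelFlatMap extends RichFlatMapFunction<InputType, MicroModel> {

    private OperatorState<HashMap<String, MicroModel>> microModelMap;

    @Override
    public void open(Configuration parameters) throws Exception {
        // null default: nothing is cloned on access, the map is created lazily below
        microModelMap = getRuntimeContext().getKeyValueState(
                "microModelMap",
                TypeExtractor.getForObject(new HashMap<String, MicroModel>()),
                null);
    }

    @Override
    public void flatMap(InputType event, Collector<MicroModel> out) throws Exception {
        HashMap<String, MicroModel> map = microModelMap.value();
        if (map == null) {
            // first event for this key: initialize manually
            map = new HashMap<>();
        }
        // ... update the map from the event and emit results as needed ...
        microModelMap.update(map);
    }
}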

2) The HashMap type will most likely go through Kryo, so for efficiency,
> make sure you register the types "InputType" and "MicroModel" on the
> execution environment.
> Here you need to do that manually, because they are type erased and
> Flink cannot auto-register them.


Can you point me to an example on how to do this?

cheers Martin
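
(For reference, a minimal sketch of the type registration Stephan suggests in
point 2, assuming the environment-level registerType hook; InputType and
MicroModel stand for the application classes discussed in this thread:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Register the concrete classes up front so their (Kryo) serializers are set up
// explicitly instead of being resolved generically at runtime.
env.registerType(InputType.class);
env.registerType(MicroModel.class);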


On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen  wrote:

> It should suffice to do something like
>
> "getRuntimeContext().getKeyValueState("microModelMap", new
> HashMap().getClass(), null);"
>
> Two more comments:
>
> 1) Making null the default value and initializing manually is probably more
> efficient, because otherwise the empty map would have to be cloned each
> time the default value is returned, which adds avoidable overhead.
>
> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> make sure you register the types "InputType" and "MicroModel" on the
> execution environment.
> Here you need to do that manually, because they are type erased and
> Flink cannot auto-register them.
>
> Greetings,
> Stephan
>
>
>
> On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra  wrote:
>
> > Hey,
> >
> > Yes what you wrote should work. You can alternatively use
> > TypeExtractor.getForObject(modelMapInit) to extract the tye information.
> >
> > I also like to implement my custom type info for Hashmaps and the other
> > types and use that.
> >
> > Cheers,
> > Gyula
> >
> > Martin Neumann  ezt írta (időpont: 2015. nov. 11.,
> Sze,
> > 16:30):
> >
> > > Hej,
> > >
> > > What is the correct way of initializing a state-full operator that is
> > using
> > > a hashmap? modelMapInit.getClass() does not work neither does
> > > HashMap.class. Do I have to implement my own TypeInformation class or
> is
> > > there a simpler way?
> > >
> > > cheers Martin
> > >
> > > private OperatorState> microModelMap;
> > >
> > > @Override
> > > public void open(Configuration parameters) throws Exception {
> > > HashMap modelMapInit = new HashMap<>();
> > > this.microModelMap =
> > > getRuntimeContext().getKeyValueState("microModelMap",
> > > modelMapInit.getClass() , modelMapInit);
> > > }
> > >
> >
>


Re: Core Memory Error

2015-11-11 Thread Fabian Hueske
Hi Ali,

Flink uses different serializers for different data types. For example,
(boxed) primitives are serialized using dedicated serializers
(IntSerializer, StringSerializer, etc.) and the ProtocolEvent class is
recognized as a Pojo type and therefore serialized using Flink's
PojoSerializer.
Types that cannot be (fully) analyzed are handled as GenericTypes and
serialized using Flink's KryoSerializer.

By forcing Kryo serialization as I suggested before, Pojo types (such as
ProtocolEvent) will be serialized with Kryo instead of Flink's
PojoSerializer.
Hence, forcing Kryo only affects Pojo types. GenericTypes (such as
ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo
(also without forcing it).
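
(For reference, a sketch of what "forcing Kryo" looks like in code, assuming the
ExecutionConfig switch is the mechanism being referred to:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With this flag set, POJO types such as ProtocolEvent also go through Kryo
// instead of Flink's PojoSerializer.
env.getConfig().enableForceKryo();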

The exceptions you are facing might be caused by a bug in the
KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug
basically corrupts the stream of serialized data and might very well also
be responsible for the original exception you posted. As you see from the
JIRA issue, a bug fix was merged to all active branches; however, it is not
yet contained in an official release.

I would recommend you to try the latest candidate of the upcoming 0.10
release [2] or build Flink from the 0.9-release branch [3].

Please let me know if you have any questions or are still facing problems when
switching to a version with a fix for FLINK-2800.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-2800
[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/
[3] https://github.com/apache/flink/tree/release-0.9

2015-11-11 17:20 GMT+01:00 Kashmar, Ali :

> Fabian,
>
> I tried running it again and I noticed there were some more exceptions in
> the log. I fixed those and I don’t see the original error but I do see
> other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I didn’t
> even enable that yet like you suggested). Examples:
>
> 1)
>
> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput
>   - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
> at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
> at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
> at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
> at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
> at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> 2)
> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput
>   - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
> at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
> at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
> at
> 

[jira] [Created] (FLINK-3004) ForkableMiniCluster does not call RichFunction#open

2015-11-11 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created FLINK-3004:
---

 Summary: ForkableMiniCluster does not call RichFunction#open
 Key: FLINK-3004
 URL: https://issues.apache.org/jira/browse/FLINK-3004
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9.1, 0.10
Reporter: Nick Dimiduk


I have an "integration test" of my streaming flow. It creates a 
{{ForkableFlinkMiniCluster}} in the JUnit {{@BeforeClass}} method, to which 
test streams are submitted. My flow involves a custom 
{{RichCoFlatMapFunction}}. In this function, I finish object initialization via 
the {{void open(Configuration)}} method.

When I submit the flow to a local streaming cluster, it runs without incident. 
However, submitting the flow in my integration test via the maven surefire plugin 
has the effect of {{open}} never being called and my object being consumed 
without complete initialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)