[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126027#comment-15126027 ]

PJ Van Aeken commented on FLINK-2055:
-------------------------------------

For Flink batch, I think HFileOutputFormat2 is a valid approach, but so is the TableOutputFormat. Possibly we could offer both, depending on the consistency needs of the user.

For Flink streaming, however, right now we are also suggesting the TableOutputFormat. I would propose we use the non-buffered HBase native client instead (no output format, just the classes from the client package). In a one-at-a-time processing scenario, buffering the writes in the BufferedMutator seems like an unnecessary delay, or it should at least be an optional choice. Right now, the milliseconds we are defining seem a bit redundant, since the BufferedMutator uses its own heuristics to determine when to flush.

> Implement Streaming HBaseSink
> -----------------------------
>
>                 Key: FLINK-2055
>                 URL: https://issues.apache.org/jira/browse/FLINK-2055
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming, Streaming Connectors
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Hilmi Yildirim
>
> As per:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
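The delay being discussed can be illustrated with a small self-contained toy model. This is plain Java with no Flink or HBase dependencies, and every class and method name in it is invented for illustration: a BufferedMutator-style sink makes writes visible only once something triggers a flush, while a native-client-style sink makes each write visible immediately.

```java
// Toy model of the tradeoff discussed above -- NOT HBase or Flink code.
// All names are invented for illustration.
import java.util.ArrayList;
import java.util.List;

public class BufferingDemo {

    /** Models BufferedMutator-style writes: visible only after flush(). */
    public static class BufferedStyleSink {
        private final List<String> buffer = new ArrayList<>();
        private final List<String> visible = new ArrayList<>();

        public void write(String record) { buffer.add(record); } // held client-side

        public void flush() { visible.addAll(buffer); buffer.clear(); }

        public int visibleCount() { return visible.size(); }
    }

    /** Models native-client-style writes: each record goes straight through. */
    public static class DirectStyleSink {
        private final List<String> visible = new ArrayList<>();

        public void write(String record) { visible.add(record); } // immediately visible

        public int visibleCount() { return visible.size(); }
    }

    public static void main(String[] args) {
        BufferedStyleSink buffered = new BufferedStyleSink();
        DirectStyleSink direct = new DirectStyleSink();
        for (String r : new String[] {"put-1", "put-2", "put-3"}) {
            buffered.write(r);
            direct.write(r);
        }
        System.out.println(direct.visibleCount());   // 3: every write visible as it happens
        System.out.println(buffered.visibleCount()); // 0: nothing visible until a flush
        // In the real BufferedMutator, the client's own heuristics decide when this happens.
        buffered.flush();
        System.out.println(buffered.visibleCount()); // 3
    }
}
```

In a one-record-at-a-time streaming scenario, the direct path is the behavior being argued for here; the buffered path is what the TableOutputFormat gives you.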
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126231#comment-15126231 ]

PJ Van Aeken commented on FLINK-2055:
-------------------------------------

Indeed, the example that you described uses the native client API, which I think is the way to go. Unfortunately, HTable is now deprecated, so the examples are outdated.

In the link to the mailing list (see the issue description), it is suggested to now use the write method on DataStream combined with TableOutputFormat:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#write%28org.apache.flink.api.common.io.OutputFormat,%20long%29

What I am proposing instead is a SinkFunction (like we have for Flume, for instance) that uses the new HBase client APIs, similar to how the example you referred to used to work, rather than the TableOutputFormat, which as far as I understand buffers requests on the client side based on some internal heuristics, as per the HBase documentation:
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
[jira] [Comment Edited] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126231#comment-15126231 ]

PJ Van Aeken edited comment on FLINK-2055 at 2/1/16 2:11 PM:
------------------------------------------------------------

Indeed, the example that you described uses the native client API, which I think is the way to go. Unfortunately, HTable is now deprecated, so the examples are outdated.

In the link to the mailing list (see the issue description), it is suggested to now use the write method on DataStream combined with TableOutputFormat:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#write%28org.apache.flink.api.common.io.OutputFormat,%20long%29

What I am proposing instead is a SinkFunction (like we have for Flume, for instance) that uses the new HBase client APIs, similar to how the example you referred to used to work, rather than the TableOutputFormat, which as far as I understand buffers requests on the client side based on some internal heuristics, as per the HBase documentation:
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html

EDIT: There appears to be a version mismatch, which is why we are not seeing the same problems. It turns out my assumptions are not true in version 0.98.x; I am unsure about 1.x for now, and they are definitely true for 2.x, which is currently in snapshot. So the inner workings of the TableOutputFormat have changed in recent versions, which introduces the problem I have described.
[jira] [Comment Edited] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126231#comment-15126231 ]

PJ Van Aeken edited comment on FLINK-2055 at 2/1/16 2:22 PM:
------------------------------------------------------------

Indeed, the example that you described uses the native client API, which I think is the way to go. Unfortunately, HTable is now deprecated, so the examples are outdated.

In the link to the mailing list (see the issue description), it is suggested to now use the write method on DataStream combined with TableOutputFormat:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#write%28org.apache.flink.api.common.io.OutputFormat,%20long%29

What I am proposing instead is a SinkFunction (like we have for Flume, for instance) that uses the new HBase client APIs, similar to how the example you referred to used to work, rather than the TableOutputFormat, which as far as I understand buffers requests on the client side based on some internal heuristics, as per the HBase documentation:
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126287#comment-15126287 ]

PJ Van Aeken commented on FLINK-2055:
-------------------------------------

There appears to be a version mismatch, which is why we are not seeing the same problems. It turns out my assumptions are not true in version 0.98.x, but they are for the more recent versions 1.x and 2.x (currently in snapshot). The inner workings of the TableOutputFormat have changed in recent versions, which introduces the problem I have described: there is no more auto-flush option, and instead the TableOutputFormat now uses the BufferedMutator, which does the Put caching I described earlier.
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119736#comment-15119736 ]

PJ Van Aeken commented on FLINK-2055:
-------------------------------------

What is the latest news on this? Having read through the mailing thread and the corresponding code, it seems like the current solution is more of a workaround. I can understand the desire to reuse what is already out there, but reusing the HBase TableOutputFormat feels a bit like making a sacrifice. I haven't had time to thoroughly investigate my suspicions, though, and am very interested to learn whether anyone else has.

I am by no means an HBase expert, but based on what I think I know about HBase, this is the sacrifice I think we're making here: the native HBase TableOutputFormat was built for use in batch jobs. It uses the BufferedMutator under the hood, which as far as I understood decides to flush based on constraints determined by HBase itself, such as the cumulative size of the Puts. That means that while we may "write" to our TableOutputFormat every X milliseconds, HBase will still decide on its own when to actually flush the records.

The HBase client, in order to avoid a large number of small files, also groups the Puts together, but in the meantime exposes them through a component called the memstore, making them available before the flush. I believe that by using the TableOutputFormat with the BufferedMutator, we are skipping the memstore, and therefore new Puts remain unavailable until the flush. We could of course configure HBase to flush to disk more frequently, but should we really do that if we have an alternative?

Now, as mentioned, I'm not sure I fully grasped the inner workings of HBase, so if I made some false assumptions, I'm sorry. But based on what I think I know now, it seems like we're making an unnecessary sacrifice here.
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119821#comment-15119821 ]

PJ Van Aeken commented on FLINK-2055:
-------------------------------------

It appears I was wrong in my first comment. HBase provides two output formats, and the TableOutputFormat (currently used in Flink) still passes through the write path, and thus the WAL and memstore, unlike its bulk counterpart, HFileOutputFormat2.

That being said, the TableOutputFormat does use the BufferedMutator, which apparently buffers the requests client-side and sends them across the wire in small batches. This still has an inherent delay, which is completely different from the milliseconds defined in the Flink API, and it can easily be avoided by using the native HBase client API. Thoughts?
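The client-side batching being described can be sketched as a toy size heuristic. This is plain Java, not HBase code; the class name, method names, and the 2 KB threshold are all invented for illustration. The point is that mutations accumulate until their cumulative byte size crosses a threshold, so a trickle of small writes can sit in the buffer indefinitely, independent of any interval the Flink API defines.

```java
// Toy sketch of a BufferedMutator-style size heuristic -- NOT HBase code.
// Names and the threshold value are invented for illustration.
import java.util.ArrayList;
import java.util.List;

public class SizeHeuristicDemo {

    public static class HeuristicBuffer {
        private final long flushThresholdBytes;
        private long bufferedBytes = 0;
        private int flushes = 0;
        private final List<byte[]> pending = new ArrayList<>();

        public HeuristicBuffer(long flushThresholdBytes) {
            this.flushThresholdBytes = flushThresholdBytes;
        }

        /** Buffers a mutation; sends the batch only once enough bytes pile up. */
        public void mutate(byte[] put) {
            pending.add(put);
            bufferedBytes += put.length;
            if (bufferedBytes >= flushThresholdBytes) {
                flush();
            }
        }

        /** Stands in for sending the batch across the wire. */
        public void flush() {
            pending.clear();
            bufferedBytes = 0;
            flushes++;
        }

        public int flushCount() { return flushes; }
        public int pendingCount() { return pending.size(); }
    }

    public static void main(String[] args) {
        HeuristicBuffer buf = new HeuristicBuffer(2048);
        // Ten small 100-byte puts stay under the threshold: nothing is sent yet.
        for (int i = 0; i < 10; i++) buf.mutate(new byte[100]);
        System.out.println(buf.flushCount());   // 0: small writes just wait
        System.out.println(buf.pendingCount()); // 10
        // One large put tips the cumulative size over the threshold.
        buf.mutate(new byte[1500]);
        System.out.println(buf.flushCount());   // 1: batch goes out now
    }
}
```

Under such a heuristic, write latency is governed by byte volume, not by time, which is the mismatch with a streaming sink's expectations.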
[jira] [Commented] (FLINK-2033) Add overloaded methods with explicit TypeInformation parameters to Gelly
[ https://issues.apache.org/jira/browse/FLINK-2033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964914#comment-14964914 ]

PJ Van Aeken commented on FLINK-2033:
-------------------------------------

Totally forgot about this one, sorry. Yes, it's OK now: Gelly has the explicit declarations.

> Add overloaded methods with explicit TypeInformation parameters to Gelly
> ------------------------------------------------------------------------
>
>                 Key: FLINK-2033
>                 URL: https://issues.apache.org/jira/browse/FLINK-2033
>             Project: Flink
>          Issue Type: Task
>          Components: Gelly
>    Affects Versions: 0.9
>            Reporter: PJ Van Aeken
>            Assignee: PJ Van Aeken
>             Fix For: 0.10
>
> For the implementation of the Scala API for Gelly (FLINK-1962), we need to
> pass explicit TypeInformation since the Java TypeExtractor does not work for
> all Scala Types (see FLINK-2023).
> To do this, the Java Gelly API needs to be expanded with methods that allow
> for explicit passing of TypeInformation.
> An example with mapVertices:
> {code}
> public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
>     TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
>     String callLocation = Utils.getCallLocationName();
>     TypeInformation<NV> valueType = TypeExtractor.getMapReturnTypes(mapper, vertices.getType(), callLocation, false);
>     TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
>         Vertex.class, keyType, valueType);
>     return mapVertices(mapper, returnType);
> }
>
> public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
>     DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
>         new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
>             public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
>                 return new Vertex<K, NV>(value.f0, mapper.map(value));
>             }
>         }).returns(returnType);
>     return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
> }
> {code}
[jira] [Assigned] (FLINK-2277) In Scala API delta Iterations can not be set to unmanaged
[ https://issues.apache.org/jira/browse/FLINK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

PJ Van Aeken reassigned FLINK-2277:
-----------------------------------

    Assignee: PJ Van Aeken

In Scala API delta Iterations can not be set to unmanaged
---------------------------------------------------------

                Key: FLINK-2277
                URL: https://issues.apache.org/jira/browse/FLINK-2277
            Project: Flink
         Issue Type: Improvement
         Components: Scala API
           Reporter: Aljoscha Krettek
           Assignee: PJ Van Aeken
             Labels: Starter

DeltaIteration.java has method solutionSetUnManaged(). In the Scala API this could be added as an optional parameter on iterateDelta() on the DataSet.
[jira] [Commented] (FLINK-2023) TypeExtractor does not work for (some) Scala Classes
[ https://issues.apache.org/jira/browse/FLINK-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14624724#comment-14624724 ]

PJ Van Aeken commented on FLINK-2023:
-------------------------------------

I am back as of today ;-) Adding the overloaded methods to the Java API, which take an explicit TypeInformation as additional input, solved this issue. Not sure how you want to label it, but it no longer blocks progress on the Scala Graph API.

TypeExtractor does not work for (some) Scala Classes
----------------------------------------------------

                Key: FLINK-2023
                URL: https://issues.apache.org/jira/browse/FLINK-2023
            Project: Flink
         Issue Type: Bug
         Components: Type Serialization System
           Reporter: Aljoscha Krettek

[~vanaepi] discovered some problems while working on the Scala Gelly API where, for example, a Scala MapFunction can not be correctly analyzed by the type extractor. For example, generic types will not be correctly detected.
[jira] [Commented] (FLINK-2033) Add overloaded methods with explicit TypeInformation parameters to Gelly
[ https://issues.apache.org/jira/browse/FLINK-2033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14580225#comment-14580225 ]

PJ Van Aeken commented on FLINK-2033:
-------------------------------------

If the PR for FLINK-1962 gets accepted, this can be marked as resolved.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570601#comment-14570601 ]

PJ Van Aeken commented on FLINK-1962:
-------------------------------------

[~ssc], you can find an implementation to play with in my fork (branch scala-gelly-api). It has all of the functionality of the Java API except for a few utility methods for creating graphs, and I am also still working on Vertex Centric Iterations and Gather Sum Apply Iterations. Other than that, most of it should be there, although I am a couple of commits behind.

Add Gelly Scala API
-------------------

                Key: FLINK-1962
                URL: https://issues.apache.org/jira/browse/FLINK-1962
            Project: Flink
         Issue Type: Task
         Components: Gelly, Scala API
   Affects Versions: 0.9
           Reporter: Vasia Kalavri
           Assignee: PJ Van Aeken
[jira] [Assigned] (FLINK-2033) Add overloaded methods with explicit TypeInformation parameters to Gelly
[ https://issues.apache.org/jira/browse/FLINK-2033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

PJ Van Aeken reassigned FLINK-2033:
-----------------------------------

    Assignee: PJ Van Aeken
[jira] [Commented] (FLINK-2033) Add overloaded methods with explicit TypeInformation parameters to Gelly
[ https://issues.apache.org/jira/browse/FLINK-2033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559213#comment-14559213 ]

PJ Van Aeken commented on FLINK-2033:
-------------------------------------

I will add overloaded methods to the Java API on a need-to-have basis for implementing the Scala Gelly API. Progress can be followed here:
https://github.com/PieterJanVanAeken/flink/tree/scala-gelly-api/
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559203#comment-14559203 ]

PJ Van Aeken commented on FLINK-1962:
-------------------------------------

Thanks for merging. I would like to get some input on my current implementation of joinWithVertices:
https://github.com/PieterJanVanAeken/flink/blob/scala-gelly-api/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala

As you can see, I need to transform the mapper for Scala tuples into a mapper for Java tuples, and I need to map the Scala tuples DataSet into a Java tuples DataSet. Do you have any suggestions for a more elegant solution? I can reimplement the entire thing using Scala tuples, but we already established that that is not the preferred way.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552289#comment-14552289 ]

PJ Van Aeken commented on FLINK-1962:
-------------------------------------

There are no blocking issues at the moment. I have to do the modification of the Java API to add methods with explicit TypeInformation first, but that should not be a problem. I've just been strapped for time the past couple of days.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552501#comment-14552501 ]

PJ Van Aeken commented on FLINK-1962:
-------------------------------------

OK, cool. One more thing, perhaps: if you could merge the fixes from [~aljoscha] into the master branch, that would be great.
https://github.com/apache/flink/pull/669
[jira] [Commented] (FLINK-2023) TypeExtractor does not work for (some) Scala Classes
[ https://issues.apache.org/jira/browse/FLINK-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14547747#comment-14547747 ]

PJ Van Aeken commented on FLINK-2023:
-------------------------------------

So this works, I'm just not entirely sure this is what you meant: I extended the Java Gelly API with a method like the one below, so I can explicitly pass all TypeInformation:

{code}
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<K> keyType, TypeInformation<NV> valueType, TypeInformation<Vertex<K, NV>> returnType) {
    DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
        new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
            public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
                return new Vertex<K, NV>(value.f0, mapper.map(value));
            }
        }).returns(returnType);
    return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
}
{code}

I then call this method in the Scala API like this:

{code}
new Graph[K, NV, EV](jgraph.mapVertices[NV](
    mapper,
    createTypeInformation[K],
    createTypeInformation[NV],
    createTypeInformation[Vertex[K, NV]]
))
{code}
[jira] [Created] (FLINK-2033) Add overloaded methods with explicit TypeInformation parameters to Gelly
PJ Van Aeken created FLINK-2033:
-----------------------------------

            Summary: Add overloaded methods with explicit TypeInformation parameters to Gelly
                Key: FLINK-2033
                URL: https://issues.apache.org/jira/browse/FLINK-2033
            Project: Flink
         Issue Type: Task
         Components: Gelly
   Affects Versions: 0.9
           Reporter: PJ Van Aeken

For the implementation of the Scala API for Gelly (FLINK-1962), we need to pass explicit TypeInformation since the Java TypeExtractor does not work for all Scala Types (see FLINK-2023). To do this, the Java Gelly API needs to be expanded with methods that allow for explicit passing of TypeInformation. An example with mapVertices:

{code}
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
    TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
    String callLocation = Utils.getCallLocationName();
    TypeInformation<NV> valueType = TypeExtractor.getMapReturnTypes(mapper, vertices.getType(), callLocation, false);
    TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
        Vertex.class, keyType, valueType);
    return mapVertices(mapper, returnType);
}

public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
    DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
        new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
            public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
                return new Vertex<K, NV>(value.f0, mapper.map(value));
            }
        }).returns(returnType);
    return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
}
{code}
[jira] [Commented] (FLINK-2023) TypeExtractor does not work for (some) Scala Classes
[ https://issues.apache.org/jira/browse/FLINK-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14547776#comment-14547776 ]

PJ Van Aeken commented on FLINK-2023:
-------------------------------------

Indeed, you are right. In the Java API, keyType and valueType are used to construct returnType, but since I am already constructing it, they serve no purpose. In any case, I will log a ticket to add these overloaded methods to the Java Gelly API. Thanks!
[jira] [Comment Edited] (FLINK-2023) TypeExtractor does not work for (some) Scala Classes
[ https://issues.apache.org/jira/browse/FLINK-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14547776#comment-14547776 ]

PJ Van Aeken edited comment on FLINK-2023 at 5/18/15 9:46 AM:
-------------------------------------------------------------

Indeed, you are right. In the Java API, keyType and valueType are used to construct returnType, but since I am already constructing it, they serve no purpose. In any case, I will log a ticket to add these overloaded methods to the Java Gelly API. Thanks!

Edit: FLINK-2033
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14545102#comment-14545102 ] PJ Van Aeken commented on FLINK-1962: - Don't worry about it. I'm just glad we got it working, something I could not have done without the help of you guys. Plus I got to discover a bit more about the typing mechanism, which is a nice bonus. I can start wrapping the Gelly API now, and I think we've made some useful fixes for porting other APIs in the future as well. Add Gelly Scala API --- Key: FLINK-1962 URL: https://issues.apache.org/jira/browse/FLINK-1962 Project: Flink Issue Type: Task Components: Gelly, Scala API Affects Versions: 0.9 Reporter: Vasia Kalavri -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14545170#comment-14545170 ] PJ Van Aeken commented on FLINK-1962: - I spoke a little bit too soon. The syntactic sugar lines fail with the exception below. Just using the Java way (with CustomMap extends MapFunction) works perfectly now, though. Any ideas? Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'NV' in 'public org.apache.flink.graph.wrappedjava.Graph org.apache.flink.graph.wrappedjava.Graph.mapVertices(scala.Function1,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
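A common way around this kind of erasure failure is to stop extracting NV reflectively and instead have the compiler supply the type evidence at the call site, where NV is concrete, via a context bound. The following is a self-contained sketch, not Flink's actual API: TypeInfo and GraphSketch are illustrative stand-ins.

```scala
import scala.reflect.ClassTag

// Illustrative stand-in for Flink's TypeInformation.
case class TypeInfo[T](clazz: Class[_])

object TypeInfo {
  // The compiler materializes a ClassTag wherever T is concrete.
  implicit def fromClassTag[T](implicit ct: ClassTag[T]): TypeInfo[T] =
    TypeInfo[T](ct.runtimeClass)
}

// Illustrative stand-in for the wrapped Graph.
class GraphSketch[VV](val vertices: Seq[VV]) {
  // The context bound [NV: TypeInfo] asks the caller's compiler for the
  // evidence, instead of reflecting on the erased method signature.
  def mapVertices[NV: TypeInfo](f: VV => NV): (GraphSketch[NV], TypeInfo[NV]) =
    (new GraphSketch(vertices.map(f)), implicitly[TypeInfo[NV]])
}
```

At a call site like `graph.mapVertices(_.toString)`, the compiler derives `TypeInfo[String]` automatically, so no runtime extraction is needed.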
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14545434#comment-14545434 ] PJ Van Aeken commented on FLINK-1962: - So the fix works for the syntactic sugar. I also made it work for the regular Mapper by wrapping the regular Mapper inside another mapper which extends ResultTypeQueryable. It looks awful but it works: https://github.com/PieterJanVanAeken/flink/blob/master/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/wrappedjava/Graph.scala Is this the way to continue or should we look for a more elegant solution?
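The wrapping trick can be sketched without Flink on the classpath; MapFn and ResultTypeQueryable below are simplified stand-ins for Flink's MapFunction and ResultTypeQueryable interfaces, and withResultType is an illustrative helper name.

```scala
import scala.reflect.ClassTag

// Simplified stand-ins for Flink's MapFunction and ResultTypeQueryable.
trait MapFn[I, O] { def map(value: I): O }
trait ResultTypeQueryable { def getProducedType: Class[_] }

object Wrappers {
  // Wrap a plain mapper in one that can also answer type queries, so the
  // runtime asks the wrapper instead of reflecting on the erased generic O.
  def withResultType[I, O: ClassTag](inner: MapFn[I, O]): MapFn[I, O] =
    new MapFn[I, O] with ResultTypeQueryable {
      def map(value: I): O = inner.map(value)
      def getProducedType: Class[_] = implicitly[ClassTag[O]].runtimeClass
    }
}
```

The inner mapper stays untouched; only the wrapper carries the type evidence, which is why the pattern works but reads as boilerplate.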
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14541783#comment-14541783 ] PJ Van Aeken commented on FLINK-1962: - So the line I mentioned is definitely the problem. What you'd really need to call there is the other constructor for TupleTypeInfo which also takes a Class as constructor parameter. Unfortunately, due to type erasure, the class T is not available. Since T : WeakTypeTag, perhaps there is a way to pass the class parameter after all? I am not that familiar with how WeakTypeTags work exactly.
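There is a way, at least when the type is concrete at the point where the tag is materialized: a WeakTypeTag still carries the full Type, and the tag's mirror can recover the erased runtime Class from it. A minimal sketch using only scala-reflect (classFor is an illustrative name):

```scala
import scala.reflect.runtime.universe._

object TagClass {
  // A WeakTypeTag carries the full Type; when that type is concrete,
  // the tag's mirror can recover the (erased) runtime Class from it.
  def classFor[T](implicit tag: WeakTypeTag[T]): Class[_] =
    tag.mirror.runtimeClass(tag.tpe.erasure)
}
```

That Class could then be passed to the TupleTypeInfo constructor that expects one. If T is genuinely abstract at the call site, the tag holds a free type variable and no useful class can be recovered.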
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14539441#comment-14539441 ] PJ Van Aeken commented on FLINK-1962: - Is there a specific reason why Flink has custom TupleX classes, rather than just reusing scala.TupleX?
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14539558#comment-14539558 ] PJ Van Aeken commented on FLINK-1962: - I suspect there are some IDE shenanigans going on at the JavaTupleDescriptor declaration. I think it should be something like this:

case class JavaTupleDescriptor(
  id: Int,
  tpe: Type,
  fields: Seq[UDTDescriptor]
) extends UDTDescriptor
[jira] [Comment Edited] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14537797#comment-14537797 ] PJ Van Aeken edited comment on FLINK-1962 at 5/11/15 10:33 AM: --- I was wrong before. The TypeErasure fix has one more problem. Method createTypeInformation creates a PojoTypeInfo for Vertex, rather than a TupleTypeInfo, which is what the Java API expects.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14537797#comment-14537797 ] PJ Van Aeken commented on FLINK-1962: - The combination of both pull requests did the trick. Perhaps someone should merge the TypeErasure fix into master as well. It seems like an important fix for future combined Java/Scala APIs.
[jira] [Comment Edited] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14537797#comment-14537797 ] PJ Van Aeken edited comment on FLINK-1962 at 5/11/15 11:30 AM: --- I was wrong before. The TypeErasure fix has one more problem. Method createTypeInformation creates a PojoTypeInfo for Vertex, rather than a TupleTypeInfo, which is what the Java API expects. I think this is because the Scala type extraction does not recognize Java's Tuple2 as a Tuple...
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14532224#comment-14532224 ] PJ Van Aeken commented on FLINK-1962: - If there are no problems, feel free to push it. I am currently a bit preoccupied but I will continue working on this during the weekend.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14530225#comment-14530225 ] PJ Van Aeken commented on FLINK-1962: - I did a new commit to my personal GitHub, which gives you a better idea of what I am currently doing. But I absolutely agree that the way I am currently doing it is a maintenance and sync nightmare. I would be more than willing to implement it as a wrapper of the Java API, but there are some problems. Most of them are easily fixed, but the one I can't seem to fix is dealing with the constraints on vertex/edge keys and values. Basically, I want the key and value types to be either Scala case classes or Scala primitives. For case classes, you can force implementation of Ordered and Serializable (which conforms to the Java constraints), but Scala primitives implement neither Ordered nor Serializable. You can solve part of this by doing [K : Ordered[K]], but Ordered doesn't implement Serializable either. I've tried many things and none of them compile for both Scala primitives and case classes. Is there something I am missing here?
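One way out, sketched below with illustrative names (VertexId, maxKey), is to require an Ordering type class instead of Ordered inheritance: Ordering instances ship with the library for all the primitives, and Ordering.ordered derives one automatically for anything that extends Ordered, so a single context bound covers both primitives and case classes. (Serializability would still need separate handling.)

```scala
// A case class can extend Ordered (and is Serializable by default),
// so it satisfies the Java-style bound directly.
case class VertexId(id: Long) extends Ordered[VertexId] {
  def compare(that: VertexId): Int = id.compare(that.id)
}

object Keys {
  // [K: Ordering] compiles for Long, String, and VertexId alike:
  // primitives get their built-in Ordering, Ordered types get a derived one.
  def maxKey[K: Ordering](keys: Seq[K]): K = keys.max
}
```

Both `Keys.maxKey(Seq(3L, 1L, 2L))` and `Keys.maxKey(Seq(VertexId(1), VertexId(5)))` compile with the same bound, which is exactly what the `[K : Ordered[K]]` attempt could not achieve.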
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14530317#comment-14530317 ] PJ Van Aeken commented on FLINK-1962: - Update: Using implicit conversions, I managed to get this to work, but it still looks terrible to be honest:

import java.{lang => jlang}
val simpleEdge: Edge[jlang.Long, jlang.Long] = Edge(1L, 2L, 3L)

It implicitly converts the Scala Long to a java.lang.Long, but you still have to declare that you want java.lang.Long as the key and value types, which is an implementation detail that you'd want to hide from the user...
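One way to push that detail below the surface is a Scala-facing factory whose signature takes plain Scala Longs and does the boxing internally. This is a sketch: JEdge and Edges are illustrative stand-ins, not Gelly classes.

```scala
import java.{lang => jlang}

// Illustrative stand-in for Gelly's Java Edge class.
case class JEdge[K, V](source: K, target: K, value: V)

object Edges {
  // The Scala-facing factory takes plain Scala Longs; boxing to
  // java.lang.Long happens once here, out of the user's sight.
  def apply(source: Long, target: Long, value: Long): JEdge[jlang.Long, jlang.Long] =
    JEdge(jlang.Long.valueOf(source), jlang.Long.valueOf(target), jlang.Long.valueOf(value))
}
```

The user writes `Edges(1L, 2L, 3L)` and never mentions java.lang.Long; only the wrapper's return type (which a type alias could hide further) still exposes it.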
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14530353#comment-14530353 ] PJ Van Aeken commented on FLINK-1962: - [~StephanEwen] I think that removing the constraints on the Java Gelly API would make things a lot easier for me. But are you certain it won't break the Gelly API?
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14526329#comment-14526329 ] PJ Van Aeken commented on FLINK-1962: - I am working on one right now which implements the basic functionalities of the Java API with the addition of the usual syntactic sugar methods. I will report back once I have a sufficient amount of functionality implemented to play around with.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14526402#comment-14526402 ] PJ Van Aeken commented on FLINK-1962: - Actually I am currently building the Scala Gelly API on top of the regular Scala API. Are you suggesting I build the Scala Gelly API directly on top of the Java API instead (using these wrapping mechanisms)?