Gelly vertex ID type requirements?
Hello,

I am having some trouble building a graph where the vertex ID type is a POJO. Specifically, it is a class that has two fields: a long field, and a field of another class that has four byte fields. (Both classes implement the Comparable interface, as the Gelly guide specifies.)

If the outer class has a default constructor, then it is recognized as a POJO, but I get the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong field type: PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]>
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:241)
    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:203)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458)
    at org.apache.flink.graph.Graph.inDegrees(Graph.java:701)
    at org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610)
    at org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180)
    at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044)
    at org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312)
    at malom.Retrograde.run(Retrograde.java:64)
    at malom.Solver.main(Solver.java:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Note that originally the exception just said "Wrong field type", from which I had no idea which type it was referring to, so I modified line 241 of Keys.java to include the type in the message, like this:

    Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type: " + fieldType.toString());

On the other hand, if I comment out the default constructor of the outer class, then it is not a POJO, and only a GenericTypeInfo is created from it, which implements AtomicType, so the previous exception does not appear.

Does this mean that vertex IDs cannot be POJOs? I am not sure if this is the intended behaviour. After all, if we have a POJO, that should always be better than a generic type, right? (I am just guessing here, but maybe CompositeType could also be regarded as an AtomicType: the only method declared by the AtomicType interface is createComparator, which is also defined in CompositeType (of which PojoTypeInfo is a subclass), although with different parameters; maybe CompositeType could implement AtomicType by delegating the createComparator call with all of the fields specified?)

I encountered another problem, which may or may not be related to the above: without the default constructor (the GenericTypeInfo case), the VertexCentricIteration eats my graph, that is, the result graph has zero vertices. I traced this problem to the first join in VertexCentricIteration.createResultVerticesWithDegrees, where the degrees DataSet is created: both inputs of the join (outDegrees and inDegrees) contain the correct data, but the result (degrees) is empty. Interestingly, this problem disappears if I add JoinHint.REPARTITION_SORT_MERGE.

Best regards,
Gabor
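For concreteness, the vertex ID type described above would look roughly like the following sketch. The class and field names are taken from the stack trace; the method bodies and nesting are assumptions, since the actual malom.* classes are not shown in the thread. Both classes have public fields, a default constructor, and implement Comparable, which is what makes Flink's TypeExtractor analyze them as POJOs.

```java
import java.io.Serializable;

// Hypothetical reconstruction of the vertex ID type from the stack trace.
public class GameState implements Comparable<GameState>, Serializable {
    public long board;
    public SectorId sid;

    public GameState() {} // default constructor => recognized as a POJO

    @Override
    public int compareTo(GameState o) {
        int c = Long.compare(board, o.board);
        return c != 0 ? c : sid.compareTo(o.sid);
    }

    // Nested here for the sketch; the real malom.SectorId is a top-level class.
    public static class SectorId implements Comparable<SectorId>, Serializable {
        public byte b, bf, w, wf;

        public SectorId() {}

        @Override
        public int compareTo(SectorId o) {
            int c = Byte.compare(b, o.b);
            if (c != 0) return c;
            c = Byte.compare(bf, o.bf);
            if (c != 0) return c;
            c = Byte.compare(w, o.w);
            return c != 0 ? c : Byte.compare(wf, o.wf);
        }
    }
}
```

With the default constructors present, both types are analyzed as PojoTypeInfo, which is exactly the case that triggers the "Wrong field type" check in Keys.java.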
Re: Gelly vertex ID type requirements?
Thanks for reporting this issue. The "Wrong field type" error looks like a bug to me. This happens because PojoType is neither a TupleType nor an AtomicType. To me it looks like the TupleTypeInfoBase condition should be generalized to CompositeType. I will look into this.

Cheers,
Fabian

2015-07-30 14:18 GMT+02:00 Gábor Gévay <gga...@gmail.com>:
Re: Tuple model project
You could try to use the TypeSerializerInputFormat.

On Thu, Jul 30, 2015 at 2:08 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
How can I create a Flink dataset given a directory path that contains a set of Java objects serialized with Kryo (one file per object)?

On Thu, Jul 30, 2015 at 1:41 PM, Till Rohrmann <trohrm...@apache.org> wrote:
Hi Flavio,
in order to use the Kryo serializer for a given type, you can use the registerTypeWithKryoSerializer method of the ExecutionEnvironment object. What you provide to the method is the type you want to be serialized with Kryo and an implementation of the com.esotericsoftware.kryo.Serializer class. If the given type is not supported by Flink's own serialization framework, then this custom serializer is used. You register the types at the beginning of your Flink program:

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      env.registerTypeWithKryoSerializer(classOf[MyType], classOf[MyTypeSerializer])
      ...
      env.execute()
    }

Cheers, Till

On Thu, Jul 30, 2015 at 12:45 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
I have a project that produces RDF quads, and I have to store them and read them with Flink afterwards. I could use Thrift/Protobuf/Avro, but this means adding a lot of transitive dependencies to my project. Maybe I could use Kryo to store those objects... is there any example of creating a dataset of objects serialized with Kryo?

On Thu, Jul 30, 2015 at 11:10 AM, Stephan Ewen <se...@apache.org> wrote:
Quick response: I am not opposed to that, but there are tuple libraries around already. Do you need specifically the Flink tuples, for interoperability between Flink and other projects?

On Thu, Jul 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:
Should we move this to the dev list?

On Thu, Jul 30, 2015 at 10:43 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
Any thought about this (moving the tuple classes into a separate self-contained project with no transitive dependencies, so that they can easily be used in other external projects)?

On Mon, Jul 6, 2015 at 11:09 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
Do you think it could be a good idea to extract Flink tuples into a separate project, to allow simpler dependency management in Flink-compatible projects?

On Mon, Jul 6, 2015 at 11:06 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
at the moment, Tuples are more efficient than POJOs, because POJO fields are accessed via Java reflection whereas Tuple fields are accessed directly. This performance penalty could be overcome by code-generated serializers and comparators, but I am not aware of any work in that direction.
Best, Fabian

2015-07-06 11:01 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
Hi to all,
I was thinking of writing my own Flink-compatible library, and I basically need a Tuple5. Is there any performance loss in using a POJO with 5 String fields vs a Tuple5? If yes, wouldn't it be a good idea to extract Flink tuples into a separate simple project (e.g. flink-java-tuples) that has no other dependencies, to enable other libraries to write their Flink-compatible logic without the need to exclude all the transitive dependencies of flink-java?
Best, Flavio
Re: Gelly vertex ID type requirements?
Thanks for the response.

As a temporary workaround, I tried to change these problematic lines:

    } else {
        Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type: " + fieldType.toString());
        keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
    }

into this:

    } else if (fieldType instanceof AtomicType) {
        keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
    } else {
        Preconditions.checkArgument(fieldType instanceof PojoTypeInfo, "Wrong field type: " + fieldType.toString());
        ((PojoTypeInfo<?>) fieldType).getFlatFields("*", 0, keyFields);
    }

But then I ran into another problem: the TypeExtractor creates the TupleTypeInfoBase for the Edge type of my graph with the following types:

    0 = {PojoTypeInfo@1067} PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]>
    1 = {GenericTypeInfo@1068} GenericType<malom.GameState>
    2 = {ValueTypeInfo@1069} ValueType<NullValue>

The problem here is that the first two types should clearly be the same, as the Edge type looks like this:

    public class Edge<K, V> extends Tuple3<K, K, V>

I did a bit of debugging on this, and the source of the problem seems to be the mechanism in TypeExtractor that detects recursive types (see the alreadySeen field in TypeExtractor): it mistakes the second appearance of malom.GameState for a recursive field. Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to process the Edge type, and in line 433 it calls itself for the first field, which proceeds into the privateGetForClass case, which correctly detects that it is a POJO and correctly returns a PojoTypeInfo; but in the meantime, in line 1190, privateGetForClass adds malom.GameState to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy approaches the second field and goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1186 that a recursive type is being processed.

Should I open a Jira for this? A possible solution would be to change the alreadySeen field into a parameter of the various type extraction methods, and make it contain only those types that are ancestors in the nesting hierarchy.

Best regards,
Gabor

2015-07-30 14:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
output writer
Hi everybody,
I have a question about the writer. I have to save my dataset in different files according to a field of the tuples. Let's assume I have a groupId in the tuple: I need to store each group in a different file, with a custom name. Any idea on how I can do that?
Thanks!
Michele
RE: output writer
Hi,
My 2 cents, based on something similar that I have tried: I have created my own implementation of OutputFormat, where you define your own logic for what happens in the writeRecord function. This logic would consist of making a distinction between the IDs and writing each record to the appropriate file. Other solutions might exist as well.

Dr. Radu Tudoran
Research Engineer, IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH, European Research Center
Riesstrasse 25, 80992 München
E-mail: radu.tudo...@huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN

-----Original Message-----
From: Michele Bertoni [mailto:michele1.bert...@mail.polimi.it]
Sent: Thursday, July 30, 2015 10:15 AM
To: user@flink.apache.org
Subject: output writer
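Radu's suggestion boils down to keeping one open writer per group ID and routing each record to its group's file in writeRecord. The following is a plain-Java sketch of that core logic only; the Flink OutputFormat plumbing (configure/open/close lifecycle, parallelism handling) is omitted, and all names are hypothetical:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Core idea behind a "split output by key" writer: lazily open one file
// per group ID and append each record to its group's file.
public class GroupedWriter implements Closeable {
    private final Path dir;
    private final Map<String, PrintWriter> writers = new HashMap<>();

    public GroupedWriter(Path dir) {
        this.dir = dir;
    }

    // In a real OutputFormat this would be the body of writeRecord().
    public void write(String groupId, String record) throws IOException {
        PrintWriter w = writers.get(groupId);
        if (w == null) {
            // One file per group, named after the group ID (naming is up to you).
            w = new PrintWriter(Files.newBufferedWriter(dir.resolve("group-" + groupId + ".txt")));
            writers.put(groupId, w);
        }
        w.println(record);
    }

    @Override
    public void close() {
        for (PrintWriter w : writers.values()) {
            w.close();
        }
    }
}
```

Note that with parallel writers you would also have to make the file names unique per subtask, e.g. by including the subtask index in the name.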
Re: Tuple model project
Any thought about this (moving the tuple classes into a separate self-contained project with no transitive dependencies, so that they can easily be used in other external projects)?
optimal deployment model for Flink Streaming programs
Hi!
We want to build an infrastructure for automated deployment of Flink Streaming programs to a dedicated environment. This includes automated tests (unit and integration) via Jenkins, and in case of a successful build and test run, the program should be deployed to the execution environment. Since streaming programs run indefinitely, the problem is how to switch from the running program to the newly deployed one. The CLI has some features that would make it possible (list, cancel). Is there another way of somehow restarting a streaming program? Do you have a suggested way for doing the deployment (automated!)?
Regards,
Rico B.
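The CLI-based switch-over mentioned above can be scripted from Jenkins. A hedged sketch, assuming Flink's `flink list` / `flink cancel` / `flink run` commands; the job name, jar path, and especially the `flink list` output format parsed here are assumptions to check against your Flink version:

```shell
#!/bin/sh
# Sketch: replace a running streaming job with a freshly built jar.
# JOB_NAME and NEW_JAR are placeholders for your setup.
JOB_NAME="my-streaming-job"
NEW_JAR="target/my-streaming-job.jar"

# `flink list -r` lists running jobs; lines look roughly like:
#   30.07.2015 14:18:00 : <job-id> : my-streaming-job (RUNNING)
# Extract the job ID column for our job (format may differ per version).
JOB_ID=$(bin/flink list -r | grep "$JOB_NAME" | awk -F' : ' '{print $2}' | tr -d ' ')

# Cancel the old instance if one is running, then submit the new build.
if [ -n "$JOB_ID" ]; then
  bin/flink cancel "$JOB_ID"
fi
bin/flink run "$NEW_JAR"
```

Note that plain cancel-and-resubmit loses in-flight state; whether that is acceptable depends on the job.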
RE: output writer
I will double check and try to commit this in the next days.

Dr. Radu Tudoran, HUAWEI TECHNOLOGIES Duesseldorf GmbH

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, July 30, 2015 11:34 AM
To: user@flink.apache.org
Subject: Re: output writer

Hi Michele, hi Radu,
Flink does not have such an OutputFormat, but I agree, it would be a valuable addition. Radu's approach looks like the way to go to implement this feature. @Radu, is there a way to contribute your OutputFormat to Flink?
Cheers, Fabian
RE: output writer
Re-hi,
I have double-checked, and there is indeed an OutputFormat interface in Flink which can be extended. I believe that for specific formats like the one Michele mentioned, everyone can develop the appropriate format themselves. On the other hand, having more output formats available is something that could be contributed. We should identify a couple of common formats; the first one that comes to my mind is something for writing to memory (e.g. a memory buffer).

Dr. Radu Tudoran, HUAWEI TECHNOLOGIES Duesseldorf GmbH
Re: Using Date or other types in a POJO?
Hi Stefan,
The problem is that the CsvParser does not know how to parse types other than the supported ones. It would be nice if it supported a custom parser, either manually specified or included in the POJO class itself. You can either change your POJO fields to supported types (like you already did), or read your data into a Tuple<String, Double, Double, ...> first and convert the tuples to POJOs in a map operation. In the map operation you can specify your own parsing logic.
Best, Max

On Thu, Jul 30, 2015 at 11:40 AM, Stefan Winterstein <stefan.winterst...@dfki.de> wrote:
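Max's tuple-to-POJO suggestion leaves the date parsing to user code inside the map operation. A minimal sketch of just that parsing step (the `dd.MM.yyyy` pattern is inferred from the sample data in Stefan's mail; the Flink map-operator plumbing is omitted):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// The per-record conversion a MapFunction<Tuple3<String, Double, Double>, DataPoint>
// would perform: parse the date string in user code instead of asking the
// CSV input format to handle java.util.Date.
public class DataPointParser {
    // Pattern assumed from the sample row "07.02.2015 49.9871 234.677".
    private static final SimpleDateFormat FORMAT = new SimpleDateFormat("dd.MM.yyyy");

    public static Date parseDate(String dateStr) throws ParseException {
        return FORMAT.parse(dateStr);
    }
}
```

Inside the real map function, the parsed Date (or its getTime() long value, for POJO-friendliness) would then be stored in the DataPoint instance.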
Re: thread model issue in TaskManager
Hi,
it is currently not possible to isolate tasks that consume a lot of JVM heap memory and schedule them to a specific slot (or TaskManager). If you operate in a YARN setup, you can isolate different jobs from each other by starting a new YARN session for each job, but tasks within the same job cannot be isolated from each other right now.
Cheers, Fabian

2015-07-30 4:02 GMT+02:00 wangzhijiang999 <wangzhijiang...@aliyun.com>:
As I know, Flink uses a thread model in the TaskManager, which means one TaskManager process may run many different operator threads, and these threads compete for the memory of the process. I know that Flink has a memory manager component in each TaskManager, and it controls the LocalBufferPool of the InputGate and ResultPartition for each task; but if a UDF consumes much memory, it uses JVM heap memory, so it cannot be controlled by Flink. If I use Flink as a common platform, some users will consume much memory in UDFs, and this may influence other threads in the process, especially in the case of OOM. I know that there are shared-slot and isolated-slot properties, but they just limit the task schedule within one TaskManager. Can I schedule tasks to a separate TaskManager if they consume much memory and I do not want them to influence other tasks? Or are there any suggestions for this issue of the thread model? As I know, Spark also uses a thread model, but Hadoop 2 uses a process model. Thank you for any suggestions in advance!
Re: optimal deployment model for Flink Streaming programs
I think what you outline is the right way to go for the time being. The Client class is being reworked to get more of these deployment/controlling methods, to make it easier to deploy/cancel/restart jobs programmatically, but that will probably take a few more weeks to converge.
Stephan

On Thu, Jul 30, 2015 at 8:59 AM, Dipl.-Inf. Rico Bergmann <i...@ricobergmann.de> wrote:
Using Date or other types in a POJO?
Hi,
I'm new to Flink and just taking the first steps... I want to parse a CSV file that contains a date and time as the first field, then some values:

    07.02.2015  49.9871  234.677  ...

So I'd like to use this POJO:

    import java.util.Date;

    public class DataPoint {
        private String dateStr; // String value of date
        private Date date;      // the actual date
        ...
        private static SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy");

        public DataPoint() {}

        // String setter, converts to Date
        public void setDateStr(String value) {
            this.dateStr = value;
            try {
                this.date = dateFormat.parse(dateStr); // parse string and store date
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }

        public String getDateStr() { return this.dateStr; }
        public Date getDate() { return this.date; }
        ...
    }

...and pass it to the CsvReader:

    DataSet<DataPoint> csvInput = env.readCsvFile(filename)
        .pojoType(DataPoint.class, "dateStr", ...);

However, this fails with an exception:

    Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.util.Date' is not supported for the CSV input format.
        at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:236)
        at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:115)
        at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:77)
        at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:61)
        at org.apache.flink.api.java.io.CsvReader.pojoType(CsvReader.java:295)
        at de.dfki.iui.MyJob.main(MyJob.java:60)

I managed to work around this by storing the long value of Date.getTime() instead of Date, but: does the POJO semantic really need to be that strict? Wouldn't it be sufficient if there was an appropriate getter/setter for the member names given to pojoType()?

Best regards,
-Stefan
Re: Tuple model project
Should we move this to the dev list?