Hi, thanks for having a look at this, Aljoscha. Not being able to read a DataSet[Row] from csv is definitively the most major issue for me right now. Everything else I could work around with Scala magic. I can create an issue for this if you'd like.
Regarding the other points: 1. Oh absolutely, that's really the most important reason I implemented those classes in the first place, although I also liked being able to deserialize the schema from json/yaml, before converting it to a RowTypeInfo. 2. I have no strong opinion either way regarding serializability of TypeInformations. But having the field name <-> index mapping available in the UDF would be very very nice to have, so that I can access fields by name instead of by index. If you decide to make TypeInformation non-serializable, maybe having a dedicated Schema class for this purpose isn't such a bad idea after all. 3. Fair enough, I guess I was too excited with Scala implicit magic when I wrote the prototype :-) I wouldn't want to include the Schema with each Row either. I guess non-implicit utility functions for that would work just as well, or the user can work around this by themselves if the field name <-> index mapping is available in the UDF (see point 2) Thanks, Johann On 5 November 2015 at 13:59, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > these are some interesting Ideas. > > I have some thoughts, though, about the current implementation. > 1. With Schema and Field you are basically re-implementing RowTypeInfo, > so it should not be required. Maybe just an easier way to create a > RowTypeInfo. > 2. Right now, in Flink the TypeInformation is technically not meant to be > Serializable and be shipped to runtime operations. (Although practically it > is in places, there is an ongoing discussion about this.) > 3. Having the Schema as an implicit parameter does not work in the > general case because the compiler does not know which Schema to take if > there are several Schemas around. Maybe the Row would have to be extended > to contain the Schema. But this could have performance implications. > > We should definitely add support for creating a DataSet[Row] directly from > the a CSV-Input, since otherwise you have to go trough tuples which does > not work > with dynamic schemas and if you have more than a certain amount of fields. > > Cheers, > Aljoscha > > On 02 Nov 2015, at 17:41, Johann Kovacs <m...@jkovacs.de> wrote: > > > > Hi, > > thanks once again for those pointers. I did a bit of experimenting the > past couple of days and came to the following conclusions: > > 1. Unfortunately, I don't think I can get away with option 1 (generating > POJOs on runtime). At least not without generating lots of boiler plate > code, because I'd like to be able to access fields, also inside UDFs, by > index or name. Also instantiating generated classes in UDFs seems like a > pain. > > 2. Table API looks nice, but doesn't seem to solve the problem of > getting the data into a DataSet in the first place. And I'll likely have to > revert to the DataSet API to do low level operations, such as a simple > map(), as far as I can tell. > > > > However, it seems like the table api already provides exactly Fabian's > option 3 with its Row, RowTypeInfo, and RowSerializer implementations, if > I'm not mistaken, am I? > > I played around a bit, trying to use a DataSet[Row] and it seems to work > great. I had to add a hack to make the ScalaCsvInputFormat read Rows > instead of Tuples, as well as add a few convenience methods to the Row > class and a Schema API to quickly generate RowTypeInfos. I pushed my > experiments here: > https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df > > A toy example of how I'd like to use the APIs is included in the > TableExperiments.scala file. > > > > Is this a maintainable solution, or is the Row api supposed to be > internal only? If it's supposed to be used outside of the Table api, do you > think we can add the Row and Schema convenience layer like I started to > implement to the core flink-table? Would make it much easier to work with > with the regular DataSet API. > > > > Thanks, > > Johann > > > > On 30 October 2015 at 03:34, Stephan Ewen <se...@apache.org> wrote: > > Hi Johann! > > > > You can try and use the Table API, it has logical tuples that you > program with, rather than tuple classes. > > > > Have a look here: > https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html > > > > Stephan > > > > > > On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhue...@gmail.com> > wrote: > > Hi Johann, > > > > I see three options for your use case. > > > > 1) Generate Pojo code at planning time, i.e., when the program is > composed. This does not work when the program is already running. The > benefit is that you can use key expressions, have typed fields, and type > specific serializers and comparators. > > > > 2) Use Record, an Object[], or List<Object> (or some own holder with a > few convenience methods) to store the data untyped and work with > KeySelectors to extract grouping and join keys. The drawbacks of this > approach are generic serializers (Kryo) which won't as efficient as the > native ones. If you know the key types and don't use generic types for > keys, sorting and joining should be still fast. > > > > 3) A hybrid approach of both, which works without code generation. Use a > generic holder, e.g., Object[] for your data records but implement you own > type information, serializers and comparators. After each operation, you > can define the type information of the result using the returns() method, > e.g.; > > myData.map(new MapFunction<Object[], > Object[]>).returns(myCustomTypeInfo). This approach requires a good > understanding of Flink's type system, but if done correctly, you can also > use expressions or positions to define keys and benefit from efficient > serialization and binary comparisons. However, similar to the first > approach, you need to know the schema of the data in advance (before the > program is executed). > > > > In my opinion the first approach is the better, but as you said it is > more effort to implement and might not work depending on what information > is available at which point in time. > > > > Let me know if you have any questions. > > > > Cheers, Fabian > > > > 2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>: > > Hi all, > > > > I currently find myself evaluating a use case, where I have to deal > > with wide (i.e. about 50-60 columns, definitely more than the 25 > > supported by the Tuple types), structured data from CSV files, with a > > potentially dynamically (during runtime) generated (or automatically > > inferred from the CSV file) schema. > > SparkSQL works very well for this case, because I can generate or > > infer the schema dynamically at runtime, access fields in UDFs via > > index or name (via the Row API), generate new schemata for UDF results > > on the fly, and use those schemata to read and write from/to CSV. > > Obviously Spark and SparkSQL have other quirks and I'd like to find a > > good solution to do this with Flink. > > > > The main limitation seems to be that I can't seem to have DataSets of > > arbitrary-length, arbitrary-type (i.e. unknown during compile time), > > tuples. The Record API/type looks like it was meant to provide > > something like that but it appears to become deprecated and is not > > well supported by the DataSet APIs (e.g. I can't do a join on Records > > by field index, nor does the CsvReader API support Records), and it > > has no concept of field names, either. > > > > I though about generating Java classes of my schemata on runtime (e.g. > > via Javassist), but that seems like a hack, and I'd probably have to > > do this for each intermediate schema as well (e.g. when a map > > operation alters the schema). I haven't tried this avenue yet, so I'm > > not certain it would actually work, and even less certain that this is > > a nice and maintainable solution > > > > Can anyone suggest a nice way to deal with this kind of use case? I > > can prepare an example if that would make it more clear. > > > > Thanks, > > Johann > > > > > > >