Hi,
yes please, open an Issue for that. I think the method would have to be added 
to TableEnvironment.

Aljoscha
> On 09 Nov 2015, at 12:19, Johann Kovacs <m...@jkovacs.de> wrote:
> 
> Hi,
> thanks for having a look at this, Aljoscha.
> 
> Not being able to read a DataSet[Row] from csv is definitively the most major 
> issue for me right now. 
> Everything else I could work around with Scala magic. I can create an issue 
> for this if you'd like.
> 
> Regarding the other points:
> 1. Oh absolutely, that's really the most important reason I implemented those 
> classes in the first place, 
> although I also liked being able to deserialize the schema from json/yaml, 
> before converting it to a RowTypeInfo.
> 2. I have no strong opinion either way regarding serializability of 
> TypeInformations. But having the field name <-> index mapping available
> in the UDF would be very very nice to have, so that I can access fields by 
> name instead of by index.
> If you decide to make TypeInformation non-serializable, maybe having a 
> dedicated Schema class for this purpose isn't such a bad idea after all.
> 3. Fair enough, I guess I was too excited with Scala implicit magic when I 
> wrote the prototype :-) 
> I wouldn't want to include the Schema with each Row either. I guess 
> non-implicit utility functions for that would work just as well, or the user 
> can
>  work around this by themselves if the field name <-> index mapping is 
> available in the UDF (see point 2)
> 
> Thanks,
> Johann
> 
> 
> On 5 November 2015 at 13:59, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> these are some interesting Ideas.
> 
> I have some thoughts, though, about the current implementation.
>  1. With Schema and Field you are basically re-implementing RowTypeInfo, so 
> it should not be required. Maybe just an easier way to create a RowTypeInfo.
>  2. Right now, in Flink the TypeInformation is technically not meant to be 
> Serializable and be shipped to runtime operations. (Although practically it 
> is in places, there is an ongoing discussion about this.)
>  3. Having the Schema as an implicit parameter does not work in the general 
> case because the compiler does not know which Schema to take if there are 
> several Schemas around. Maybe the Row would have to be extended to contain 
> the Schema. But this could have performance implications.
> 
> We should definitely add support for creating a DataSet[Row] directly from 
> the a CSV-Input, since otherwise you have to go trough tuples which does not 
> work
> with dynamic schemas and if you have more than a certain amount of fields.
> 
> Cheers,
> Aljoscha
> > On 02 Nov 2015, at 17:41, Johann Kovacs <m...@jkovacs.de> wrote:
> >
> > Hi,
> > thanks once again for those pointers. I did a bit of experimenting the past 
> > couple of days and came to the following conclusions:
> > 1. Unfortunately, I don't think I can get away with option 1 (generating 
> > POJOs on runtime). At least not without generating lots of boiler plate 
> > code, because I'd like to be able to access fields, also inside UDFs, by 
> > index or name. Also instantiating generated classes in UDFs seems like a 
> > pain.
> > 2. Table API looks nice, but doesn't seem to solve the problem of getting 
> > the data into a DataSet in the first place. And I'll likely have to revert 
> > to the DataSet API to do low level operations, such as a simple map(), as 
> > far as I can tell.
> >
> > However, it seems like the table api already provides exactly Fabian's 
> > option 3 with its Row, RowTypeInfo, and RowSerializer implementations, if 
> > I'm not mistaken, am I?
> > I played around a bit, trying to use a DataSet[Row] and it seems to work 
> > great. I had to add a hack to make the ScalaCsvInputFormat read Rows 
> > instead of Tuples, as well as add a few convenience methods to the Row 
> > class and a Schema API to quickly generate RowTypeInfos. I pushed my 
> > experiments here: 
> > https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df
> > A toy example of how I'd like to use the APIs is included in the 
> > TableExperiments.scala file.
> >
> > Is this a maintainable solution, or is the Row api supposed to be internal 
> > only? If it's supposed to be used outside of the Table api, do you think we 
> > can add the Row and Schema convenience layer like I started to implement to 
> > the core flink-table? Would make it much easier to work with with the 
> > regular DataSet API.
> >
> > Thanks,
> > Johann
> >
> > On 30 October 2015 at 03:34, Stephan Ewen <se...@apache.org> wrote:
> > Hi Johann!
> >
> > You can try and use the Table API, it has logical tuples that you program 
> > with, rather than tuple classes.
> >
> > Have a look here: 
> > https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html
> >
> > Stephan
> >
> >
> > On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > Hi Johann,
> >
> > I see three options for your use case.
> >
> > 1) Generate Pojo code at planning time, i.e., when the program is composed. 
> > This does not work when the program is already running. The benefit is that 
> > you can use key expressions, have typed fields, and type specific 
> > serializers and comparators.
> >
> > 2) Use Record, an Object[], or List<Object> (or some own holder with a few 
> > convenience methods) to store the data untyped and work with KeySelectors 
> > to extract grouping and join keys. The drawbacks of this approach are 
> > generic serializers (Kryo) which won't as efficient as the native ones. If 
> > you know the key types and don't use generic types for keys, sorting and 
> > joining should be still fast.
> >
> > 3) A hybrid approach of both, which works without code generation. Use a 
> > generic holder, e.g., Object[] for your data records but implement you own 
> > type information, serializers and comparators. After each operation, you 
> > can define the type information of the result using the returns() method, 
> > e.g.;
> > myData.map(new MapFunction<Object[], Object[]>).returns(myCustomTypeInfo). 
> > This approach requires a good understanding of Flink's type system, but if 
> > done correctly, you can also use expressions or positions to define keys 
> > and benefit from efficient serialization and binary comparisons. However, 
> > similar to the first approach, you need to know the schema of the data in 
> > advance (before the program is executed).
> >
> > In my opinion the first approach is the better, but as you said it is more 
> > effort to implement and might not work depending on what information is 
> > available at which point in time.
> >
> > Let me know if you have any questions.
> >
> > Cheers, Fabian
> >
> > 2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>:
> > Hi all,
> >
> > I currently find myself evaluating a use case, where I have to deal
> > with wide (i.e. about 50-60 columns, definitely more than the 25
> > supported by the Tuple types), structured data from CSV files, with a
> > potentially dynamically (during runtime) generated (or automatically
> > inferred from the CSV file) schema.
> > SparkSQL works very well for this case, because I can generate or
> > infer the schema dynamically at runtime, access fields in UDFs via
> > index or name (via the Row API), generate new schemata for UDF results
> > on the fly, and use those schemata to read and write from/to CSV.
> > Obviously Spark and SparkSQL have other quirks and I'd like to find a
> > good solution to do this with Flink.
> >
> > The main limitation seems to be that I can't seem to have DataSets of
> > arbitrary-length, arbitrary-type (i.e. unknown during compile time),
> > tuples. The Record API/type looks like it was meant to provide
> > something like that but it appears to become deprecated and is not
> > well supported by the DataSet APIs (e.g. I can't do a join on Records
> > by field index, nor does the CsvReader API support Records), and it
> > has no concept of field names, either.
> >
> > I though about generating Java classes of my schemata on runtime (e.g.
> > via Javassist), but that seems like a hack, and I'd probably have to
> > do this for each intermediate schema as well (e.g. when a map
> > operation alters the schema). I haven't tried this avenue yet, so I'm
> > not certain it would actually work, and even less certain that this is
> > a nice and maintainable solution
> >
> > Can anyone suggest a nice way to deal with this kind of use case? I
> > can prepare an example if that would make it more clear.
> >
> > Thanks,
> > Johann
> >
> >
> >
> 

Reply via email to