Hi,
thanks for having a look at this, Aljoscha.

Not being able to read a DataSet[Row] from csv is definitively the most
major issue for me right now.
Everything else I could work around with Scala magic. I can create an issue
for this if you'd like.

Regarding the other points:
1. Oh absolutely, that's really the most important reason I implemented
those classes in the first place,
although I also liked being able to deserialize the schema from json/yaml,
before converting it to a RowTypeInfo.
2. I have no strong opinion either way regarding serializability of
TypeInformations. But having the field name <-> index mapping available
in the UDF would be very very nice to have, so that I can access fields by
name instead of by index.
If you decide to make TypeInformation non-serializable, maybe having a
dedicated Schema class for this purpose isn't such a bad idea after all.
3. Fair enough, I guess I was too excited with Scala implicit magic when I
wrote the prototype :-)
I wouldn't want to include the Schema with each Row either. I guess
non-implicit utility functions for that would work just as well, or the
user can
 work around this by themselves if the field name <-> index mapping is
available in the UDF (see point 2)

Thanks,
Johann


On 5 November 2015 at 13:59, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> these are some interesting Ideas.
>
> I have some thoughts, though, about the current implementation.
>  1. With Schema and Field you are basically re-implementing RowTypeInfo,
> so it should not be required. Maybe just an easier way to create a
> RowTypeInfo.
>  2. Right now, in Flink the TypeInformation is technically not meant to be
> Serializable and be shipped to runtime operations. (Although practically it
> is in places, there is an ongoing discussion about this.)
>  3. Having the Schema as an implicit parameter does not work in the
> general case because the compiler does not know which Schema to take if
> there are several Schemas around. Maybe the Row would have to be extended
> to contain the Schema. But this could have performance implications.
>
> We should definitely add support for creating a DataSet[Row] directly from
> the a CSV-Input, since otherwise you have to go trough tuples which does
> not work
> with dynamic schemas and if you have more than a certain amount of fields.
>
> Cheers,
> Aljoscha
> > On 02 Nov 2015, at 17:41, Johann Kovacs <m...@jkovacs.de> wrote:
> >
> > Hi,
> > thanks once again for those pointers. I did a bit of experimenting the
> past couple of days and came to the following conclusions:
> > 1. Unfortunately, I don't think I can get away with option 1 (generating
> POJOs on runtime). At least not without generating lots of boiler plate
> code, because I'd like to be able to access fields, also inside UDFs, by
> index or name. Also instantiating generated classes in UDFs seems like a
> pain.
> > 2. Table API looks nice, but doesn't seem to solve the problem of
> getting the data into a DataSet in the first place. And I'll likely have to
> revert to the DataSet API to do low level operations, such as a simple
> map(), as far as I can tell.
> >
> > However, it seems like the table api already provides exactly Fabian's
> option 3 with its Row, RowTypeInfo, and RowSerializer implementations, if
> I'm not mistaken, am I?
> > I played around a bit, trying to use a DataSet[Row] and it seems to work
> great. I had to add a hack to make the ScalaCsvInputFormat read Rows
> instead of Tuples, as well as add a few convenience methods to the Row
> class and a Schema API to quickly generate RowTypeInfos. I pushed my
> experiments here:
> https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df
> > A toy example of how I'd like to use the APIs is included in the
> TableExperiments.scala file.
> >
> > Is this a maintainable solution, or is the Row api supposed to be
> internal only? If it's supposed to be used outside of the Table api, do you
> think we can add the Row and Schema convenience layer like I started to
> implement to the core flink-table? Would make it much easier to work with
> with the regular DataSet API.
> >
> > Thanks,
> > Johann
> >
> > On 30 October 2015 at 03:34, Stephan Ewen <se...@apache.org> wrote:
> > Hi Johann!
> >
> > You can try and use the Table API, it has logical tuples that you
> program with, rather than tuple classes.
> >
> > Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html
> >
> > Stephan
> >
> >
> > On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> > Hi Johann,
> >
> > I see three options for your use case.
> >
> > 1) Generate Pojo code at planning time, i.e., when the program is
> composed. This does not work when the program is already running. The
> benefit is that you can use key expressions, have typed fields, and type
> specific serializers and comparators.
> >
> > 2) Use Record, an Object[], or List<Object> (or some own holder with a
> few convenience methods) to store the data untyped and work with
> KeySelectors to extract grouping and join keys. The drawbacks of this
> approach are generic serializers (Kryo) which won't as efficient as the
> native ones. If you know the key types and don't use generic types for
> keys, sorting and joining should be still fast.
> >
> > 3) A hybrid approach of both, which works without code generation. Use a
> generic holder, e.g., Object[] for your data records but implement you own
> type information, serializers and comparators. After each operation, you
> can define the type information of the result using the returns() method,
> e.g.;
> > myData.map(new MapFunction<Object[],
> Object[]>).returns(myCustomTypeInfo). This approach requires a good
> understanding of Flink's type system, but if done correctly, you can also
> use expressions or positions to define keys and benefit from efficient
> serialization and binary comparisons. However, similar to the first
> approach, you need to know the schema of the data in advance (before the
> program is executed).
> >
> > In my opinion the first approach is the better, but as you said it is
> more effort to implement and might not work depending on what information
> is available at which point in time.
> >
> > Let me know if you have any questions.
> >
> > Cheers, Fabian
> >
> > 2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>:
> > Hi all,
> >
> > I currently find myself evaluating a use case, where I have to deal
> > with wide (i.e. about 50-60 columns, definitely more than the 25
> > supported by the Tuple types), structured data from CSV files, with a
> > potentially dynamically (during runtime) generated (or automatically
> > inferred from the CSV file) schema.
> > SparkSQL works very well for this case, because I can generate or
> > infer the schema dynamically at runtime, access fields in UDFs via
> > index or name (via the Row API), generate new schemata for UDF results
> > on the fly, and use those schemata to read and write from/to CSV.
> > Obviously Spark and SparkSQL have other quirks and I'd like to find a
> > good solution to do this with Flink.
> >
> > The main limitation seems to be that I can't seem to have DataSets of
> > arbitrary-length, arbitrary-type (i.e. unknown during compile time),
> > tuples. The Record API/type looks like it was meant to provide
> > something like that but it appears to become deprecated and is not
> > well supported by the DataSet APIs (e.g. I can't do a join on Records
> > by field index, nor does the CsvReader API support Records), and it
> > has no concept of field names, either.
> >
> > I though about generating Java classes of my schemata on runtime (e.g.
> > via Javassist), but that seems like a hack, and I'd probably have to
> > do this for each intermediate schema as well (e.g. when a map
> > operation alters the schema). I haven't tried this avenue yet, so I'm
> > not certain it would actually work, and even less certain that this is
> > a nice and maintainable solution
> >
> > Can anyone suggest a nice way to deal with this kind of use case? I
> > can prepare an example if that would make it more clear.
> >
> > Thanks,
> > Johann
> >
> >
> >
>

Reply via email to