Hello Victor,

You are totally right. So the question now becomes: is Flink able to handle
cases like this, where I would define the type info for the row myself and
the Table would infer the columns from a comma-separated list or something
similar?
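
To make the question concrete, here is a rough sketch in plain Java (no
Flink calls) of the kind of inference I have in mind: it derives a
"col1,...,colN" field list and the per-column classes from one sample
record. The names SchemaSketch, fieldNames and columnClasses are made up
for illustration, and the sketch assumes a sample record with no null
elements is available when the job is built. Each class would still have to
be mapped to a TypeInformation such as BasicTypeInfo.STRING_TYPE_INFO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: derives a schema from one sample record.
public class SchemaSketch {

    // Builds "col1,col2,...,colN", the comma-separated field list
    // that would be passed when registering the stream as a table.
    public static String fieldNames(int arity) {
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= arity; i++) {
            if (i > 1) {
                sb.append(',');
            }
            sb.append("col").append(i);
        }
        return sb.toString();
    }

    // Returns the runtime class of each element of the sample record.
    // Assumption: the sample contains no null elements.
    public static List<Class<?>> columnClasses(List<?> sample) {
        List<Class<?>> classes = new ArrayList<>();
        for (Object element : sample) {
            classes.add(element.getClass());
        }
        return classes;
    }

    public static void main(String[] args) {
        // Same shape as the row printed earlier in this thread.
        List<Object> sample = Arrays.asList(
                "5d2df2c2e7370c7843dad9ca", 359731, 13333196, 156789925, 619381);
        System.out.println(fieldNames(sample.size()));
        System.out.println(columnClasses(sample));
    }
}
```

This only works if the layout is known before the job starts; it does not
cover rows whose arity or types change from one record to the next.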

thanks
AU

On Wed, Aug 7, 2019 at 10:33 AM Victor Wong <jiasheng.w...@outlook.com>
wrote:

> Hi Andres,
>
>
>
> I’d like to share my thoughts:
>
> When you register a “Table”, you need to specify its “schema”, so how can
> you register the table when the number of elements/columns and data types
> are both nondeterministic.
>
> Correct me if I misunderstood your meaning.
>
>
>
> Best,
>
> Victor
>
>
>
> *From: *Andres Angel <ingenieroandresan...@gmail.com>
> *Date: *Wednesday, August 7, 2019 at 9:55 PM
> *To: *Haibo Sun <sunhaib...@163.com>
> *Cc: *user <user@flink.apache.org>
> *Subject: *Re: FlatMap returning Row<> based on ArrayList elements()
>
>
>
> Hello everyone, let me be more precise about what I'm ultimately looking
> for, because your example is correct and shows accurately how to turn an
> array into a Row object.
>
> I have done it seamlessly:
>
>
>
> out.collect(Row.of(pelements.toArray()));
>
>
>
> Then I printed it and the outcome is as expected:
>
>
>
> 5d2df2c2e7370c7843dad9ca,359731,13333196,156789925,619381
>
>
>
> Now I need to register this DS as a table and here is basically how I'm
> planning to do it:
>
>
>
> tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5");
>
>
>
> However, this returns an error on the DS registration because I need to
> specify the RowTypeInfo. This is the crux of the problem: yes, I know I
> could use something like:
>
>
>
>
>
> TypeInformation<?>[] types = {
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO};
>
>
>
> DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {
>     @Override
>     public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
>         out.collect(Row.of(value.toArray(new Integer[0])));
>     }
> }).returns(new RowTypeInfo(types));
>
>
>
>
>
> The problem with this approach is that I'm looking for a generic anonymous
> FlatMap function that could return, on each call: 1. a different number of
> elements in the array and 2. elements whose data types can vary as well.
> In other words, the row layout is not fixed, whereas my TypeInformation
> declaration would fix it for every execution.
>
>
>
> How could I approach this?
>
>
>
> thanks so much
>
> AU
>
>
>
>
>
> On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun <sunhaib...@163.com> wrote:
>
> Hi Andres Angel,
>
>
>
> I guess people don't understand your problem (including me). I don't know
> if the following sample code is what you want. If not, can you describe the
> problem more clearly?
>
>
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
>     .flatMap(new FlatMapFunction<List<Integer>, Row>() {
>         @Override
>         public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
>             out.collect(Row.of(value.toArray(new Integer[0])));
>         }
>     }).print();
>
>
>
> env.execute("test job");
>
>
>
> Best,
>
> Haibo
>
>
> At 2019-07-30 02:30:27, "Andres Angel" <ingenieroandresan...@gmail.com>
> wrote:
>
> Hello everyone,
>
>
>
> I need to parse input data inside an anonymous function and turn it into
> several Row elements. Originally I would have done something like
> Row.of(1,2,3,4), but these elements can change on the fly as part of my
> function. This is why I have decided to store them in a list, and right now
> it looks something like this:
>
>
>
> [image: image.png]
>
>
>
> Now I need my out Collector to emit a Row based on these elements. I
> checked the Flink documentation, but lambda functions are not supported:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html
> Then I thought, OK, I can loop over my ArrayList to fill a Tuple and pass
> this tuple as Row.of(myTuple):
>
>
>
>                     Tuple mytuple = Tuple.newInstance(5);
>                     for (int i = 0; i < pelements.size(); i++) {
>                         mytuple.setField(pelements.get(i), i);
>                     }
>                     out.collect(Row.of(mytuple));
>
>
>
>
>
> However, it doesn't work because the tuple is parsed as 1 element in the
> sqlQuery step. How could I do something like:
>
>
>
> pelements.forEach(n->out.collect(Row.of(n)));
>
>
>
> Thanks so much
>
>
