Ah ok, in the case of initial the problem is the following. When you apply
an aggregation, then only the aggregated fields are valid. Data in the
other fields doesn’t necessarily correspond to the element where the
maximum value, for example, has been found. This becomes clear when you
compute the sum aggregate. Which record would you choose to fill in the
non-aggregated fields. Thus depending on the aggregation order you will see
different values in the non-aggregated fields. If you need to find the
record with the maximum value, for example, then you should use maxBy.

Cheers,
Till
​

On Tue, Mar 22, 2016 at 3:31 PM, Lydia Ickler <ickle...@googlemail.com>
wrote:

> Sorry I was not clear:
> I meant the initial DataSet is changing. Not the ds. :)
>
>
>
> Am 22.03.2016 um 15:28 schrieb Till Rohrmann <till.rohrm...@gmail.com>:
>
> From the code extract I cannot tell what could be wrong because the code
> looks ok. If ds changes, then your normalization result should change as
> well, I would assume.
> ​
>
> On Tue, Mar 22, 2016 at 3:15 PM, Lydia Ickler <ickle...@googlemail.com>
> wrote:
>
>> Hi Till,
>>
>> maybe it is doing so because I rewrite the ds in the next step again and
>> then the working steps get mixed?
>> I am reading the data from a local .csv file with readMatrix(env,
>> „filename")
>>
>> See code below.
>>
>> Best regards,
>> Lydia
>>
>> //read input file
>> DataSet<Tuple3<Integer, Integer, Double>> ds = readMatrix(env, input);
>>
>> /****************
>>  POWER ITERATION
>>  *****************/
>>
>> //get initial vector - which equals matrixA * [1, ... , 1]
>> DataSet<Tuple3<Integer, Integer, Double>> initial = 
>> ds(0).aggregate(Aggregations.SUM,2);
>>
>> //normalize by maximum value
>> initial = initial.cross(initial.aggregate(Aggregations.MAX, 2)).map(new 
>> normalizeByMax());
>>
>> public static DataSource<Tuple3<Integer, Integer, Double>> 
>> readMatrix(ExecutionEnvironment env,
>>                                                                       String 
>> filePath) {
>>     CsvReader csvReader = env.readCsvFile(filePath);
>>     csvReader.fieldDelimiter(",");
>>     csvReader.includeFields("ttt");
>>     return csvReader.types(Integer.class, Integer.class, Double.class);
>> }
>>
>>
>> Am 22.03.2016 um 14:47 schrieb Till Rohrmann <trohrm...@apache.org>:
>>
>> Hi Lydia,
>>
>> I tried to reproduce your problem but I couldn't. Can it be that you have
>> somewhere a non deterministic operation in your program or do you read the
>> data from a source with varying data? Maybe you could send us a compilable
>> and complete program which reproduces your problem.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 22, 2016 at 2:21 PM, Lydia Ickler <ickle...@googlemail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a question.
>>> If I have a DataSet DataSet<Tuple3<Integer, Integer, Double>> ds and I
>>> want to normalize all values (at position 2) in it by the maximum of the
>>> DataSet (ds.aggregate(Aggregations.MAX, 2)).
>>> How do I tackle that?
>>>
>>> If I use the cross operator my result changes every time I run the
>>> program (see code below)
>>> Any suggestions?
>>>
>>> Thanks in advance!
>>> Lydia
>>>
>>> ds.cross(ds.aggregate(Aggregations.MAX, 2)).map(new normalizeByMax());
>>>
>>> public static final class normalizeByMax implements
>>>         MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, 
>>> Tuple3<Integer, Integer, Double>>,
>>>                 Tuple3<Integer, Integer, Double>> {
>>>
>>>     public Tuple3<Integer, Integer, Double> map(
>>>             Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
>>> Integer, Double>> value)
>>>             throws Exception {
>>>         return new Tuple3<Integer, Integer, 
>>> Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>
>>
>
>

Reply via email to