Re: Gelly forward

2015-07-08 Thread Vasiliki Kalavri
Hi Flavio,

yes, I think it's possible. I have one question before I try to explain
how:
do you model Rome, Berlin, 101.3 etc. in your above example as
separate vertices or as properties of vertices?

On 8 July 2015 at 10:43, Flavio Pompermaier pomperma...@okkam.it wrote:

 Let's say I have some nodes of some type of interest (type1 and type2).
 My source data looks like sourceNodeId, type, edgeName, destNodeId.
 For example, I could be interested in sourceNodes having type == Person
 to gather the value obtained from the expansion of some paths (for
 example livesIn.name and marriedTo.name). Notice that I could define other
 paths of interest for different types.. (For nodes of type Place I could be
 interested in gathering containedIn.name).
 If my source data looks like:

 (1, Person, livesIn, 2)
 (1, Person, livesIn, 3)
 (2, Place, name, Rome)
 (2, Place, lat, 101.3)
 (2, Place, long, 101.3)
 (3, Place, name, Berlin)
 (3, Place, containedIn, 4)
 (4, Country, name, Germany)

 I'd like that node 1 (in the end) collect the following paths:

 livesIn.name : Rome (from node 2)
 livesIn.name : Berlin (from node 3)
 livesIn.containedIn.name: Germany (from node 4)
 marriedTo.name : null because not married :)

 So, in my vertexCentricIteration each Vertex knows its neighbors (e.g.
 node 1 knows that 2 and 3 should be queried for their name attribute).
 If they receive a message asking for name from node 1 they have to reply
 to node 1 with the value of that property.

 So in my implementation, I check whether my node has to send some query to
 neighbors and wait for the response. The problem is that node 3 for
 example, once queried for property containedIn.name from node 1 it just
 have to forward this path to node 4 and thell to 4 to reply to 1.

 Is that possible?


 On Wed, Jul 8, 2015 at 10:19 AM, Vasiliki Kalavri 
 vasilikikala...@gmail.com wrote:

 Is it the same message that you propagate or is it different for each
 vertex / neighbor? If you have to store a neighborID, msg pair for each
 neighbor, then you will quickly run out of memory. If it's the same message
 you need to send, or you simply need to add the current vertex Id, then you
 can probably get rid of the neighborID.

 By outbox I believe you mean storing them in the vertex value, correct?
 I don't think you will have to explicitly reset it, as in each superstep
 vertices only receive messages sent in the previous superstep, i.e. old
 messages don't get re-sent.
 On Jul 8, 2015 9:48 AM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 The problem is that my nodes have to gather data coming from some path
 of interest along the graph (depending on the type of the node), otherwise
 they just have to forward the received message adding their id to the
 message path (more or less). It's like a postal tracking system.

 The problem is that I have to reset the outbox of each vertex once the
 messages have been sent..
 Do you think that it makes sense in this case to have an outbox of
 messages (destination, message) in each vertex and reset it in the
 postSuperstep() of the VertexUpdateFunction?

 On Wed, Jul 8, 2015 at 9:38 AM, Vasiliki Kalavri 
 vasilikikala...@gmail.com wrote:

 Hi Flavio!

 Are you talking about vertex-centric iterations in gelly?

 If yes, you can send messages to a particular vertex with
 sendMessageTo(vertexId, msg) and
 to all neighbors with sendMessageToAllNeighbors(msg). These methods
 are available inside the MessagingFunction.
 Accessing received messages happens inside the VertexUpdateFunction.
 So, the usual way of writing these programs is to:
 (1) go through received messages in the VertexUpdateFunction and
 compute the new vertex value based them
 (2) compute and send messages in the MessagingFunction.

 Would that work in you case?

 Cheers,
 Vasia.

 On 8 July 2015 at 08:47, Flavio Pompermaier pomperma...@okkam.it
 wrote:


 Hi to all,
 is there a way in gelly to forward received messages (and modify their
 content before sending)?

 Best,
 Flavio







Re: why when use orders.aggregate(Aggregations.MAX, 2) not return one value but return more value

2015-07-08 Thread Michele Bertoni
hi, you are not printing the aggregation but the input



val result = orders.aggregate(Aggregations.MAX, 2)  
result.print


cheers
michele



 Il giorno 08/lug/2015, alle ore 02:00, hagersaleh loveallah1...@yahoo.com 
 ha scritto:
 
 why when use orders.aggregate(Aggregations.MAX, 2) not return one value but
 return more value
 
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 DataSetOrders orders=(DataSetOrders)
 env.readCsvFile(/home/hadoop/Desktop/Dataset/orders.csv)
  .fieldDelimiter('|')
  .includeFields(mask).ignoreFirstLine()
  .tupleType(get_Order().getClass());
   orders.aggregate(Aggregations.MAX, 2)  ;
 
 orders.print();
 
 orders.writeAsCsv(/home/hadoop/Desktop/Dataset/join_instead_of_optmization,
 \n, |,WriteMode.OVERWRITE);
 env.execute();
 
 out put
 1 (280866,129457,96048.38)
 1 (280867,16568,89875.17)
 1 (280868,47827,118013.89)
 1 (280869,104143,220415.76)
 1 (280870,105512,262166.76)
 1 (280871,36244,123478.83)
 1 (280896,148177,92956.99)
 1 (280897,83611,128889.07)
 1 (280898,29863,289893.15)
 1 (280899,143962,111581.46)
 1 (280900,43577,26781.38)
 1 (280901,87340,30915.09)
 1 (280902,6769,235803.72)
 1 (280903,61471,138553.46)
 1 (280928,69407,168763.3)
 1 (280929,114457,5392.93)
 1 (280930,58939,47427.22)
 1 (280931,110210,125524.13)
 1 (280932,91751,11434.53)
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-orders-aggregate-Aggregations-MAX-2-not-return-one-value-but-return-more-value-tp1977.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.



Re: Gelly forward

2015-07-08 Thread Flavio Pompermaier
The problem is that my nodes have to gather data coming from some path of
interest along the graph (depending on the type of the node), otherwise
they just have to forward the received message adding their id to the
message path (more or less). It's like a postal tracking system.

The problem is that I have to reset the outbox of each vertex once the
messages have been sent..
Do you think that it makes sense in this case to have an outbox of messages
(destination, message) in each vertex and reset it in the postSuperstep()
of the VertexUpdateFunction?

On Wed, Jul 8, 2015 at 9:38 AM, Vasiliki Kalavri vasilikikala...@gmail.com
wrote:

 Hi Flavio!

 Are you talking about vertex-centric iterations in gelly?

 If yes, you can send messages to a particular vertex with
 sendMessageTo(vertexId, msg) and
 to all neighbors with sendMessageToAllNeighbors(msg). These methods are
 available inside the MessagingFunction.
 Accessing received messages happens inside the VertexUpdateFunction.
 So, the usual way of writing these programs is to:
 (1) go through received messages in the VertexUpdateFunction and compute
 the new vertex value based them
 (2) compute and send messages in the MessagingFunction.

 Would that work in you case?

 Cheers,
 Vasia.

 On 8 July 2015 at 08:47, Flavio Pompermaier pomperma...@okkam.it wrote:


 Hi to all,
 is there a way in gelly to forward received messages (and modify their
 content before sending)?

 Best,
 Flavio





Re: Gelly forward

2015-07-08 Thread Vasiliki Kalavri
Hi Flavio!

Are you talking about vertex-centric iterations in gelly?

If yes, you can send messages to a particular vertex with
sendMessageTo(vertexId, msg) and
to all neighbors with sendMessageToAllNeighbors(msg). These methods are
available inside the MessagingFunction.
Accessing received messages happens inside the VertexUpdateFunction.
So, the usual way of writing these programs is to:
(1) go through received messages in the VertexUpdateFunction and compute
the new vertex value based them
(2) compute and send messages in the MessagingFunction.

Would that work in you case?

Cheers,
Vasia.

On 8 July 2015 at 08:47, Flavio Pompermaier pomperma...@okkam.it wrote:


 Hi to all,
 is there a way in gelly to forward received messages (and modify their
 content before sending)?

 Best,
 Flavio




Re: Gelly forward

2015-07-08 Thread Flavio Pompermaier
Let's say I have some nodes of some type of interest (type1 and type2).
My source data looks like sourceNodeId, type, edgeName, destNodeId.
For example, I could be interested in sourceNodes having type == Person
to gather the value obtained from the expansion of some paths (for
example livesIn.name and marriedTo.name). Notice that I could define other
paths of interest for different types.. (For nodes of type Place I could be
interested in gathering containedIn.name).
If my source data looks like:

(1, Person, livesIn, 2)
(1, Person, livesIn, 3)
(2, Place, name, Rome)
(2, Place, lat, 101.3)
(2, Place, long, 101.3)
(3, Place, name, Berlin)
(3, Place, containedIn, 4)
(4, Country, name, Germany)

I'd like that node 1 (in the end) collect the following paths:

livesIn.name : Rome (from node 2)
livesIn.name : Berlin (from node 3)
livesIn.containedIn.name: Germany (from node 4)
marriedTo.name : null because not married :)

So, in my vertexCentricIteration each Vertex knows its neighbors (e.g. node
1 knows that 2 and 3 should be queried for their name attribute).
If they receive a message asking for name from node 1 they have to reply
to node 1 with the value of that property.

So in my implementation, I check whether my node has to send some query to
neighbors and wait for the response. The problem is that node 3 for
example, once queried for property containedIn.name from node 1 it just
have to forward this path to node 4 and thell to 4 to reply to 1.

Is that possible?

On Wed, Jul 8, 2015 at 10:19 AM, Vasiliki Kalavri vasilikikala...@gmail.com
 wrote:

 Is it the same message that you propagate or is it different for each
 vertex / neighbor? If you have to store a neighborID, msg pair for each
 neighbor, then you will quickly run out of memory. If it's the same message
 you need to send, or you simply need to add the current vertex Id, then you
 can probably get rid of the neighborID.

 By outbox I believe you mean storing them in the vertex value, correct?
 I don't think you will have to explicitly reset it, as in each superstep
 vertices only receive messages sent in the previous superstep, i.e. old
 messages don't get re-sent.
 On Jul 8, 2015 9:48 AM, Flavio Pompermaier pomperma...@okkam.it wrote:

 The problem is that my nodes have to gather data coming from some path of
 interest along the graph (depending on the type of the node), otherwise
 they just have to forward the received message adding their id to the
 message path (more or less). It's like a postal tracking system.

 The problem is that I have to reset the outbox of each vertex once the
 messages have been sent..
 Do you think that it makes sense in this case to have an outbox of
 messages (destination, message) in each vertex and reset it in the
 postSuperstep() of the VertexUpdateFunction?

 On Wed, Jul 8, 2015 at 9:38 AM, Vasiliki Kalavri 
 vasilikikala...@gmail.com wrote:

 Hi Flavio!

 Are you talking about vertex-centric iterations in gelly?

 If yes, you can send messages to a particular vertex with
 sendMessageTo(vertexId, msg) and
 to all neighbors with sendMessageToAllNeighbors(msg). These methods
 are available inside the MessagingFunction.
 Accessing received messages happens inside the VertexUpdateFunction.
 So, the usual way of writing these programs is to:
 (1) go through received messages in the VertexUpdateFunction and compute
 the new vertex value based them
 (2) compute and send messages in the MessagingFunction.

 Would that work in you case?

 Cheers,
 Vasia.

 On 8 July 2015 at 08:47, Flavio Pompermaier pomperma...@okkam.it
 wrote:


 Hi to all,
 is there a way in gelly to forward received messages (and modify their
 content before sending)?

 Best,
 Flavio






Re: Gelly forward

2015-07-08 Thread Flavio Pompermaier
In my Implementation every vertex has its own bags of knowledge (i.e. it
knows all tuples belonging to it..).
So in Vertex 1 I have a field (an HashMap) containing the following info:

   - type=Person
   - livesIn=2 (and I know also that 2 is a vertexId)

In Vertex 3 I know:

   - type=Place
   - name=Berlin
   - containedIn=4 (and I know also that 4 is a vertexId)

and so on..


On Wed, Jul 8, 2015 at 2:29 PM, Vasiliki Kalavri vasilikikala...@gmail.com
wrote:

 Hi Flavio,

 yes, I think it's possible. I have one question before I try to explain
 how:
 do you model Rome, Berlin, 101.3 etc. in your above example as
 separate vertices or as properties of vertices?

 On 8 July 2015 at 10:43, Flavio Pompermaier pomperma...@okkam.it wrote:

 Let's say I have some nodes of some type of interest (type1 and type2).
 My source data looks like sourceNodeId, type, edgeName, destNodeId.
 For example, I could be interested in sourceNodes having type == Person
 to gather the value obtained from the expansion of some paths (for
 example livesIn.name and marriedTo.name). Notice that I could define other
 paths of interest for different types.. (For nodes of type Place I could be
 interested in gathering containedIn.name).
 If my source data looks like:

 (1, Person, livesIn, 2)
 (1, Person, livesIn, 3)
 (2, Place, name, Rome)
 (2, Place, lat, 101.3)
 (2, Place, long, 101.3)
 (3, Place, name, Berlin)
 (3, Place, containedIn, 4)
 (4, Country, name, Germany)

 I'd like that node 1 (in the end) collect the following paths:

 livesIn.name : Rome (from node 2)
 livesIn.name : Berlin (from node 3)
 livesIn.containedIn.name: Germany (from node 4)
 marriedTo.name : null because not married :)

 So, in my vertexCentricIteration each Vertex knows its neighbors (e.g.
 node 1 knows that 2 and 3 should be queried for their name attribute).
 If they receive a message asking for name from node 1 they have to
 reply to node 1 with the value of that property.

 So in my implementation, I check whether my node has to send some query
 to neighbors and wait for the response. The problem is that node 3 for
 example, once queried for property containedIn.name from node 1 it just
 have to forward this path to node 4 and thell to 4 to reply to 1.

 Is that possible?


 On Wed, Jul 8, 2015 at 10:19 AM, Vasiliki Kalavri 
 vasilikikala...@gmail.com wrote:

 Is it the same message that you propagate or is it different for each
 vertex / neighbor? If you have to store a neighborID, msg pair for each
 neighbor, then you will quickly run out of memory. If it's the same message
 you need to send, or you simply need to add the current vertex Id, then you
 can probably get rid of the neighborID.

 By outbox I believe you mean storing them in the vertex value, correct?
 I don't think you will have to explicitly reset it, as in each superstep
 vertices only receive messages sent in the previous superstep, i.e. old
 messages don't get re-sent.
 On Jul 8, 2015 9:48 AM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 The problem is that my nodes have to gather data coming from some path
 of interest along the graph (depending on the type of the node), otherwise
 they just have to forward the received message adding their id to the
 message path (more or less). It's like a postal tracking system.

 The problem is that I have to reset the outbox of each vertex once
 the messages have been sent..
 Do you think that it makes sense in this case to have an outbox of
 messages (destination, message) in each vertex and reset it in the
 postSuperstep() of the VertexUpdateFunction?

 On Wed, Jul 8, 2015 at 9:38 AM, Vasiliki Kalavri 
 vasilikikala...@gmail.com wrote:

 Hi Flavio!

 Are you talking about vertex-centric iterations in gelly?

 If yes, you can send messages to a particular vertex with
 sendMessageTo(vertexId, msg) and
 to all neighbors with sendMessageToAllNeighbors(msg). These methods
 are available inside the MessagingFunction.
 Accessing received messages happens inside the VertexUpdateFunction.
 So, the usual way of writing these programs is to:
 (1) go through received messages in the VertexUpdateFunction and
 compute the new vertex value based them
 (2) compute and send messages in the MessagingFunction.

 Would that work in you case?

 Cheers,
 Vasia.

 On 8 July 2015 at 08:47, Flavio Pompermaier pomperma...@okkam.it
 wrote:


 Hi to all,
 is there a way in gelly to forward received messages (and modify
 their content before sending)?

 Best,
 Flavio








Re: why when use orders.aggregate(Aggregations.MAX, 2) not return one value but return more value

2015-07-08 Thread fhueske
Hi,


aggregate performs an in-place aggregation, i.e., the input and output type of 
aggregate() is identical, but the aggregated fields are updated. 

Causion: All fields that are not associated with an aggregation function and 
are not not a grouping field have non-deterministic values. That means as well, 
that you cannot apply more than one aggregation function for each fields (e.g., 
computing min and max for the same field is not possible with aggregate())


Best, Fabian






From: Michele Bertoni
Sent: ‎Wednesday‎, ‎8‎. ‎July‎, ‎2015 ‎10‎:‎06
To: user@flink.apache.org





hi, you are not printing the aggregation but the input



val result = orders.aggregate(Aggregations.MAX, 2)  
result.print


cheers
michele



 Il giorno 08/lug/2015, alle ore 02:00, hagersaleh loveallah1...@yahoo.com 
 ha scritto:
 
 why when use orders.aggregate(Aggregations.MAX, 2) not return one value but
 return more value
 
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 DataSetOrders orders=(DataSetOrders)
 env.readCsvFile(/home/hadoop/Desktop/Dataset/orders.csv)
  .fieldDelimiter('|')
  .includeFields(mask).ignoreFirstLine()
  .tupleType(get_Order().getClass());
   orders.aggregate(Aggregations.MAX, 2)  ;
 
 orders.print();
 
 orders.writeAsCsv(/home/hadoop/Desktop/Dataset/join_instead_of_optmization,
 \n, |,WriteMode.OVERWRITE);
 env.execute();
 
 out put
 1 (280866,129457,96048.38)
 1 (280867,16568,89875.17)
 1 (280868,47827,118013.89)
 1 (280869,104143,220415.76)
 1 (280870,105512,262166.76)
 1 (280871,36244,123478.83)
 1 (280896,148177,92956.99)
 1 (280897,83611,128889.07)
 1 (280898,29863,289893.15)
 1 (280899,143962,111581.46)
 1 (280900,43577,26781.38)
 1 (280901,87340,30915.09)
 1 (280902,6769,235803.72)
 1 (280903,61471,138553.46)
 1 (280928,69407,168763.3)
 1 (280929,114457,5392.93)
 1 (280930,58939,47427.22)
 1 (280931,110210,125524.13)
 1 (280932,91751,11434.53)
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-orders-aggregate-Aggregations-MAX-2-not-return-one-value-but-return-more-value-tp1977.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.

Channel to path could not be opened, temporary files missing?

2015-07-08 Thread Heller, David
Hello,

we are implementing a text mining algorithm on Flink (0.9.0) and encountered a 
mysterious IOException (see below for detailed stacktrace). Flink seems to have 
problems finding some temporary file even though there is enough free space on 
the disk. Do you have an idea why the file cannot be found?

We've encountered the exception both in local and cluster execution and on 
MacOS as well as on linux.
Data size does not seem to be the reason: the error occurred on a 6.5GB dataset 
as well as on a small 400MB dataset
Our algorithm works iteratively and uses Bulk iterations. Interestingly, on one 
occasion the error disappeared consistently when setting the iteration number 
higher (from 2 to 6).
On another occasion, the exception appeared when adding a simple map operator 
at the end (holding the identity function).
Generally, the error is quite hard to reproduce.

Thanks in advance for any ideas and your time!

David


Stacktrace:

java.io.IOException: Channel to path 
'/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel'
 could not be opened.
at 
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.init(AbstractFileIOChannel.java:61)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.init(AsynchronousFileIOChannel.java:86)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.init(AsynchronousBulkBlockReader.java:46)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.init(AsynchronousBulkBlockReader.java:39)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:263)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:751)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at 
org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:167)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
at 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at 
org.apache.flink.runtime.operators.AbstractCachedBuildSideMatchDriver.run(AbstractCachedBuildSideMatchDriver.java:155)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
at 
org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.FileNotFoundException: 
/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.init(RandomAccessFile.java:243)
at java.io.RandomAccessFile.init(RandomAccessFile.java:124)
at 
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.init(AbstractFileIOChannel.java:57)
... 16 more

Re: why when use orders.aggregate(Aggregations.MAX, 2) not return one value but return more value

2015-07-08 Thread Matthias J. Sax
This is your code (it applied the print before the aggregation is done)

 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSetOrders orders=(DataSetOrders)
 env.readCsvFile(/home/hadoop/Desktop/Dataset/orders.csv)
   .fieldDelimiter('|')
   .includeFields(mask).ignoreFirstLine()
   .tupleType(get_Order().getClass());
orders.aggregate(Aggregations.MAX, 2)  ;
 
 orders.print();

You need to put the print direct after the aggregate() of use a new
variable:

  orders.aggregate(Aggregations.MAX, 2).print();

or

  DataSetOrders aggedOrders = orders.aggregate(Aggregations.MAX, 2);
  aggedOrders.print();


-Matthias

On 07/08/2015 10:30 PM, hagersaleh wrote:
 I did not understand what you mean
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-orders-aggregate-Aggregations-MAX-2-not-return-one-value-but-return-more-value-tp1977p1989.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.
 



signature.asc
Description: OpenPGP digital signature


Gelly forward

2015-07-08 Thread Flavio Pompermaier
Hi to all,
is there a way in gelly to forward received messages (and modify their
content before sending)?

Best,
Flavio


Re: why when use orders.aggregate(Aggregations.MAX, 2) not return one value but return more value

2015-07-08 Thread hagersaleh
I did not understand what you mean



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-orders-aggregate-Aggregations-MAX-2-not-return-one-value-but-return-more-value-tp1977p1989.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.