Re: Hello, a question about Dashboard in Flink

2016-01-29 Thread Philip Lee
Great,

Did you mean the difference between a narrow shuffle and a global shuffle?

I am using Flink version 0.9, but I could not access the REST interface
when using an SSH tunnel to the remote server.

Could this be a problem with the version?

Best,
Phil



On Fri, Jan 29, 2016 at 9:46 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> The REST interface does also provide metrics about the number of records
> and the size of the input and output of all tasks.
> See:
> - /jobs/<jobid>/vertices/<vertexid>
> - /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
> in
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/monitoring_rest_api.html#details-of-a-running-or-completed-job
>
> However, not all of this data is going over the network because some tasks
> can be locally connected.
>
> Best, Fabian
>
> 2016-01-29 8:50 GMT+01:00 Philip Lee <philjj...@gmail.com>:
>
>> Thanks,
>>
>> Is there any way to measure shuffle data (read and write) on Flink or
>> Dashboard?
>>
>> I did not find the network usage metric in it.
>>
>> Best,
>> Phil
>>
>> On Mon, Jan 25, 2016 at 5:06 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> You can start a job and then periodically request and store information
>>> about the running job and its vertices using the corresponding REST calls [1].
>>> The data will be in JSON format.
>>> After the job has finished, you can stop requesting data.
>>>
>>> Next you parse the JSON, extract the information you need and give it to
>>> some plotting library.
>>> As I said, it is not possible to pass this data back into Flink's
>>> dashboard, but you have to process and plot it yourself.
>>>
>>> Best, Fabian
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html#overview-of-jobs
>>>
>>>
>>>
>>> 2016-01-25 16:15 GMT+01:00 Philip Lee <philjj...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> According to
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Web-Dashboard-Completed-Job-history-td4067.html,
>>>> I cannot retrieve the job history from the Dashboard after turning off the JM.
>>>>
>>>> But as Fabian mentioned here:
>>>> "However, you can query all stats that are displayed by the dashboard
>>>> via a REST API [1] while the JM is running and save them yourself. This way
>>>> you can analyze the data also after the JM was stopped."
>>>> Could you explain this sentence in more detail?
>>>>
>>>> I want to evaluate the timeline view of each function after a job is done.
>>>>
>>>> Thanks,
>>>> Phil
>>>>
>>>
>>>
>>
>
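
A minimal Scala sketch of the polling approach Fabian describes above. The
JobManager address, job id, and output file are placeholders, and the loop
assumes the job-details response of the 0.10 monitoring REST API contains a
"state" field; treat it as an illustration rather than a complete tool.

import scala.io.Source

// Hypothetical JobManager address and job id; replace with your own setup.
val jmUrl = "http://localhost:8081"
val jobId = "0123456789abcdef0123456789abcdef"

// Append one JSON snapshot per second to a local file while the job runs;
// the saved snapshots can be parsed and plotted after the JM is gone.
val out = new java.io.PrintWriter(new java.io.FileWriter("job-stats.jsonl", true))
try {
  var running = true
  while (running) {
    val src = Source.fromURL(s"$jmUrl/jobs/$jobId")
    val json = try src.mkString finally src.close()
    out.println(json)
    out.flush()
    running = json.contains("\"state\":\"RUNNING\"")
    Thread.sleep(1000)
  }
} finally {
  out.close()
}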


Re: Hello, a question about Dashboard in Flink

2016-01-28 Thread Philip Lee
Thanks,

Is there any way to measure shuffle data (read and write) on Flink or
Dashboard?

I did not find the network usage metric in it.

Best,
Phil

On Mon, Jan 25, 2016 at 5:06 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> You can start a job and then periodically request and store information
> about the running job and its vertices using the corresponding REST calls [1].
> The data will be in JSON format.
> After the job has finished, you can stop requesting data.
>
> Next you parse the JSON, extract the information you need and give it to
> some plotting library.
> As I said, it is not possible to pass this data back into Flink's
> dashboard, but you have to process and plot it yourself.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html#overview-of-jobs
>
>
>
> 2016-01-25 16:15 GMT+01:00 Philip Lee <philjj...@gmail.com>:
>
>> Hello,
>>
>> According to
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Web-Dashboard-Completed-Job-history-td4067.html,
>> I cannot retrieve the job history from the Dashboard after turning off the JM.
>>
>> But as Fabian mentioned here:
>> "However, you can query all stats that are displayed by the dashboard via
>> a REST API [1] while the JM is running and save them yourself. This way you
>> can analyze the data also after the JM was stopped."
>> Could you explain this sentence in more detail?
>>
>> I want to evaluate the timeline view of each function after a job is done.
>>
>> Thanks,
>> Phil
>>
>
>


Reading ORC format on Flink

2016-01-27 Thread Philip Lee
Hello,

A question about reading the ORC format in Flink.

I want to use a dataset after loading a CSV file and converting it to the
ORC format with Hive. Can Flink read the ORC format?

If so, please let me know how to use such a dataset in Flink.

Best,
Phil
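
No reply is archived here, but one possible route is Flink's Hadoop
compatibility wrapper around Hive's ORC input format. The sketch below is an
assumption only: it presumes hive-exec (for OrcNewInputFormat and OrcStruct)
and the Hadoop compatibility classes are on the classpath, and the HDFS path
is hypothetical.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
import org.apache.hadoop.hive.ql.io.orc.{OrcNewInputFormat, OrcStruct}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// Wrap Hive's mapreduce ORC input format in Flink's Hadoop compatibility
// layer; records arrive as (NullWritable, OrcStruct) pairs.
val job = Job.getInstance()
FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path("hdfs:///warehouse/web_clicks_orc"))
val orcInput = new HadoopInputFormat[NullWritable, OrcStruct](
  new OrcNewInputFormat, classOf[NullWritable], classOf[OrcStruct], job)

// Each record's value is an OrcStruct; extract the fields you need from it.
val rows = env.createInput(orcInput)
rows.first(5).print()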


Hello, a question about Dashboard in Flink

2016-01-25 Thread Philip Lee
Hello,

According to
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Web-Dashboard-Completed-Job-history-td4067.html,
I cannot retrieve the job history from the Dashboard after turning off the JM.

But as Fabian mentioned here:
"However, you can query all stats that are displayed by the dashboard via a
REST API [1] while the JM is running and save them yourself. This way you
can analyze the data also after the JM was stopped."
Could you explain this sentence in more detail?

I want to evaluate the timeline view of each function after a job is done.

Thanks,
Phil


Re: Hive bug? about no such table

2015-12-18 Thread Philip Lee
Oops, sorry.

I meant to send this one to the Hive mailing list.


On Fri, Dec 18, 2015 at 2:19 AM, Philip Lee <philjj...@gmail.com> wrote:

> I think it is a Hive bug related to the metastore.
>
> Here is the thing.
>
> After I generated scale factor 300, named bigbench300, alongside
> bigbench100, which already existed,
> I ran the Hive job with bigbench300. At first it was fine.
> Then I ran the Hive job with bigbench100 again. It was still okay.
> *But when I ran bigbench300 again, the "no such table" error
> occurred.*
>
> << FAILED: SemanticException [Error 10001]: Line 8:7 Table not found
> 'product_reviews' >>
>
> I tried deleting every "metastore_db" directory, but the error still
> occurs.
>
> Have you ever seen this kind of issue before?
> I think something in the Hive metastore configuration is causing this
> problem.
> I have not found much information about this on Stack Overflow yet.
>
> Best,
> Philip
>


Re: Hello a question about metrics

2015-12-11 Thread Philip Lee
Thanks for your suggestion!

I will try it later!
My group members want to use Linux tools like cpustat, memstat, and vmstat.

The point is that jobs running on Spark and Flink run on the JVM, right?
FYI, cpustat and memstat capture the hardware resources, not the virtual machine.
Do you think cpustat and memstat would be appropriate metrics for jobs
running on Flink and Spark?

Regards,
Phil



On Tue, Dec 8, 2015 at 8:50 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Philip,
>
> As far as I know, ganglia[1] is widely used for monitoring Hadoop
> ecosystem. If your Hadoop distribution is installed by Apache Ambari[2],
> you can fetch metrics easily from pre-installed ganglia.
>
> [1]: http://ganglia.sourceforge.net
> [2]: https://ambari.apache.org
>
> > On Dec 8, 2015, at 4:54 AM, Philip Lee <philjj...@gmail.com> wrote:
> >
> > Hello, a question about metrics.
> >
> > I want to evaluate some queries on Spark, Flink, and Hive for a
> > comparison.
> >
> > I am using 'vmstat' to check metrics such as the amount of memory used,
> > swap, I/O, and CPU. Is my way of evaluating right? Because they use the
> > JVM's resources for memory and CPU.
> >
> > Is there any Linux application you use for metrics?
> >
> > Best,
> > Phil.
>
> Regards,
> Chiwan Park
>
>
>
>


Hello, the performance of apply function after join

2015-12-01 Thread Philip Lee
Hello, a question about the performance of the apply function after a join.

Just for your information, I am running Flink jobs on a cluster consisting
of 9 machines with 48 cores each. I am working on a benchmark comparing
Flink, Spark SQL, and Hive.

I am trying to optimize the *join function with hints* for better
performance. I want to increase the performance as much as possible.

Here are my questions:
1) Looking at the job progress log, apply() after the join function seems to
take quite a long time. Do you think I would gain better performance if I
did not use apply() to format the tuples? I could just select the column
numbers instead of using apply().

2) When using a *join with a hint* like Huge or Tiny, is there an ideal
ratio between the sizes of the two tables? Currently, if one table is 10
times bigger than the other, I use a join with a hint. Otherwise, I use the
general join().

Best,
Phil
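
A small Scala sketch of the two options discussed. The data is made up,
joinWithTiny is the size-hint variant of the DataSet join, and the comments
are assumptions to verify against your own job rather than measured facts.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Made-up inputs: a large fact table and a much smaller dimension table.
val sales: DataSet[(Int, Double)] = env.fromElements((1, 9.99), (2, 4.50), (1, 3.10))
val items: DataSet[(Int, String)] = env.fromElements((1, "book"), (2, "pen"))

// joinWithTiny hints that the second input is small enough to broadcast.
val joined = sales.joinWithTiny(items).where(0).equalTo(0)

// Formatting the result with apply() adds a user function ...
val formatted = joined.apply((s, i) => (i._2, s._2))

// ... while keeping the default (left, right) tuple output avoids it.
val raw: DataSet[((Int, Double), (Int, String))] = joined

formatted.print()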


Re: Hi, question about orderBy two columns more

2015-11-02 Thread Philip Lee
You are welcome.

I am wondering whether there is a way to be notified when you publish the RC
that solves the *sortPartition* problem, and how we could then apply the new
version, for example by simply downloading the newly released Flink version?

Thanks, Phil





On Mon, Nov 2, 2015 at 2:09 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Philip,
>
> thanks for reporting the issue. I just verified the problem.
> It is working correctly for the Java API, but is broken in Scala.
>
> I will work on a fix and include it in the next RC for 0.10.0.
>
> Thanks, Fabian
>
> 2015-11-02 12:58 GMT+01:00 Philip Lee <philjj...@gmail.com>:
>
>> Thanks for your reply, Stephan.
>>
>> So you said this is the same as in SQL,
>> but I got this result from this code. This is not what we expected,
>> right?
>>
>> val inputTuple = Seq((2,5),(2,3),(2,4),(3,2),(3,6))
>>
>> val outputTuple = env.fromCollection(inputTuple)
>>   .sortPartition(0,Order.DESCENDING)
>>   //.sortPartition(1,Order.ASCENDING)
>>   .print()
>>
>> Output:
>> (3,2)
>> (3,6)
>> (2,5)
>> (2,3)
>> (2,4)
>>
>> val inputTuple = Seq((2,5),(2,3),(2,4),(3,2),(3,6))
>>
>> val outputTuple = env.fromCollection(inputTuple)
>>   .sortPartition(0,Order.DESCENDING)
>>   .sortPartition(1,Order.ASCENDING)
>>   .print()
>>
>> Actual Output:
>> (3,2)
>> (2,3)
>> (2,4)
>> (2,5)
>> (3,6)
>> *Expected Output:*
>> *(3,2)*
>> *(3,6)*
>> *(2,3)*
>> *(2,4)*
>> *(2,5)*
>>
>>
>> Thanks,
>> Phil
>>
>>
>> On Mon, Nov 2, 2015 at 5:54 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Actually, sortPartition(col1).sortPartition(col2) results in a single
>>> sort that primarily sorts by col1 and secondarily by col2, so
>>> it is the same as in SQL when you state "ORDER BY col1, col2".
>>>
>>> The SortPartitionOperator created with the first "sortPartition(col1)"
>>> call appends further columns, rather than instantiating a new sort.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sun, Nov 1, 2015 at 11:29 AM, Philip Lee <philjj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I know that when applying ORDER BY col, it would be
>>>> sortPartition(col).setParallelism(1)
>>>>
>>>> What about ORDER BY on two or more columns?
>>>> If the SQL states ORDER BY col_1, col_2, then
>>>> sortPartition().sortPartition() does not solve this SQL.
>>>>
>>>> This is because ORDER BY in SQL sorts by the first column and then by the
>>>> second column within the sorted first column, but in Flink the function
>>>> seems to sort each column completely.
>>>>
>>>> Any suggestion?
>>>>
>>>> Thanks,
>>>> phil
>>>>
>>>>
>>>
>>
>
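
To make the intended total order concrete, a small Scala sketch that mirrors
Philip's expected output. It assumes a Flink version that includes the Scala
chaining fix Fabian mentions; setParallelism(1) forces a single partition so
the partition-local sort is also a global order.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromCollection(Seq((2, 5), (2, 3), (2, 4), (3, 2), (3, 6)))

// Chained sortPartition calls define one composite sort: primary key is
// field 0 (descending), secondary key is field 1 (ascending).
input
  .sortPartition(0, Order.DESCENDING)
  .sortPartition(1, Order.ASCENDING)
  .setParallelism(1)   // single partition => like ORDER BY col1 DESC, col2 ASC
  .print()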


Re: Hi, question about orderBy two columns more

2015-11-02 Thread Philip Lee
Thanks for your reply, Stephan.

So you said this is the same as in SQL,
but I got this result from this code. This is not what we expected, right?

val inputTuple = Seq((2,5),(2,3),(2,4),(3,2),(3,6))

val outputTuple = env.fromCollection(inputTuple)
  .sortPartition(0,Order.DESCENDING)
  //.sortPartition(1,Order.ASCENDING)
  .print()

Output:
(3,2)
(3,6)
(2,5)
(2,3)
(2,4)

val inputTuple = Seq((2,5),(2,3),(2,4),(3,2),(3,6))

val outputTuple = env.fromCollection(inputTuple)
  .sortPartition(0,Order.DESCENDING)
  .sortPartition(1,Order.ASCENDING)
  .print()

Actual Output:
(3,2)
(2,3)
(2,4)
(2,5)
(3,6)
*Expected Output:*
*(3,2)*
*(3,6)*
*(2,3)*
*(2,4)*
*(2,5)*


Thanks,
Phil


On Mon, Nov 2, 2015 at 5:54 AM, Stephan Ewen <se...@apache.org> wrote:

> Actually, sortPartition(col1).sortPartition(col2) results in a single sort
> that primarily sorts by col1 and secondarily by col2, so it is
> the same as in SQL when you state "ORDER BY col1, col2".
>
> The SortPartitionOperator created with the first "sortPartition(col1)"
> call appends further columns, rather than instantiating a new sort.
>
> Greetings,
> Stephan
>
>
> On Sun, Nov 1, 2015 at 11:29 AM, Philip Lee <philjj...@gmail.com> wrote:
>
>> Hi,
>>
>> I know that when applying ORDER BY col, it would be
>> sortPartition(col).setParallelism(1)
>>
>> What about ORDER BY on two or more columns?
>> If the SQL states ORDER BY col_1, col_2, then
>> sortPartition().sortPartition() does not solve this SQL.
>>
>> This is because ORDER BY in SQL sorts by the first column and then by the
>> second column within the sorted first column, but in Flink the function
>> seems to sort each column completely.
>>
>> Any suggestion?
>>
>> Thanks,
>> phil
>>
>>
>


Hi, question about orderBy two columns more

2015-11-01 Thread Philip Lee
Hi,

I know that when applying ORDER BY col, it would be
sortPartition(col).setParallelism(1)

What about ORDER BY on two or more columns?
If the SQL states ORDER BY col_1, col_2, then
sortPartition().sortPartition() does not solve this SQL.

This is because ORDER BY in SQL sorts by the first column and then by the
second column within the sorted first column, but in Flink the function
seems to sort each column completely.

Any suggestion?

Thanks,
phil


Re: reading csv file from null value

2015-10-26 Thread Philip Lee
Thanks for your reply.

What if I do not use the Table API?
The error happens even when using just env.readCsvFile().

I heard that using a RowSerializer would handle the null values, but a
TypeInformation error occurs when the data is converted.

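A Scala sketch of the String workaround Max suggests below, assuming the
web_clicks layout from the earlier mail; the path is a placeholder. The
nullable _sales field is read as "" and mapped to None afterwards, so the
"WHERE _sales IS NULL" condition becomes a filter on the Option.

import org.apache.flink.api.scala._

// _sales is declared as String so an empty field ("||") parses instead of failing.
case class WebClick(clickDate: Long, clickTime: Long, sales: String,
                    item: Int, page: Int, user: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val clicks = env.readCsvFile[WebClick](
  "hdfs:///data/web_clicks.dat",          // placeholder path
  fieldDelimiter = "|",
  includedFields = Array(0, 1, 2, 3, 4, 5))

// Turn the empty string into None so null sales can be filtered directly.
val withNullableSales = clicks.map { c =>
  val sales = if (c.sales.isEmpty) None else Some(c.sales.toInt)
  (c.clickDate, c.clickTime, sales, c.item, c.page, c.user)
}
val nullSales = withNullableSales.filter(_._3.isEmpty)
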
On Mon, Oct 26, 2015 at 10:26 AM, Maximilian Michels <m...@apache.org> wrote:

> As far as I know, the null support was removed from the Table API because
> it was not consistently supported across all operations. See
> https://issues.apache.org/jira/browse/FLINK-2236
>
> On Fri, Oct 23, 2015 at 7:18 PM, Shiti Saxena <ssaxena@gmail.com>
> wrote:
>
>> For a similar problem where we wanted to preserve and track null entries,
>> we load the CSV as a DataSet[Array[Object]] and then transform it into
>> DataSet[Row] using a custom RowSerializer(
>> https://gist.github.com/Shiti/d0572c089cc08654019c) which handles null.
>>
>> The Table API(which supports null) can then be used on the resulting
>> DataSet[Row].
>>
>>
>> On Fri, Oct 23, 2015 at 7:38 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Philip,
>>>
>>> How about making the empty field of type String? Then you can read the
>>> CSV into a DataSet and treat the empty string as a null value. Not very
>>> nice but a workaround. As of now, Flink deliberately doesn't support null
>>> values.
>>>
>>> Regards,
>>> Max
>>>
>>>
>>> On Thu, Oct 22, 2015 at 4:30 PM, Philip Lee <philjj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to load a dataset that contains null values by using
>>>> readCsvFile().
>>>>
>>>> // e.g  _date|_click|_sales|_item|_web_page|_user
>>>>
>>>> case class WebClick(_click_date: Long, _click_time: Long, _sales: Int, 
>>>> _item: Int,_page: Int, _user: Int)
>>>>
>>>> private def getWebClickDataSet(env: ExecutionEnvironment): 
>>>> DataSet[WebClick] = {
>>>>
>>>>   env.readCsvFile[WebClick](
>>>> webClickPath,
>>>> fieldDelimiter = "|",
>>>> includedFields = Array(0, 1, 2, 3, 4, 5),
>>>> // lenient = true
>>>>   )
>>>> }
>>>>
>>>>
>>>> Well, I know there is an option to ignore malformed values, but I have
>>>> to read the dataset even though it has null values.
>>>>
>>>> The dataset looks as follows (the third column is null):
>>>> 37794|24669||16705|23|54810
>>>> I have to read the null values as well because I need to use a filter or
>>>> where function ( _sales == null ).
>>>>
>>>> Is there any detail suggestion to do it?
>>>>
>>>> Thanks,
>>>> Philip
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> ==
>>>>
>>>> *Hae Joon Lee*
>>>>
>>>>
>>>> Now, in Germany,
>>>>
>>>> M.S. Candidate, Interested in Distributed System, Iterative Processing
>>>>
>>>> Dept. of Computer Science, Informatik in German, TUB
>>>>
>>>> Technical University of Berlin
>>>>
>>>>
>>>> In Korea,
>>>>
>>>> M.S. Candidate, Computer Architecture Laboratory
>>>>
>>>> Dept. of Computer Science, KAIST
>>>>
>>>>
>>>> Rm# 4414 CS Dept. KAIST
>>>>
>>>> 373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)
>>>>
>>>>
>>>> Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea
>>>>
>>>> ==
>>>>
>>>
>>>
>>
>


-- 

==

*Hae Joon Lee*


Now, in Germany,

M.S. Candidate, Interested in Distributed System, Iterative Processing

Dept. of Computer Science, Informatik in German, TUB

Technical University of Berlin


In Korea,

M.S. Candidate, Computer Architecture Laboratory

Dept. of Computer Science, KAIST


Rm# 4414 CS Dept. KAIST

373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)


Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea

==


Re: reading csv file from null value

2015-10-24 Thread Philip Lee
Plus, following Shiti's suggestion, we could use the RowSerializer to
overcome the null values, right?

I tried it in several ways, but it still did not work.
Could you give an example of it based on the previous email?



On Sat, Oct 24, 2015 at 11:19 PM, Philip Lee <philjj...@gmail.com> wrote:

> Maximilian said that if we handle the null value as a String, it would be
> acceptable.
> But in fact, readCsvFile() still cannot accept a null value; the error
> message says "Row too short".
>
> case class WebClick(click_date: String, click_time: String, user: String, 
> item: String)
> private def getWebClickDataSet(env: ExecutionEnvironment): DataSet[WebClick] 
> = {
>   env.readCsvFile[WebClick](
> webClickPath,
> fieldDelimiter = "|",
> includedFields = Array(0, 1, 3, 5)
> //lenient = true
> )
> }
>
> // e.g. 36890|26789|0|3725|20|85457
> // e.g _date|_click|_sales|_item|_web_page|_user​
>
> Caused by: org.apache.flink.api.common.io.ParseException: Row too short:
> 36890|4749||13183|29|
> at
> org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:383)
> at
> org.apache.flink.api.scala.operators.ScalaCsvInputFormat.readRecord(ScalaCsvInputFormat.java:214)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:454)
> at
> org.apache.flink.api.scala.operators.ScalaCsvInputFormat.nextRecord(ScalaCsvInputFormat.java:182)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
> Is there any suggestion?
>
>
>
> ​
>
>
>
> On Fri, Oct 23, 2015 at 7:18 PM, Shiti Saxena <ssaxena@gmail.com>
> wrote:
>
>> For a similar problem where we wanted to preserve and track null entries,
>> we load the CSV as a DataSet[Array[Object]] and then transform it into
>> DataSet[Row] using a custom RowSerializer(
>> https://gist.github.com/Shiti/d0572c089cc08654019c) which handles null.
>>
>> The Table API(which supports null) can then be used on the resulting
>> DataSet[Row].
>>
>>
>> On Fri, Oct 23, 2015 at 7:38 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Philip,
>>>
>>> How about making the empty field of type String? Then you can read the
>>> CSV into a DataSet and treat the empty string as a null value. Not very
>>> nice but a workaround. As of now, Flink deliberately doesn't support null
>>> values.
>>>
>>> Regards,
>>> Max
>>>
>>>
>>> On Thu, Oct 22, 2015 at 4:30 PM, Philip Lee <philjj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to load a dataset that contains null values by using
>>>> readCsvFile().
>>>>
>>>> // e.g  _date|_click|_sales|_item|_web_page|_user
>>>>
>>>> case class WebClick(_click_date: Long, _click_time: Long, _sales: Int, 
>>>> _item: Int,_page: Int, _user: Int)
>>>>
>>>> private def getWebClickDataSet(env: ExecutionEnvironment): 
>>>> DataSet[WebClick] = {
>>>>
>>>>   env.readCsvFile[WebClick](
>>>> webClickPath,
>>>> fieldDelimiter = "|",
>>>> includedFields = Array(0, 1, 2, 3, 4, 5),
>>>> // lenient = true
>>>>   )
>>>> }
>>>>
>>>>
>>>> Well, I know there is an option to ignore malformed values, but I have
>>>> to read the dataset even though it has null values.
>>>>
>>>> The dataset looks as follows (the third column is null):
>>>> 37794|24669||16705|23|54810
>>>> I have to read the null values as well because I need to use a filter or
>>>> where function ( _sales == null ).
>>>>
>>>> Is there any detail suggestion to do it?
>>>>
>>>> Thanks,
>>>> Philip
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> ==
>>>>
>>>> *Hae Joon Lee*
>>>>
>>>>
>>>> Now, in Germany,
>>>>
>>>> M.S. Candidate, Interested in Distributed System, Iterative Processing
>>>>
>>>> Dept. of Computer Science

Re: Hi, Flink people, a question about translating HIVE queries to Flink functions by using the Table API

2015-10-19 Thread Philip Lee
Thanks, Fabian.

I just want to check one thing again.
As you said, [Distribute By] is partitionByHash(), and [Sort By] should be
sortGroup() in Flink. However, [Cluster By] consists of
partitionByHash().*sortPartition()*.

As far as I know, [Cluster By] is the same as the combination of [Distribute
By] + [Sort By]. Therefore, according to your suggestion, should it be
partitionByHash() + sortGroup() instead of sortPartition()?

Or perhaps I still do not quite understand the difference between a
partition and the scope within a reduce.

Regards,
Philip

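As a Scala sketch of the mapping Fabian describes below, on a made-up
(key, value) dataset; treat it as an illustration of the operators, not a
definitive translation of the Hive semantics.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val rows: DataSet[(Int, String)] = env.fromElements((3, "c"), (1, "a"), (2, "b"))

// ORDER BY key: a total order needs a single partition.
val orderBy = rows.sortPartition(0, Order.ASCENDING).setParallelism(1)

// DISTRIBUTE BY key: hash partitioning only, no sort.
val distributeBy = rows.partitionByHash(0)

// SORT BY value (within a reduce): sortGroup defines the order of the values
// handed to the group function.
val sortBy = rows.groupBy(0).sortGroup(1, Order.ASCENDING)
  .reduceGroup(values => values.next())

// CLUSTER BY key: DISTRIBUTE BY + SORT BY on the same column.
val clusterBy = rows.partitionByHash(0).sortPartition(0, Order.ASCENDING)
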
On Mon, Oct 19, 2015 at 2:17 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Philip,
>
> here a few additions to what Max said:
> - ORDER BY: As Max said, Flink's sortPartition() does only sort with a
> partition and does not produce a total order. You can either set the
> parallelism to 1 as Max suggested or use a custom partitioner to range
> partition the data.
> - SORT BY: From your description, the semantics are not 100% clear. If
> SORT BY refers to the order of tuples WITHIN a reduce function call, it
> should be groupBy().sortGroup() in Flink instead of sortPartition
> - DISTRIBUTE BY: This should be partitionByHash() instead of groupBy().
> GroupBy() will also sort the data which is not required for DISTRIBUTE BY.
> - CLUSTER BY: This should be partitionByHash().sortPartition().
> - Reduce vs. GroupReduce: A ReduceFunction is always combinable. This is
> optional for GroupReduceFunctions.
>
> Cheers, Fabian
>
>
>
> 2015-10-19 13:01 GMT+02:00 Maximilian Michels <m...@apache.org>:
>
>> Hi Philip,
>>
>> Thank you for your questions. I think you have mapped the HIVE
>> functions to the Flink ones correctly. Just a remark on the ORDER BY.
>> You wrote that it produces a total order of all the records. In this
>> case, you'd have to do a SortPartition operation with parallelism set to
>> 1. This is necessary because we need to have all records in one place
>> to perform a sort on them.
>>
>> Considering your reduce question: There is no fundamental
>> advantage/disadvantage of using GroupReduce over Reduce. It depends on
>> your use case which one is more convenient or efficient. For the
>> regular reduce, you just get two elements and produce one. You can't
>> easily keep state between the reduces other than in the value itself.
>> The GroupReduce, on the other hand, may produce none, one, or multiple
>> elements per grouping and keep state in between emitting values. Thus,
>> GroupReduce is a more powerful operator and can be seen as a superset
>> of the Reduce operator. I would advise you to use the one you find
>> easiest to use.
>>
>> Best regards,
>> Max
>>
>> On Sun, Oct 18, 2015 at 9:16 PM, Philip Lee <philjj...@gmail.com> wrote:
>> > Hi, Flink people, a question about translating HIVE queries to Flink
>> > functions by using the Table API. To sum up, I am working on a benchmark
>> > for Flink.
>> >
>> > I am Philip Lee, a Computer Science Master's student at TUB. I work on
>> > translating the benchmark's Hive queries into Flink code.
>> >
>> > As I studied it, I came up with a few questions.
>> >
>> > First of all, for people who do not know the Hive functions, let me
>> > briefly explain.
>> >
>> > ORDER BY: it guarantees a total order in the output.
>> > SORT BY: it only guarantees ordering of the rows within a reducer.
>> > GROUP BY: this is just the groupBy function in SQL.
>> > DISTRIBUTE BY: all rows with the same DISTRIBUTE BY columns will go to
>> > the same reducer.
>> > CLUSTER BY: this just consists of Distribute By on a column + Sort By on
>> > the same column.
>> >
>> > I just want to check that the Flink functions I use are equivalent to
>> > the Hive ones.
>> > < Hive SQL Query = Flink functions >
>> >
>> > ORDER BY = sortPartition(`col, order)
>> > SORT BY = groupBy(`col).sortPartition(`col, order)
>> > GROUP BY: this is just the groupBy function.
>> > DISTRIBUTE BY = groupBy(`col)
>> > CLUSTER BY = groupBy(`col).sortPartition(`col, order)
>> >
>> > I do not see much difference between groupBy and DISTRIBUTE BY if I map
>> > them to Flink functions.
>> > In the Hadoop world, we could say the mapper performs the DISTRIBUTE BY.
>> > However, I am not quite sure what DISTRIBUTE BY corresponds to in Flink.
>> > My guess is that groupBy in Flink could be the function that distributes
>> > the rows by the specified key.
>> >
>> > Please feel fr