> "For csv reading, i deliberately did not use csv reader since i want to run
> same code across spark and flink."
>
> If your objective deviates from writing and running the fastest Spark and
> fastest Flink programs, then your comparison is worthless.

Well, I don't really agree with this. I would say that comparing
different systems without tuning the code very much for each individual
system can actually be a valid objective. In real (non-benchmark) jobs
it also sometimes happens that we don't want to spend too much time on
tuning.

However, in this case I also think that using the built-in CSV reader
method would not constitute "too much tuning to a specific system".
So I would do this comparison using the built-in CSV reader in
both systems.
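
For illustration, here is a rough, untested sketch of what reading the
file with the built-in readers could look like. It assumes the Scala
DataSet API of Flink 1.1 (readCsvFile) and the DataFrameReader of
Spark 2.0 (spark.read.csv). The object names and the RatingsPath
constant are made up for this example; the path and the column layout
(userId,movieId,rating,timestamp) are taken from the original post.

```scala
// Hypothetical holder for the input path used in the original post.
object Paths {
  val RatingsPath = "gs://cpcflink/wikistream/ratingsheadless16x.csv"
}

// Flink: let readCsvFile do the parsing instead of readTextFile + split.
object FlinkCsvSketch {
  import org.apache.flink.api.scala._

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // includedFields picks columns 1 (movieId) and 2 (rating) by position,
    // so userId and timestamp are never materialized.
    val ratings: DataSet[(String, Double)] =
      env.readCsvFile[(String, Double)](Paths.RatingsPath, includedFields = Array(1, 2))

    // Same aggregation as in the original program: (movieId, count, sum).
    val top10 = ratings
      .map(r => (r._1, 1, r._2))
      .groupBy(0)
      .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3))
      .map(t => (t._1, t._3 / t._2))
      .collect()
      .sortBy(_._1)
      .sortBy(_._2)(Ordering.Double.reverse)
      .take(10)
    top10.foreach(println)
  }
}

// Spark: read with an explicit schema, then continue on the RDD as before.
object SparkCsvSketch {
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types._

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Simple Application").getOrCreate()
    // An explicit schema avoids an extra pass over the file for inference.
    val schema = StructType(Seq(
      StructField("userId", StringType),
      StructField("movieId", StringType),
      StructField("rating", DoubleType),
      StructField("timestamp", LongType)))

    val keyed = spark.read.schema(schema).csv(Paths.RatingsPath)
      .rdd
      .map(row => (row.getString(1), (1, row.getDouble(2))))

    keyed
      .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
      .mapValues(i => i._2 / i._1)
      .collect
      .sortBy(_._1)
      .sortBy(a => a._2)(Ordering.Double.reverse)
      .take(10)
      .foreach(println)
    spark.stop()
  }
}
```

The two programs are no longer line-for-line identical, but the
aggregation logic stays the same in both. Whether this changes the
measured times is something only a rerun can show.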

Best,
Gábor



2016-11-18 15:30 GMT+01:00 Greg Hogan <c...@greghogan.com>:
> "For csv reading, i deliberately did not use csv reader since i want to run
> same code across spark and flink."
>
> If your objective deviates from writing and running the fastest Spark and
> fastest Flink programs, then your comparison is worthless.
>
>
>
> On Fri, Nov 18, 2016 at 5:37 AM, CPC <acha...@gmail.com> wrote:
>>
>> Hi Gabor,
>>
>> Thank you for your kind response. I forgot to mention that I actually have
>> three workers. This is why I set the default parallelism to 6.
>>
>> For csv reading, i deliberately did not use csv reader since i want to run
>> same code across spark and flink. Collect is returning 40k records which is
>> not so big.
>>
>> I will try the same test with Spark 1.5 and 1.6 as well, to understand
>> whether the Spark 2.x series has some performance improvements, because in
>> these kinds of tests Spark and Flink were either on par or Flink was 10-15%
>> faster than Spark in the past. Aside from that, are there any configuration
>> parameters you would propose to fine-tune Flink?
>>
>> Best,
>> Anıl
>>
>> On Nov 18, 2016 12:25, "Gábor Gévay" <gga...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> Your program looks mostly fine, but there are a few minor things that
>>> might help a bit:
>>>
>>> Parallelism: In your attached flink-conf.yaml, you have 2 task slots
>>> per task manager, and if you have 1 task manager, then your total
>>> number of task slots is also 2. However, your default parallelism is
>>> 6. In Flink, the recommended default parallelism is exactly the total
>>> number of task slots [1]. (This is in contrast to Spark, where the
>>> recommended setting is 2-3 per CPU core [2].)
>>>
>>> CSV reading: If your input is a CSV file, then you should use
>>> readCsvFile (instead of readTextFile and then parsing it manually).
>>>
>>> Collect call: How large is the DataSet that you are using collect on?
>>> If it is large, then we might try to figure out a way to get the top
>>> 10 elements without first collecting the DataSet.
>>>
>>> Best,
>>> Gábor
>>>
>>> [1]
>>> https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
>>> [2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
>>>
>>>
>>>
>>>
>>>
>>> 2016-11-16 22:38 GMT+01:00 CPC <acha...@gmail.com>:
>>> > Hi all,
>>> >
>>> > I am trying to compare Spark and Flink batch performance. In my test I am
>>> > using ratings.csv from the
>>> > http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I also
>>> > concatenated ratings.csv 16 times to increase the dataset size (a total of
>>> > 390465536 records, almost 10 GB). I am reading from Google Storage with
>>> > gcs-connector, and the file schema is: userId,movieId,rating,timestamp.
>>> > Basically, I am calculating the average rating per movie.
>>> >
>>> > Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
>>> >>
>>> >> case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>>> >
>>> >
>>> >>
>>> >> def parseRating(line: String): Rating = {
>>> >>   val arr = line.split(",")
>>> >>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong * 1000)))
>>> >> }
>>> >
>>> >
>>> >>
>>> >> val ratings: DataSet[Rating] =
>>> >>   env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>>> >>     .map(a => parseRating(a))
>>> >> ratings
>>> >>   .map(i => (i.movieID, 1, i.rating))
>>> >>   .groupBy(0)
>>> >>   .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
>>> >>   .map(i => (i._1, i._3 / i._2))
>>> >>   .collect()
>>> >>   .sortBy(_._1)
>>> >>   .sortBy(_._2)(Ordering.Double.reverse)
>>> >>   .take(10)
>>> >
>>> >
>>> > with CombineHint.HASH 3m49s and with CombineHint.SORT 5m9s
>>> >
>>> > Code for Spark (I tested reduceByKey and reduceByKeyLocally):
>>> >>
>>> >> case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>>> >> def parseRating(line: String): Rating = {
>>> >>   val arr = line.split(",")
>>> >>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong * 1000)))
>>> >> }
>>> >> val conf = new SparkConf().setAppName("Simple Application")
>>> >> val sc = new SparkContext(conf)
>>> >> val keyed: RDD[(String, (Int, Double))] =
>>> >>   sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
>>> >>     .map(parseRating)
>>> >>     .map(r => (r.movieID, (1, r.rating)))
>>> >> keyed
>>> >>   .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
>>> >>   .mapValues(i => i._2 / i._1)
>>> >>   .collect
>>> >>   .sortBy(_._1)
>>> >>   .sortBy(a => a._2)(Ordering.Double.reverse)
>>> >>   .take(10)
>>> >>   .foreach(println)
>>> >
>>> >
>>> > With reduceByKeyLocally 2.9 minutes (about 2m54s) and with reduceByKey
>>> > 3.1 minutes (about 3m6s)
>>> >
>>> > Machine config on google cloud:
>>> > taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
>>> > jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
>>> > java version:jdk jdk-8u102
>>> > flink:1.1.3
>>> > spark:2.0.2
>>> >
>>> > I also attached flink-conf.yaml. Although it is not such a big difference,
>>> > there is a 40% performance difference between Spark and Flink. Is there
>>> > something I am doing wrong? If not, how can I fine-tune Flink, or is it
>>> > normal that Spark has better performance with batch data?
>>> >
>>> > Thank you in advance...
>
>
