Re: Setting startingOffsets to earliest in structured streaming never catches up

2017-01-22 Thread Timothy Chan
I'm using version 2.0.2.

The difference I see between using latest and earliest is a series of jobs
that take less than a second vs. one job that goes on for over 24 hours.

On Sun, Jan 22, 2017 at 6:54 PM Shixiong(Ryan) Zhu wrote:

> Which Spark version are you using? If you are using 2.1.0, could you use
> the monitoring APIs (
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries)
> to check the input rate and the processing rate? One possible issue is that
> the Kafka source launched a pretty large batch and it took too long to
> finish it. If so, you can use the "maxOffsetsPerTrigger" option to limit the
> data size in a batch in order to observe the progress.
>
> On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan wrote:
>
> I'm running my structured streaming jobs in EMR. We were thinking that a
> worst-case recovery scenario would be to spin up another cluster and set
> startingOffsets to earliest (our Kafka cluster has a retention policy of
> 7 days).
>
> My observation is that the job never catches up to latest. This is not
> acceptable. I've set the number of partitions for the topic to 6. I've
> tried using a cluster of 4 in EMR.
>
> The producer rate for this topic is 4 events/second. Does anyone have any
> suggestions on what I can do to have my consumer catch up to latest faster?
>
>
>


Re: Setting startingOffsets to earliest in structured streaming never catches up

2017-01-22 Thread Shixiong(Ryan) Zhu
Which Spark version are you using? If you are using 2.1.0, could you use
the monitoring APIs (
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries)
to check the input rate and the processing rate? One possible issue is that
the Kafka source launched a pretty large batch and it took too long to
finish it. If so, you can use the "maxOffsetsPerTrigger" option to limit the
data size in a batch in order to observe the progress.
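
A minimal sketch of the options and monitoring call described above (the
broker, topic, paths, and the 10000 limit are placeholders, not values from
this thread; the Kafka source requires the spark-sql-kafka-0-10 package):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("catch-up-sketch").getOrCreate()

// Read from Kafka starting at the earliest retained offsets, but cap each
// micro-batch so the query makes visible progress instead of launching one
// huge batch covering 7 days of data.
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "my-topic")                    // placeholder
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000")            // illustrative limit
  .load()

val query = kafkaDF.writeStream
  .format("parquet")
  .option("path", "hdfs:///tmp/out")                  // placeholder sink
  .option("checkpointLocation", "hdfs:///tmp/ckpt")   // placeholder
  .start()

// Monitoring API (Spark 2.1.0+): compare arrival rate vs. processing rate.
val progress = query.lastProgress
if (progress != null) {
  println(s"input rows/s: ${progress.inputRowsPerSecond}, " +
    s"processed rows/s: ${progress.processedRowsPerSecond}")
}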

On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan wrote:

> I'm running my structured streaming jobs in EMR. We were thinking that a
> worst-case recovery scenario would be to spin up another cluster and set
> startingOffsets to earliest (our Kafka cluster has a retention policy of
> 7 days).
>
> My observation is that the job never catches up to latest. This is not
> acceptable. I've set the number of partitions for the topic to 6. I've
> tried using a cluster of 4 in EMR.
>
> The producer rate for this topic is 4 events/second. Does anyone have any
> suggestions on what I can do to have my consumer catch up to latest faster?
>


Setting startingOffsets to earliest in structured streaming never catches up

2017-01-22 Thread Timothy Chan
I'm running my structured streaming jobs in EMR. We were thinking that a
worst-case recovery scenario would be to spin up another cluster and set
startingOffsets to earliest (our Kafka cluster has a retention policy of
7 days).

My observation is that the job never catches up to latest. This is not
acceptable. I've set the number of partitions for the topic to 6. I've
tried using a cluster of 4 in EMR.

The producer rate for this topic is 4 events/second. Does anyone have any
suggestions on what I can do to have my consumer catch up to latest faster?


Re: physical memory usage keep increasing for spark app on Yarn

2017-01-22 Thread Yang Cao
Also, do you know why this happens?
> On 20 January 2017, at 18:23, Pavel Plotnikov wrote:
> 
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent this issue I
> usually increase the partition number. In the last step of your code you
> reduce the number of partitions to 1; try setting a bigger value, and it may
> solve this problem.
> 
> Cheers,
> Pavel
> 
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao wrote:
> Hi all,
> 
> I am running a Spark application in YARN-client mode with 6 executors (4
> cores each, executor memory = 6G, overhead = 4G; Spark version: 1.6.3 /
> 2.1.0). I find that my executor memory keeps increasing until the executor
> gets killed by the node manager, which tells me to boost
> spark.yarn.executor.memoryOverhead. I know that this parameter mainly
> controls the size of memory allocated off-heap, but I don't know when and how
> the Spark engine uses this part of memory. Also, increasing that memory does
> not always solve my problem; sometimes it works, sometimes not. It tends to
> be useless when the input data is large.
> 
> FYI, my app's logic is quite simple. It is meant to combine the small files
> generated in a single day (one directory per day) into one file and write it
> back to HDFS. Here is the core code:
> val df = spark.read.parquet(originpath)
>   .filter(s"m = ${ts.month} AND d = ${ts.day}")
>   .coalesce(400)
> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> The source data may have hundreds to thousands of partitions, and the total
> Parquet size is around 1 to 5 GB. I also find that in the step that shuffles
> data read from different machines, the size of the shuffle read is about 4
> times larger than the input size, which is weird, or follows some principle I
> don't know.
> 
> Anyway, I have done some searching on this problem myself. Some articles say
> that it's about the direct buffer memory (which I don't set myself). Some
> articles say that people solve it with more frequent full GC. I also found
> someone on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> 
> He claimed that it's a bug in Parquet, but a comment questioned him. People
> on this mailing list may also have received an email a few hours ago from
> blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
> 
> So it looks like a common question across different output formats. I hope
> someone with experience of this problem can explain why it happens and what a
> reliable way to solve it would be.
> 
> Best,
> 
> 
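
A minimal sketch of the two knobs discussed above, i.e. raising the executor
memory overhead and replacing repartition(1) with a larger partition count as
Pavel suggests (the paths, day filter, overhead value, and partition count are
placeholders, not values from this thread):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("daily-parquet-compaction")
  // Equivalent to spark-submit --conf spark.yarn.executor.memoryOverhead=4096;
  // on YARN this must be set before the executors are launched.
  .config("spark.yarn.executor.memoryOverhead", "4096")
  .getOrCreate()

val originpath = "hdfs:///data/events"              // placeholder input
val targetpath = "hdfs:///data/events/compacted"    // placeholder output

val df = spark.read.parquet(originpath)
  .filter("m = 1 AND d = 20")                       // placeholder day filter
  .coalesce(400)

val dropDF = df.drop("hh", "mm", "mode", "y", "m", "d")

// Keep more partitions through the shuffle instead of collapsing everything
// onto a single task; this spreads the final write across executors.
dropDF.repartition(64)                              // illustrative value
  .write.mode(SaveMode.ErrorIfExists)
  .parquet(targetpath)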



Re: physical memory usage keep increasing for spark app on Yarn

2017-01-22 Thread Yang Cao
Hi,
Thank you for your suggestion. As I understand it, if I set a bigger number I
won't get the output as one file, right? My task is designed to combine all the
small files from one day into one big Parquet file. Thanks again.

Best,
> On 20 January 2017, at 18:23, Pavel Plotnikov wrote:
> 
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent this issue I
> usually increase the partition number. In the last step of your code you
> reduce the number of partitions to 1; try setting a bigger value, and it may
> solve this problem.
> 
> Cheers,
> Pavel
> 
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao wrote:
> Hi all,
> 
> I am running a Spark application in YARN-client mode with 6 executors (4
> cores each, executor memory = 6G, overhead = 4G; Spark version: 1.6.3 /
> 2.1.0). I find that my executor memory keeps increasing until the executor
> gets killed by the node manager, which tells me to boost
> spark.yarn.executor.memoryOverhead. I know that this parameter mainly
> controls the size of memory allocated off-heap, but I don't know when and how
> the Spark engine uses this part of memory. Also, increasing that memory does
> not always solve my problem; sometimes it works, sometimes not. It tends to
> be useless when the input data is large.
> 
> FYI, my app's logic is quite simple. It is meant to combine the small files
> generated in a single day (one directory per day) into one file and write it
> back to HDFS. Here is the core code:
> val df = spark.read.parquet(originpath)
>   .filter(s"m = ${ts.month} AND d = ${ts.day}")
>   .coalesce(400)
> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> The source data may have hundreds to thousands of partitions, and the total
> Parquet size is around 1 to 5 GB. I also find that in the step that shuffles
> data read from different machines, the size of the shuffle read is about 4
> times larger than the input size, which is weird, or follows some principle I
> don't know.
> 
> Anyway, I have done some searching on this problem myself. Some articles say
> that it's about the direct buffer memory (which I don't set myself). Some
> articles say that people solve it with more frequent full GC. I also found
> someone on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> 
> He claimed that it's a bug in Parquet, but a comment questioned him. People
> on this mailing list may also have received an email a few hours ago from
> blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
> 
> So it looks like a common question across different output formats. I hope
> someone with experience of this problem can explain why it happens and what a
> reliable way to solve it would be.
> 
> Best,
> 
>
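
For reference, one way to keep Pavel's larger partition count during the heavy
shuffle and still end up with a single file would be a cheap second pass over
the already-compacted output. A rough sketch, assuming the same spark, dropDF,
SaveMode and targetpath as in the code quoted above (intermediatePath is a
placeholder):

// First pass: do the heavy work with plenty of partitions, writing many files.
val intermediatePath = "hdfs:///data/events/compacted_tmp"  // placeholder
dropDF.repartition(64)
  .write.mode(SaveMode.Overwrite)
  .parquet(intermediatePath)

// Second pass: the compacted data is only 1-5 GB, so reading it back and
// coalescing to one partition is a much lighter job than repartition(1) on
// the raw input.
spark.read.parquet(intermediatePath)
  .coalesce(1)
  .write.mode(SaveMode.ErrorIfExists)
  .parquet(targetpath)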