Yes, and that is indeed the problem. It is trying to process all the data
in Kafka, and therefore taking 60 seconds. You need to set the rate limits
for that.
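
As a rough sketch of what that could look like (the rate of 1000 is a placeholder I made up, not a recommendation, and the app name is only guessed from the file name in your logs), you would set the limit on the SparkConf before creating the streaming context. It assumes spark-core is on the classpath:

```scala
import org.apache.spark.SparkConf

object RateLimitSketch {
  // Hypothetical value for illustration only; tune it for your workload.
  val maxRatePerPartition = 1000L // records per second, per Kafka partition

  // Taken from the thread: jobs are added every 5000 ms, and there are
  // three topics with one partition each.
  val batchIntervalSeconds = 5L
  val partitions = 3L

  val conf: SparkConf = new SparkConf()
    .setAppName("MetricsSpark") // assumed name, not confirmed in the thread
    .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition.toString)

  // Upper bound on records one batch can pull once the limit is set:
  // rate per partition * batch length in seconds * number of partitions.
  val maxRecordsPerBatch: Long =
    maxRatePerPartition * batchIntervalSeconds * partitions
}
```

The same property can also be passed at submit time with --conf spark.streaming.kafka.maxRatePerPartition=<value> instead of hard-coding it. Either way, each batch is capped rather than reading everything up to the latest offset.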

On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If you don't set it, there is no maximum rate: it will get everything from
> the end of the last batch to the maximum available offset.
>
> On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> The difference is that one receives more data than the other two. I can
>> pass the topics as parameters, so I could run the code with one topic at
>> a time and figure out which topic it is, although I guess that it's the
>> topic that gets the most data.
>>
>> Anyway, it's pretty weird to see those delays in just one of the clusters,
>> even when the other one is not running.
>> I have seen the parameter "spark.streaming.kafka.maxRatePerPartition". I
>> haven't set any value for it. How does it work if this
>> parameter doesn't have a value?
>>
>> 2015-07-30 16:32 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>>> If the jobs are running on different topicpartitions, what's different
>>> about them?  Is one of them 120x the throughput of the other, for
>>> instance?  You should be able to eliminate cluster config as a difference
>>> by running the same topic partition on the different clusters and comparing
>>> the results.
>>>
>>> On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>>
>>>> I have three topics with one partition each, so each job runs
>>>> against one topic.
>>>>
>>>> 2015-07-30 16:20 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>
>>>>> Just so I'm clear, the difference in timing you're talking about is
>>>>> this:
>>>>>
>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
>>>>> MetricsSpark.scala:67, took 60.391761 s
>>>>>
>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
>>>>> MetricsSpark.scala:67, took 0.531323 s
>>>>>
>>>>>
>>>>> Are those jobs running on the same topicpartition?
>>>>>
>>>>>
>>>>> On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I read about the maxRatePerPartition parameter; I haven't set it.
>>>>>> Could that be the problem? Although this wouldn't explain why it
>>>>>> fails in only one of the clusters.
>>>>>>
>>>>>> 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
>>>>>>
>>>>>>> They only share Kafka; the rest of the resources are independent. I
>>>>>>> tried stopping one cluster and running only the cluster that isn't
>>>>>>> working, but the same thing happens.
>>>>>>>
>>>>>>> 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
>>>>>>>
>>>>>>>> I have a problem with the JobScheduler. I have executed the same
>>>>>>>> code in two clusters. I read from three topics in Kafka with
>>>>>>>> DirectStream, so I have three tasks.
>>>>>>>>
>>>>>>>> I have checked YARN and there are no other jobs launched.
>>>>>>>>
>>>>>>>> On the cluster where I have trouble, I get these logs:
>>>>>>>>
>>>>>>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
>>>>>>>> 24.0 (TID 72, xxxxxxxxx, RACK_LOCAL, 14856 bytes)
>>>>>>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
>>>>>>>> 24.0 (TID 73, xxxxxxxxxxxxxxx, RACK_LOCAL, 14852 bytes)
>>>>>>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
>>>>>>>> in memory on xxxxxxxxxxx:44909 (size: 1802.0 B, free: 530.3 MB)
>>>>>>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
>>>>>>>> in memory on xxxxxxxxx:43477 (size: 1802.0 B, free: 530.3 MB)
>>>>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
>>>>>>>> 24.0 (TID 74, xxxxxxxxx, RACK_LOCAL, 14860 bytes)
>>>>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
>>>>>>>> 24.0 (TID 72) in 208 ms on xxxxxxxxx (1/3)
>>>>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
>>>>>>>> 24.0 (TID 74) in 49 ms on xxxxxxxxx (2/3)
>>>>>>>> *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259580000 ms*
>>>>>>>> *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259585000 ms*
>>>>>>>> *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259590000 ms*
>>>>>>>> *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259595000 ms*
>>>>>>>> *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259600000 ms*
>>>>>>>> *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259605000 ms*
>>>>>>>> *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259610000 ms*
>>>>>>>> *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259615000 ms*
>>>>>>>> *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259620000 ms*
>>>>>>>> *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259625000 ms*
>>>>>>>> *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259630000 ms*
>>>>>>>> *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
>>>>>>>> 1438259635000 ms*
>>>>>>>> 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
>>>>>>>> 24.0 (TID 73) in 60373 ms on xxxxxxxxxxxxxxxx (3/3)
>>>>>>>> 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
>>>>>>>> tasks have all completed, from pool
>>>>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
>>>>>>>> MetricsSpark.scala:67) finished in 60.379 s
>>>>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
>>>>>>>> MetricsSpark.scala:67, took 60.391761 s
>>>>>>>> 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
>>>>>>>> 1438258210000 ms.0 from job set of time 1438258210000 ms
>>>>>>>> 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for
>>>>>>>> time 1438258210000 ms (execution: 60.399 s)
>>>>>>>> 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
>>>>>>>> 1438258215000 ms.0 from job set of time 1438258215000 ms
>>>>>>>>
>>>>>>>> There is *always* a minute of delay in the third task. When I
>>>>>>>> execute the same code in another cluster, there isn't this delay in
>>>>>>>> the JobScheduler. I checked the YARN configuration in both clusters
>>>>>>>> and it seems the same.
>>>>>>>>
>>>>>>>> The log in the cluster that is working well is:
>>>>>>>>
>>>>>>>> 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3
>>>>>>>> tasks
>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage
>>>>>>>> 93.0 (TID 279, xxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14643 bytes)
>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage
>>>>>>>> 93.0 (TID 280, xxxxxxxxx, RACK_LOCAL, 14639 bytes)
>>>>>>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0
>>>>>>>> in memory on xxxxxxxxxxxxxxxxx:45132 (size: 1801.0 B, free: 530.3 MB)
>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage
>>>>>>>> 93.0 (TID 281, xxxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14647 bytes)
>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage
>>>>>>>> 93.0 (TID 279) in 121 ms on xxxxxxxxxxxxxxxxxxxx (1/3)
>>>>>>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0
>>>>>>>> in memory on xxxxxxxxx:49886 (size: 1801.0 B, free: 530.3 MB)
>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage
>>>>>>>> 93.0 (TID 281) in 261 ms on xxxxxxxxxxxxxxxxxx (2/3)
>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage
>>>>>>>> 93.0 (TID 280) in 519 ms on xxxxxxxxx (3/3)
>>>>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
>>>>>>>> MetricsSpark.scala:67) finished in 0.522 s
>>>>>>>> 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose
>>>>>>>> tasks have all completed, from pool
>>>>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
>>>>>>>> MetricsSpark.scala:67, took 0.531323 s
>>>>>>>> 15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
>>>>>>>> 1438259855000 ms.0 from job set of time 1438259855000 ms
>>>>>>>> 15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
>>>>>>>> 1438259855000 ms (execution: 0.540 s)
>>>>>>>> 15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence
>>>>>>>> list
>>>>>>>>
>>>>>>>> Any clue about where I could take a look? The number of CPUs in YARN
>>>>>>>> is enough. I am running on YARN with the same options (--master
>>>>>>>> yarn-server with 1g of memory in both).
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
