The difference is that one recives more data than the others two. I can pass thought parameters the topics, so, I could execute the code trying with one topic and figure out with one is the topic, although I guess that it's the topics which gets more data.
Anyway it's pretty weird those delays in just one of the cluster even if the another one is not running. I have seen the parameter "spark.streaming.kafka.maxRatePerPartition", I haven't set any value for this parameter, how does it work if this parameter doesn't have a value? 2015-07-30 16:32 GMT+02:00 Cody Koeninger <c...@koeninger.org>: > If the jobs are running on different topicpartitions, what's different > about them? Is one of them 120x the throughput of the other, for > instance? You should be able to eliminate cluster config as a difference > by running the same topic partition on the different clusters and comparing > the results. > > On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz <konstt2...@gmail.com> > wrote: > >> I have three topics with one partition each topic. So each jobs run about >> one topics. >> >> 2015-07-30 16:20 GMT+02:00 Cody Koeninger <c...@koeninger.org>: >> >>> Just so I'm clear, the difference in timing you're talking about is this: >>> >>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at >>> MetricsSpark.scala:67, took 60.391761 s >>> >>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at >>> MetricsSpark.scala:67, took 0.531323 s >>> >>> >>> Are those jobs running on the same topicpartition? >>> >>> >>> On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz <konstt2...@gmail.com> >>> wrote: >>> >>>> I read about maxRatePerPartition parameter, I haven't set this >>>> parameter. Could it be the problem?? Although this wouldn't explain why it >>>> doesn't work in one of the clusters. >>>> >>>> 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>: >>>> >>>>> They just share the kafka, the rest of resources are independents. I >>>>> tried to stop one cluster and execute just the cluster isn't working but >>>>> it >>>>> happens the same. >>>>> >>>>> 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>: >>>>> >>>>>> I have some problem with the JobScheduler. I have executed same code >>>>>> in two cluster. I read from three topics in Kafka with DirectStream so I >>>>>> have three tasks. >>>>>> >>>>>> I have check YARN and there aren't more jobs launched. >>>>>> >>>>>> The cluster where I have troubles I got this logs: >>>>>> >>>>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage >>>>>> 24.0 (TID 72, xxxxxxxxx, RACK_LOCAL, 14856 bytes) >>>>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage >>>>>> 24.0 (TID 73, xxxxxxxxxxxxxxx, RACK_LOCAL, 14852 bytes) >>>>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in >>>>>> memory on xxxxxxxxxxx:44909 (size: 1802.0 B, free: 530.3 MB) >>>>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in >>>>>> memory on xxxxxxxxx:43477 (size: 1802.0 B, free: 530.3 MB) >>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage >>>>>> 24.0 (TID 74, xxxxxxxxx, RACK_LOCAL, 14860 bytes) >>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage >>>>>> 24.0 (TID 72) in 208 ms on xxxxxxxxx (1/3) >>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage >>>>>> 24.0 (TID 74) in 49 ms on xxxxxxxxx (2/3) >>>>>> *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time >>>>>> 1438259580000 ms* >>>>>> *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time >>>>>> 1438259585000 ms* >>>>>> *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time >>>>>> 1438259590000 ms* >>>>>> *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time >>>>>> 1438259595000 ms* >>>>>> *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time >>>>>> 1438259600000 ms* >>>>>> *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time >>>>>> 1438259605000 ms* >>>>>> *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time >>>>>> 1438259610000 ms* >>>>>> *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time >>>>>> 1438259615000 ms* >>>>>> *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time >>>>>> 1438259620000 ms* >>>>>> *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time >>>>>> 1438259625000 ms* >>>>>> *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time >>>>>> 1438259630000 ms* >>>>>> *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time >>>>>> 1438259635000 ms* >>>>>> 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage >>>>>> 24.0 (TID 73) in 60373 ms onxxxxxxxxxxxxxxxx (3/3) >>>>>> 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose >>>>>> tasks have all completed, from pool >>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at >>>>>> MetricsSpark.scala:67) finished in 60.379 s >>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at >>>>>> MetricsSpark.scala:67, took 60.391761 s >>>>>> 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job >>>>>> 1438258210000 ms.0 from job set of time 1438258210000 ms >>>>>> 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time >>>>>> 1438258210000 ms (execution: 60.399 s) >>>>>> 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job >>>>>> 1438258215000 ms.0 from job set of time 1438258215000 ms >>>>>> >>>>>> There are *always *a minute of delay in the third task, when I have >>>>>> executed same code in another cluster there isn't this delay in the >>>>>> JobScheduler. I checked the configuration in YARN in both clusters and it >>>>>> seems the same. >>>>>> >>>>>> The log in the cluster is working good is >>>>>> >>>>>> 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 >>>>>> tasks >>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage >>>>>> 93.0 (TID 279, xxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14643 bytes) >>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage >>>>>> 93.0 (TID 280, xxxxxxxxx, RACK_LOCAL, 14639 bytes) >>>>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in >>>>>> memory on xxxxxxxxxxxxxxxxx:45132 (size: 1801.0 B, free: 530.3 MB) >>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage >>>>>> 93.0 (TID 281, xxxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14647 bytes) >>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage >>>>>> 93.0 (TID 279) in 121 ms on xxxxxxxxxxxxxxxxxxxx (1/3) >>>>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in >>>>>> memory on xxxxxxxxx:49886 (size: 1801.0 B, free: 530.3 MB) >>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage >>>>>> 93.0 (TID 281) in 261 ms on xxxxxxxxxxxxxxxxxx (2/3) >>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage >>>>>> 93.0 (TID 280) in 519 ms on xxxxxxxxx (3/3) >>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at >>>>>> MetricsSpark.scala:67) finished in 0.522 s >>>>>> 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose >>>>>> tasks have all completed, from pool >>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at >>>>>> MetricsSpark.scala:67, took 0.531323 s >>>>>> 15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job >>>>>> 1438259855000 ms.0 from job set of time 1438259855000 ms >>>>>> 15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time >>>>>> 1438259855000 ms (execution: 0.540 s) >>>>>> 15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence >>>>>> list >>>>>> >>>>>> Any clue about where I could take a look? Number of cpus in YARN is >>>>>> enough. I executing YARN with same options (--master yarn-server with 1g >>>>>> of >>>>>> memory in both) >>>>>> >>>>> >>>>> >>>> >>> >> >