Hello, we are using Cloudera: Kudu 1.2.0 with 5 tablet servers and 3 master servers, and Spark 2.0 running on 5 worker nodes. Spark jobs are started in yarn-client mode with Python 2.7.10 via spark2-submit.
The code I'm running is essentially:

    df = sqlContext.read.format('org.apache.kudu.spark.kudu')\
        .option('kudu.master', self.kudu_master)\
        .option('kudu.table', self.source_table)\
        .load()
    rdd = df.rdd.map(lambda x: compute_intensive_function(x))
    print rdd.count()

After 2 to 10 minutes Spark reproducibly throws something like:

    17/03/17 09:18:18 INFO scheduler.TaskSetManager: Starting task 2.1 in stage 1.0 (TID 5, xx.xx.de, executor 19, partition 3, NODE_LOCAL, 6133 bytes)
    17/03/17 09:18:18 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 5 on executor id: 19 hostname: xx.xx.de.
    17/03/17 09:20:40 WARN scheduler.TaskSetManager: Lost task 2.1 in stage 1.0 (TID 5, xx.xx.de, executor 19): org.apache.kudu.client.NonRecoverableException: Scanner not found
        at org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:557)

On the Kudu tablet server, log entries like these appear:

    I0317 10:10:33.199148 28240 scanners.cc:165] Expiring scanner id: eb38ff5eb718426e85bc5bc22de7bdac, of tablet 4d316a9dd5424ceb91c91aff90290164, after 60314403 us of inactivity, which is > TTL (60000000 us).
    I0317 10:13:03.201413 28240 scanners.cc:165] Expiring scanner id: 00d5bec1906241ee8749e354665497e9, of tablet 4d316a9dd5424ceb91c91aff90290164, after 62854357 us of inactivity, which is > TTL (60000000 us).
    I0317 10:15:33.203809 28240 scanners.cc:165] Expiring scanner id: cabb68f96756463989e0abdbad1bbaaf, of tablet 4d316a9dd5424ceb91c91aff90290164, after 61797695 us of inactivity, which is > TTL (60000000 us).

When I use Parquet instead of Kudu as the data source, everything works fine, even if a single computation task takes 15 minutes or more:

    df = sqlContext.read.parquet('/tmp/test/foo')
    rdd = df.rdd.map(lambda x: compute_intensive_function(x))
    print rdd.count()

Among the Kudu flags I found this parameter:

    --scanner_ttl_ms=60000

Setting it to a larger value helps. However, since our computation can take up to 90 minutes on one executor, I would have to set scanner_ttl_ms to such a large value that I fear side effects I cannot estimate.

So my question is whether there are other parameters I should try to alter, for example

    --scanner_batch_size_rows=100
    --scanner_default_batch_size_bytes=1048576
    --scanner_max_batch_size_bytes=8388608
    --scanner_max_wait_ms=1000

or Spark-side settings that could influence this behaviour in a desirable way.

The number of tasks Spark creates depends on the number of partitions in Kudu. It would probably help to raise the number of tasks, since each task would then need less computation time, which should lead to fewer errors like the one above. However, I don't know a performant and simple way to raise the number of tasks at will (see the repartition sketch below), and I'm not sure whether this approach is even desirable from a Spark point of view. What would be an ideal number of Kudu partitions given 5 * 20 cores on the Spark worker nodes?

Thank you for your help
Frank
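PS: For completeness, this is the kind of repartitioning I had in mind to get more, smaller tasks. The partition count of 200 is only a placeholder, not a tuned value, and the repartition forces a shuffle whose map side still reads from the Kudu scanners, so I'm not certain it actually avoids the timeout:

    # Read from Kudu as before.
    df = sqlContext.read.format('org.apache.kudu.spark.kudu')\
        .option('kudu.master', self.kudu_master)\
        .option('kudu.table', self.source_table)\
        .load()

    # Spread the rows over more partitions so each task needs less time.
    # 200 is an arbitrary placeholder, not a tuned value.
    rdd = df.rdd.repartition(200).map(lambda x: compute_intensive_function(x))
    print rdd.count()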
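Another idea I have not tested yet: materialize the scan results on the executors before running the expensive map, so that the Kudu scanners are only open for the duration of the scan itself and not for the whole computation. Roughly:

    from pyspark import StorageLevel

    df = sqlContext.read.format('org.apache.kudu.spark.kudu')\
        .option('kudu.master', self.kudu_master)\
        .option('kudu.table', self.source_table)\
        .load()

    # Cache the raw rows and force the Kudu scan to run to completion first.
    rdd = df.rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count()

    # The long-running computation then reads from the cached data,
    # not from the Kudu scanners.
    print rdd.map(lambda x: compute_intensive_function(x)).count()

I don't know whether this is the recommended approach, or whether the extra caching pass is acceptable for our data volumes.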