Hi, I'm trying to do a join of two datasets: 800 GB with ~50 MB.
My code looks like this:

    private def parseClickEventLine(line: String, jsonFormatBC: Broadcast[LazyJsonFormat]): ClickEvent = {
      val json = line.parseJson.asJsObject
      val eventJson =
        if (json.fields.contains("recommendationId")) json
        else json.fields("message").asJsObject
      jsonFormatBC.value.clickEventJsonFormat.read(eventJson)
    }

    val jsonFormatBc: Broadcast[LazyJsonFormat] = sc.broadcast(new LazyJsonFormat)

    val views = sc.recoLogRdd(jobConfig.viewsDirectory)
      .map(view => (view.id.toString, view))

    val clicks = sc.textFile(s"${jobConfig.clicksDirectory}/*")
      .map(parseClickEventLine(_, jsonFormatBc))
      .map(click => (click.recommendationId, click))

    val clicksCounts = views.leftOuterJoin(clicks).map({
      case (recommendationId, (view, click)) =>
        val metaItemType = click.flatMap(c => view.itemDetailsById.get(c.itemIdStr).map(_.metaItemType))
        (view, metaItemType) -> click.map(_ => 1).getOrElse(0)
    })

    clicksCounts.reduceByKey(_ + _).map(toCSV).saveAsTextFile(jobConfig.outputDirectory)

I'm using Spark 1.2.0 with the following options set:

    spark.default.parallelism = 24
    spark.serializer = org.apache.spark.serializer.KryoSerializer
    spark.test.disableBlockManagerHeartBeat = true
    spark.shuffle.netty.connect.timeout = 30000
    spark.storage.blockManagerSlaveTimeoutMs = 30000
    spark.yarn.user.classpath.first = true
    spark.yarn.executor.memoryOverhead = 1536

The job runs on YARN, and I see errors like this in the container logs:

    2015-03-11 09:16:56,629 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=24500,containerID=container_1425476483191_402083_01_000019] is running beyond physical memory limits. Current usage: 6.0 GB of 6 GB physical memory used; 6.9 GB of 12.6 GB virtual memory used. Killing container.

So the problem is related to excessive use of memory. Could you advise me on what I should fix in my code to make it work for my use case?

The strange thing is that the code worked earlier, with versions around 1.0.0. Is it possible that changes between 1.0.0 and 1.2.0 caused that kind of regression?

Regards,
Marcin
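P.S. Since the clicks side is so small (~50 MB), I've also been wondering whether I should avoid the shuffle join entirely and broadcast the clicks instead. Below is just a rough sketch of what I have in mind, reusing the names from my code above (ClickEvent, views, clicks, toCSV, jobConfig); I haven't verified that it actually helps with the memory problem, and it assumes the grouped clicks fit comfortably on the driver and in each executor:

    // Collect the small clicks RDD to the driver and group it by recommendationId,
    // so each view can look up all of its clicks locally.
    val clicksByRecoId: Map[String, Seq[ClickEvent]] =
      clicks.collect().groupBy(_._1).map { case (recoId, pairs) => recoId -> pairs.map(_._2).toSeq }

    val clicksByRecoIdBc = sc.broadcast(clicksByRecoId)

    // Map-side "left outer join": for each view, emit one row per matching click,
    // or a single row with None when there are no clicks (like leftOuterJoin does).
    val joined = views.flatMap { case (recommendationId, view) =>
      val clicksForView = clicksByRecoIdBc.value.getOrElse(recommendationId, Seq.empty)
      if (clicksForView.isEmpty) Seq((view, None))
      else clicksForView.map(click => (view, Some(click)))
    }

    // Same counting logic as in the original job.
    val clicksCounts = joined.map { case (view, click) =>
      val metaItemType = click.flatMap(c => view.itemDetailsById.get(c.itemIdStr).map(_.metaItemType))
      (view, metaItemType) -> click.map(_ => 1).getOrElse(0)
    }

    clicksCounts.reduceByKey(_ + _).map(toCSV).saveAsTextFile(jobConfig.outputDirectory)

Does that look like a reasonable direction, or is the problem elsewhere?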