Hi

I'm trying to do a join of two datasets: 800GB with ~50MB.

My code looks like this:

  private def parseClickEventLine(line: String, jsonFormatBC: 
Broadcast[LazyJsonFormat]): ClickEvent = {
    val json = line.parseJson.asJsObject
    val eventJson = if (json.fields.contains("recommendationId")) json else 
json.fields("message").asJsObject

    jsonFormatBC.value.clickEventJsonFormat.read(eventJson)
  }

    val jsonFormatBc: Broadcast[LazyJsonFormat] = sc.broadcast(new 
LazyJsonFormat)

    val views = sc.recoLogRdd(jobConfig.viewsDirectory)
      .map(view => (view.id.toString, view))

    val clicks = sc.textFile(s"${jobConfig.clicksDirectory}/*")
      .map(parseClickEventLine(_, jsonFormatBc))
      .map(click => (click.recommendationId, click))

    val clicksCounts = views.leftOuterJoin(clicks).map({ case 
(recommendationId, (view, click)) =>
      val metaItemType = click.flatMap(c => 
view.itemDetailsById.get(c.itemIdStr).map(_.metaItemType))
      (view, metaItemType) -> click.map(_ => 1).getOrElse(0)
    })
    clicksCounts.reduceByKey(_ + 
_).map(toCSV).saveAsTextFile(jobConfig.outputDirectory)

I'm using Spark 1.2.0 and have the following options set:

spark.default.parallelism = 24
spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
spark.test.disableBlockManagerHeartBeat': 'true',
spark.shuffle.netty.connect.timeout': '30000',
spark.storage.blockManagerSlaveTimeoutMs': '30000',
spark.yarn.user.classpath.first': 'true',
spark.yarn.executor.memoryOverhead': '1536'

The job is run on YARN and I see errors in container logs:

015-03-11 09:16:56,629 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=24500,containerID=container_1425476483191_402083_01_000019] is 
running beyond physical memory limits. Current usage: 6.0 GB of 6 GB physical 
memory used; 6.9 GB of 12.6 GB virtual memory used. Killing container.

So the problems is related to the excessive use of memory.

Could you advise me what should I fix in my code to make it work for my 
usecase? 
The strange thing is, that the code worked earlier, with versions around 1.0.0. 
Is it possible that changes between 1.0.0 and 1.2.0 caused that kind of 
regression?

Regards 
Marcin

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to