I have a simple Spark job that seems to hang when saving to HDFS. In the
Spark web UI the job reaches 97 of 100 tasks completed and then makes no
further progress; the hang is on the "saveAsTextFile()" call. I could use
some help figuring out why.



https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png


The job is pretty simple:

    JavaRDD<String> analyticsLogs = context
            .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

    JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
            .map(line -> {
                try {
                    // parse each JSON line into a lightweight record
                    AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
                    AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
                    flyweight.ipAddress = log.getIpAddress();
                    flyweight.time = log.getTime();
                    flyweight.trackingId = log.getTrackingId();
                    return flyweight;
                } catch (Exception e) {
                    // bad lines become nulls, dropped by the filter below
                    LOG.error("error parsing json", e);
                    return null;
                }
            });


    // drop the lines that failed to parse
    JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
            .filter(log -> log != null);
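
(Side note: the parse-and-drop could also be a single flatMap, so no nulls
ever enter the RDD. A rough sketch against the Spark 1.x Java API, where
flatMap returns an Iterable; Collections here is java.util.Collections,
everything else as above:)

    // hypothetical flatMap variant: emit 0 or 1 records per line, no null sentinel
    JavaRDD<AnalyticsLogFlyweight> parsed = analyticsLogs.flatMap(line -> {
        try {
            AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
            AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
            flyweight.ipAddress = log.getIpAddress();
            flyweight.time = log.getTime();
            flyweight.trackingId = log.getTrackingId();
            return Collections.singletonList(flyweight);
        } catch (Exception e) {
            LOG.error("error parsing json", e);
            return Collections.emptyList();
        }
    });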

    // key by trackingId and hash-partition into 100 partitions
    JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
            .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2<>(log.trackingId, log))
            .partitionBy(new HashPartitioner(100))
            .cache();


    Ordering<AnalyticsLogFlyweight> ordering = Ordering.natural().nullsFirst()
            .onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
                public Long apply(AnalyticsLogFlyweight log) {
                    return log.time;
                }
            });

    // group by trackingId, sort each group by time, log it, and keep its size
    JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD =
            partitioned.groupByKey();
    JavaPairRDD<String, Integer> stringIntegerJavaPairRDD =
            stringIterableJavaPairRDD.mapToPair(log -> {
                List<AnalyticsLogFlyweight> sorted = ordering.sortedCopy(log._2());
                sorted.forEach(l -> LOG.info("sorted {}", l));
                return new Tuple2<>(log._1(), sorted.size());
            });
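
(Since only the size is used downstream, Guava's Iterables.size could count
each group without copying it into a list, though that loses the per-record
logging. A sketch, assuming com.google.common.collect.Iterables:)

    // hypothetical: count each group directly instead of materializing a List
    JavaPairRDD<String, Integer> groupSizes = stringIterableJavaPairRDD
            .mapValues(group -> Iterables.size(group));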

    // replace any previous output, then write the per-trackingId counts to HDFS
    String outputPath = "/summarized/groupedByTrackingId4";
    hdfs.rm(outputPath, true);
    stringIntegerJavaPairRDD.saveAsTextFile(String.format("%s/%s", hdfs.getUrl(), outputPath));
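
(For what it's worth, if a few oversized trackingId groups are behind the
three straggling tasks, a one-off check like this, using only standard
JavaPairRDD operations on the cached RDD, would show the largest groups; it
runs as a separate job, so it is debug-only:)

    // hypothetical skew check: records per trackingId, largest groups first
    partitioned
            .mapValues(log -> 1)
            .reduceByKey((a, b) -> a + b)
            .mapToPair(t -> new Tuple2<>(t._2(), t._1()))  // (count, trackingId)
            .sortByKey(false)
            .take(10)
            .forEach(t -> LOG.info("trackingId {} has {} records", t._2(), t._1()));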


Thanks in advance, David
