I am planning to compute word count and two-word dependency with Spark, but the data is skewed. How can I solve this problem? And do you have any suggestions about two-level data slicing?
I have some topics, and each topic corresponds to a lot of text, so I have an RDD structured like this:

    JavaPairRDD<String, Iterable<String>> topicsGroup

I then want to compute word counts and word dependencies to generate text patterns for each topic. Unfortunately, some topics have very little data (around 10 KB) while others have a lot (around 5 GB), and this imbalance is driving me crazy. My current solution looks like this:

    List<String> topics = topicsGroup.keys().collect();
    LOG.warn("topics group key: " + topics.size());
    for (String key : topics) {
        JavaRDD<String> valuesRDD = getValueRDDByKey(topicsGroup, key);
        long lineCount = valuesRDD.count();
        int minNums = 10;

        JavaRDD<String[]> agentWordsRDD = valuesRDD
                .map((Function<String, String[]>) v1 ->
                        splitAndParseLogLine(v1, logMiningOptions))
                .cache();

        // dictionary of tokens seen fewer than minNums times (plus empty tokens)
        Map<String, Integer> dict = agentWordsRDD
                .flatMap((FlatMapFunction<String[], String>) strings -> Arrays.asList(strings))
                .mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
                .reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2)
                .filter((Function<Tuple2<String, Integer>, Boolean>) v1 ->
                        v1._2() < minNums || v1._1().length() < 1)
                .collectAsMap();

        // split each line into dictionary tokens plus the sizes of the gaps between them
        JavaRDD<Tuple2<List<String>, List<Integer>>> tupleVarsRDD = agentWordsRDD
                .map((Function<String[], Tuple2<List<String>, List<Integer>>>) words -> {
                    List<String> tuple = new ArrayList<>();
                    List<Integer> vars = new ArrayList<>();
                    int v = 0;
                    for (String word : words) {
                        if (dict.containsKey(word)) {
                            tuple.add(word);
                            vars.add(v);
                            v = 0;
                        } else {
                            ++v;
                        }
                    }
                    vars.add(v);
                    return new Tuple2<>(tuple, vars);
                });

        List<Tuple2<List<String>, List<Integer>>> tupleVars = tupleVarsRDD.collect();
        LogMining logMining = new LogMining(logMiningOptions);
        Tuple2<List<Candidates>, Integer> patterns = new Tuple2<>(
                logMining.batchProcess(tupleVars, dict), logMining.getLineCount());
        putPatternsToDatabase(logFileDate, key, patterns);
    }

But as you can see, this still does not solve the data skew problem, and I create a separate RDD for every key, which introduces a lot of redundant operators. So my questions are: do you have any good advice or examples on how to handle this? And when the data is skewed, how can I cut the data into chunks by length directly, instead of sorting by length and then splitting evenly, as I do here:

    JavaRDD<String> valuesRDDPartition = valuesRDD.sortBy(
            new Function<String, Integer>() {
                @Override
                public Integer call(String v1) throws Exception {
                    return logMiningOptions.getSplitWord().splitWord(v1).length;
                }
            }, true, 16);

The 16 here is the number of partitions (it could be some other number), not a length.
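One idea I have been considering for the slicing question: if the chunks only need to be balanced, not ordered, maybe I can skip the sort entirely. This is just a sketch of that idea, nothing I have benchmarked:

    // repartition() does a random full shuffle, so the 16 partitions end up
    // with roughly equal numbers of lines -- no sort involved. Note that it
    // balances line counts, not total characters per partition.
    JavaRDD<String> balanced = valuesRDD.repartition(16);

If the chunks really have to be balanced by total line length rather than by line count, I suppose I could sample the length distribution first and derive bucket boundaries from the sample quantiles, but I have not tried that.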
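For the word-count stage I have also been wondering whether I can drop the driver-side loop over topics completely and count (topic, word) pairs in a single job. A rough sketch of that, assuming I keep the data ungrouped as a JavaPairRDD<String, String> of (topic, line) pairs ("topicLines" below is a placeholder name; splitAndParseLogLine is my helper from above):

    // One shuffle for all topics instead of one Spark job per topic.
    JavaPairRDD<Tuple2<String, String>, Integer> topicWordCounts = topicLines
            .flatMapToPair((PairFlatMapFunction<Tuple2<String, String>,
                    Tuple2<String, String>, Integer>) pair -> {
                List<Tuple2<Tuple2<String, String>, Integer>> out = new ArrayList<>();
                for (String word : splitAndParseLogLine(pair._2(), logMiningOptions)) {
                    // key = (topic, word)
                    out.add(new Tuple2<>(new Tuple2<>(pair._1(), word), 1));
                }
                return out; // on Spark 2.x this has to be out.iterator()
            })
            .reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);

My understanding is that reduceByKey combines on the map side, so even the 5 GB topic would be reduced to its distinct words before the shuffle; is that enough to remove the skew for this stage?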
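And if a single key still dominates a shuffle (for example when counting two-word dependencies per topic), I have read about salting the hot key: prepend a random bucket id, reduce, strip the salt, and reduce again. A minimal sketch, where "pairs" stands for any skewed JavaPairRDD<String, Integer> and the 16 buckets are an arbitrary choice:

    int buckets = 16;
    Random rand = new Random(); // java.util.Random is Serializable
    JavaPairRDD<String, Integer> counts = pairs
            // round 1: spread every key over `buckets` salted variants
            .mapToPair((PairFunction<Tuple2<String, Integer>, String, Integer>) t ->
                    new Tuple2<>(rand.nextInt(buckets) + "#" + t._1(), t._2()))
            .reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2)
            // round 2: strip the salt and merge the partial sums
            .mapToPair((PairFunction<Tuple2<String, Integer>, String, Integer>) t ->
                    new Tuple2<>(t._1().substring(t._1().indexOf('#') + 1), t._2()))
            .reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);

Is this the right pattern here, or is it overkill once the counting is already done with reduceByKey?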
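One more thing I noticed in my own code above: dict is collected to the driver and then captured in the closure of the second map, so it gets serialized and shipped with every task. Would broadcasting it be the right fix? ("jsc" below stands for my JavaSparkContext.)

    Broadcast<Map<String, Integer>> dictBc = jsc.broadcast(dict);
    // ... and inside the map over agentWordsRDD, look words up with:
    //     dictBc.value().containsKey(word)

Any advice or examples would be much appreciated.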