Hi,

I'm new to Spark and am facing a peculiar problem. I'm writing a simple Java driver program that creates a key/value data structure and collects it. The problem: when I increase the iteration count of the for loop that builds the ArrayList of Long values (which I then hand to Spark as a Java collection), the serialized task size grows proportionately:

  loop count 10        -> task size 1120 bytes
  loop count 10000     -> task size 33402 bytes
  loop count 10000000  -> task size 453434 bytes

I don't understand why the task size increases. I ran the same example from the Spark shell, and there the task size stayed the same regardless of the loop iteration count.

Code (num, numPartitions and sc are fields of the enclosing class):

  @Override
  public void execute() {
      List<Long> numbers = new ArrayList<Long>();
      JavaRDD<Long> distData = null;
      JavaPairRDD<String, Long> mapOfKeys = null;
      JavaRDD<String> keysRDD = null;

      // Maps a Long value v to the key string "v,v+1".
      class ByKeyImpl implements Function<Long, String>, Serializable {
          private static final long serialVersionUID = 5749098182016143296L;

          public String call(Long paramT1) throws Exception {
              return new StringBuilder()
                      .append(paramT1).append(',').append(paramT1 + 1)
                      .toString();
          }
      }

      System.out.println("********** STARTING BENCHMARK EXAMPLE **********");
      while (true) {
          System.out.println("********** DO YOU WANT TO CONTINUE? (YES/NO) **********");
          BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
          try {
              String continueString = reader.readLine();
              if ("yes".equalsIgnoreCase(continueString)) {
                  if (numbers.isEmpty()) {
                      // Populate the driver-side list with num long values.
                      for (long i = 0; i < num; i++) {
                          numbers.add(i);
                      }
                  }

                  // Create the RDD from the driver-side collection, once.
                  if (distData == null) {
                      System.out.println("********** NEW RDD CREATED **********");
                      if (numPartitions > 0) {
                          distData = sc.parallelize(numbers, numPartitions);
                      } else {
                          distData = sc.parallelize(numbers);
                      }
                  }

                  // Key the RDD and cache the keys, once.
                  if (mapOfKeys == null) {
                      mapOfKeys = distData.keyBy(new ByKeyImpl());
                      keysRDD = mapOfKeys.keys();
                      keysRDD.persist(StorageLevel.MEMORY_ONLY());
                  }

                  System.out.println("********** DO YOU WANT TO COUNT OR COLLECT THE COLLECTION? **********");
                  String inputOperation = reader.readLine();
                  if ("count".equalsIgnoreCase(inputOperation)) {
                      long startTime = System.currentTimeMillis();
                      System.out.println("KEYS COUNT IS " + keysRDD.count());
                      long endTime = System.currentTimeMillis();
                      System.out.println("count took " + ((endTime - startTime) / 1000) + " sec(s)");
                  } else if ("collect".equalsIgnoreCase(inputOperation)) {
                      long startTime = System.currentTimeMillis();
                      System.out.println("AFTER COLLECTING, KEYS COUNT IS " + keysRDD.collect().size());
                      long endTime = System.currentTimeMillis();
                      System.out.println("collect took " + ((endTime - startTime) / 1000) + " sec(s)");
                  }
              } else if ("no".equalsIgnoreCase(continueString)) {
                  System.exit(0);
              }
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }

With this code, the larger the loop iteration count, the more data count/collect ships along with each task. Is there any way to reduce the task size?
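One thing I wondered about: ByKeyImpl is a local (non-static) inner class, so it keeps a hidden reference to the enclosing driver object, and Java serialization pulls that whole object along whenever the function is serialized. Could that be part of what inflates the tasks? Here is a Spark-free, plain-JDK sketch of that effect (the Driver/InnerFn/StaticFn names are made up for illustration, not from my program):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSizeDemo {

    // Stand-in for a driver class that happens to hold sizable state.
    static class Driver implements Serializable {
        long[] bigField = new long[100_000]; // ~800 KB of driver-side data

        // Non-static inner class: serializing it also serializes
        // the enclosing Driver instance, bigField included.
        class InnerFn implements Serializable {
            public String call(Long v) {
                return v + "," + (v + 1);
            }
        }
    }

    // Static nested class: no hidden reference to any outer instance.
    static class StaticFn implements Serializable {
        public String call(Long v) {
            return v + "," + (v + 1);
        }
    }

    // Measures how many bytes Java serialization produces for an object graph.
    static int serializedSize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        Driver d = new Driver();
        System.out.println("inner fn bytes:  " + serializedSize(d.new InnerFn()));
        System.out.println("static fn bytes: " + serializedSize(new StaticFn()));
    }
}
```

On my understanding, the inner function's serialized size tracks the outer object's state, while the static nested function stays a few dozen bytes, which is why making such functions static nested (or top-level) classes is the usual advice for Spark Java programs.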
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-Size-Increases-when-using-loops-tp17694.html Sent from the Apache Spark User List mailing list archive at Nabble.com.