Hi all, I have a binary file composed of fixed-length messages, each 57 bytes long. The file contains exactly 100000 messages and is about 44 MB in size (I have already verified that).
What I do is simply read the file via JavaStreamingContext.binaryRecordsStream("folder", 57), which gives me a JavaDStream<byte[]>. I then collect it and print the records into a buffer. The problem is the following:

- With a record length of 57, collect() returns exactly 1000000 records, but they are all identical: each one contains the value of the first message of the file.
- With a record length of 57000000, only one record is created by both methods. I verified that it does contain all of the messages.
- With a record length of 285000, binaryRecordsStream() creates a single record containing the first half of the messages, and collect() returns two identical copies of it.

I believe the problem is in binaryRecordsStream(), and that collect() only does what it is supposed to do. I'm very grateful in advance for your help.
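To be clear about what I expect, here is a minimal plain-Java sketch (no Spark involved; the 100 synthetic messages are just for illustration) of how splitting a buffer into fixed-length records should behave: each record is a fresh copy, so the records are distinct, not 100 copies of the first one.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

public class FixedLengthSplit {
    static final int RECORD_LEN = 57;

    // Split a byte buffer into fixed-length records, copying each slice
    // into its own array (this is what I expect binaryRecordsStream to do).
    static List<byte[]> split(byte[] data, int recordLen) {
        List<byte[]> records = new ArrayList<>();
        for (int off = 0; off + recordLen <= data.length; off += recordLen) {
            byte[] rec = new byte[recordLen];
            System.arraycopy(data, off, rec, 0, recordLen);
            records.add(rec);
        }
        return records;
    }

    public static void main(String[] args) {
        // Build 100 distinct 57-byte messages in memory (stand-in for the real file).
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        for (int i = 0; i < 100; i++) {
            byte[] msg = new byte[RECORD_LEN];
            msg[0] = (byte) i; // make each message distinguishable
            buf.write(msg, 0, RECORD_LEN);
        }
        List<byte[]> records = split(buf.toByteArray(), RECORD_LEN);
        System.out.println(records.size());    // 100 records
        System.out.println(records.get(5)[0]); // 5: records differ, none is a repeat of the first
    }
}
```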
Hamza

Here is a sample of the code:

public static void main(String[] args) throws IniParserException, IOException, InterruptedException {
    Path input = Paths.get("folder");
    Ini ini = new Ini().read(input);
    Map<String, Map<String, String>> sections = ini.getSections();

    // Get the key/value pairs of SPARK_PARAMETERS
    Map<String, String> SPARK_PARAMETERS = sections.get("SPARK_PARAMETERS");

    // Set up the context
    SparkConf spark_conf = new SparkConf();
    spark_conf.setMaster("local[*]");
    spark_conf.setAppName("name");

    // Configure the Spark parameters
    for (String optionKey : SPARK_PARAMETERS.keySet()) {
        spark_conf.set(optionKey, SPARK_PARAMETERS.get(optionKey));
    }

    // Configure Spark Streaming
    JavaSparkContext sc = new JavaSparkContext(spark_conf);

    // Configure the SPARK_STREAMING parameters
    final Map<String, String> SPARK_STREAMING_PARAMETERS = sections.get("SPARK_STREAMING_PARAMETERS");
    JavaStreamingContext ssc = new JavaStreamingContext(sc,
            Durations.seconds(Integer.parseInt(SPARK_STREAMING_PARAMETERS.get("bash.duration"))));
    ssc.checkpoint(SPARK_STREAMING_PARAMETERS.get("checkpoint.dir")
            + "/checkpoint_" + Long.toString(System.currentTimeMillis()));

    // Read a binary source
    JavaDStream<byte[]> binary = ssc.binaryRecordsStream("folder", 171);
    binary.foreachRDD(new VoidFunction<JavaRDD<byte[]>>() {
        public void call(JavaRDD<byte[]> rdd) throws IOException {
            if (!rdd.isEmpty()) {
                // Collect
                List<byte[]> list1Structure = rdd.collect();
                // Decode each message
                for (byte[] oct : list1Structure) {
                    // Read the message size (first 2 bytes)
                    byte[] data = new byte[2];
                    for (int j = 0; j < 2; j++) {
                        data[j] = oct[j];
                    }
                    long size = byteArrayToInt(data);
                    // Read the signature (11 bytes starting at offset 10)
                    data = new byte[11];
                    for (int j = 0; j < 11; j++) {
                        data[j] = oct[j + 10];
                    }
                    String signature = new String(data);
                    System.out.println("size " + size + " signature " + signature);
                }
            }
        }
    });

    ssc.start();
    ssc.awaitTermination();
}
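For completeness, the byteArrayToInt helper is not shown above; it is roughly the following sketch, assuming the 2-byte size field is an unsigned big-endian integer (reverse the byte order if your format is little-endian):

```java
public class ByteUtil {
    // Sketch of the byteArrayToInt helper referenced above.
    // Assumption: bytes encode an unsigned big-endian integer;
    // flip the iteration order for a little-endian size field.
    public static long byteArrayToInt(byte[] data) {
        long value = 0;
        for (byte b : data) {
            value = (value << 8) | (b & 0xFF);
        }
        return value;
    }

    public static void main(String[] args) {
        // Example: {0x01, 0x00} decodes to 256 in big-endian.
        System.out.println(byteArrayToInt(new byte[] { 0x01, 0x00 }));
    }
}
```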