Hi all,

I have a binary file composed of messages that are each 57 bytes long. The
binary file contains exactly 100000 messages and its size is about 44 MB (I've
already verified that).
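Because every message has the same fixed length, the file size must be an exact multiple of 57 bytes, and the message count is the size divided by 57. For example, 100000 × 57 = 5,700,000 bytes (about 5.7 MB), while 1000000 × 57 = 57,000,000 bytes. A quick check like the following can confirm which count matches the file on disk. It is a minimal sketch; the class and method names are hypothetical.

```java
// Sanity check for a file made of fixed-length messages (hypothetical helper):
// the byte size must be an exact multiple of the record length, and the
// quotient is the number of messages.
final class RecordSizeCheck {

    static final int RECORD_LENGTH = 57; // message length stated in the post

    // Returns the number of whole records, or throws if a partial record would remain.
    static long recordCount(long fileSizeBytes, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        if (fileSizeBytes % recordLength != 0) {
            throw new IllegalArgumentException(
                    fileSizeBytes + " bytes is not a multiple of " + recordLength);
        }
        return fileSizeBytes / recordLength;
    }

    public static void main(String[] args) {
        long expectedBytes = 100_000L * RECORD_LENGTH; // 5,700,000 bytes
        System.out.println(expectedBytes + " bytes -> "
                + recordCount(expectedBytes, RECORD_LENGTH) + " records");
    }
}
```

On the real file this would be called as `RecordSizeCheck.recordCount(java.nio.file.Files.size(path), 57)`, where `path` points at the actual input file.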

What I do is simply read the file via
JavaStreamingContext.binaryRecordsStream("folder", 57), which gives me a
JavaDStream<byte[]>. After that I collect it and print the contents into a buffer.
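To tell whether the repeated values come from the file itself or from the streaming API, the same bytes can be split into 57-byte records with plain Java (no Spark) and compared with what collect() returns. This is a minimal sketch: the class and method names are hypothetical, and it assumes the whole file fits in memory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Splits a byte array into consecutive fixed-length records, copying each one
// into its own array so that no two records share storage.
final class FixedLengthSplitter {

    static List<byte[]> splitRecords(byte[] data, int recordLength) {
        if (recordLength <= 0 || data.length % recordLength != 0) {
            throw new IllegalArgumentException(
                    "data length " + data.length + " is not a multiple of " + recordLength);
        }
        List<byte[]> records = new ArrayList<>(data.length / recordLength);
        for (int offset = 0; offset < data.length; offset += recordLength) {
            records.add(Arrays.copyOfRange(data, offset, offset + recordLength));
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java FixedLengthSplitter <binary file>");
            return;
        }
        byte[] data = Files.readAllBytes(Paths.get(args[0]));
        List<byte[]> records = splitRecords(data, 57);
        System.out.println(records.size() + " records");
        if (records.size() > 1) {
            // If this prints true, the file really does start with repeated messages.
            System.out.println("first two records identical: "
                    + Arrays.equals(records.get(0), records.get(1)));
        }
    }
}
```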

The problem is that, with the record length set to 57 in
JavaStreamingContext.binaryRecordsStream(), only one record seems to be created;
yet after collect() there are exactly 1000000 records of 57 bytes, and they are
all identical (each one contains the value of the first message in the file).
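One possible explanation, offered only as an assumption and not verified against the Spark source: if the record reader filled one shared byte array for every record and handed out that same array each time, the collected list would hold many references to a single buffer, and every element would show the same bytes. The plain-Java sketch below (hypothetical class, no Spark involved) reproduces that symptom and shows that copying each record as it is produced avoids it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration (not Spark code) of a reader that reuses one buffer.
final class BufferReuseDemo {

    // Produces each fixed-length record by refilling the same array.
    // If copy is true, a fresh copy of the buffer is stored instead of the buffer itself.
    static List<byte[]> readAll(byte[] data, int recordLength, boolean copy) {
        byte[] buffer = new byte[recordLength]; // shared, reused buffer
        List<byte[]> out = new ArrayList<>();
        for (int offset = 0; offset + recordLength <= data.length; offset += recordLength) {
            System.arraycopy(data, offset, buffer, 0, recordLength);
            out.add(copy ? Arrays.copyOf(buffer, recordLength) : buffer);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = {1, 1, 2, 2, 3, 3}; // three 2-byte "messages"
        for (byte[] r : readAll(data, 2, false)) {
            System.out.print(Arrays.toString(r) + " "); // the same array, printed three times
        }
        System.out.println();
        for (byte[] r : readAll(data, 2, true)) {
            System.out.print(Arrays.toString(r) + " "); // three distinct records
        }
        System.out.println();
    }
}
```

If that assumption held, something to try would be copying each record before collecting, for example `binary.map(b -> Arrays.copyOf(b, b.length))` on the JavaDStream<byte[]>; whether it changes anything depends on the assumption being true.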

On the other hand, if I set the record length in binaryRecordsStream() to
57000000, only one record is created with both methods, and I verified that it
does contain all of the messages.

If the record length is 285000 bytes, binaryRecordsStream() creates only one
record, which contains the first half of the messages, and collect() returns two
records that are identical.

I believe the problem lies in binaryRecordsStream(), and that collect() only does
what it is supposed to do.

Thanks very much in advance for your help.

Hamza


Here is a sample of the code:

public static void main(String[] args) throws IniParserException, IOException, InterruptedException {

    Path input = Paths.get("folder");
    Ini ini = new Ini().read(input);
    Map<String, Map<String, String>> sections = ini.getSections();

    // Get the key/value pairs of SPARK_PARAMETERS
    Map<String, String> SPARK_PARAMETERS = sections.get("SPARK_PARAMETERS");

    // Set up the context
    SparkConf spark_conf = new SparkConf();
    spark_conf.setMaster("local[*]");
    spark_conf.setAppName("name");

    // Configure the Spark parameters
    for (String optionKey : SPARK_PARAMETERS.keySet()) {
        spark_conf.set(optionKey, SPARK_PARAMETERS.get(optionKey));
    }

    // Spark Streaming configuration
    JavaSparkContext sc = new JavaSparkContext(spark_conf);

    // Parameters for SPARK_STREAMING
    final Map<String, String> SPARK_STREAMING_PARAMETERS = sections.get("SPARK_STREAMING_PARAMETERS");
    JavaStreamingContext ssc = new JavaStreamingContext(sc,
            Durations.seconds(Integer.parseInt(SPARK_STREAMING_PARAMETERS.get("bash.duration"))));

    ssc.checkpoint(SPARK_STREAMING_PARAMETERS.get("checkpoint.dir") + "/checkpoint_"
            + Long.toString(System.currentTimeMillis()));

    // Read a binary source (171 = 3 x 57, i.e. three messages per record)
    JavaDStream<byte[]> binary = ssc.binaryRecordsStream("folder", 171);
    binary.foreachRDD(new VoidFunction<JavaRDD<byte[]>>() {

        @Override
        public void call(JavaRDD<byte[]> rdd) throws IOException {

            if (!rdd.isEmpty()) {
                // Collect
                List<byte[]> list1Structure = rdd.collect();

                // Decode each record
                for (byte[] oct : list1Structure) {
                    // Read the message size (bytes 0-1)
                    byte[] data = new byte[2];
                    for (int j = 0; j < 2; j++) {
                        data[j] = oct[j];
                    }
                    long size = byteArrayToInt(data);

                    // Read the signature (bytes 10-20)
                    data = new byte[11];
                    for (int j = 0; j < 11; j++) {
                        data[j] = oct[j + 10];
                    }
                    String signature = new String(data);

                    System.out.println("size " + size + " signature " + signature);
                }
            }
        }
    });

    ssc.start();
    ssc.awaitTermination();
}
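The helper byteArrayToInt() is not shown above. For checking the decoding on its own, here is a minimal sketch of the same two steps (size from bytes 0-1, an 11-byte signature from bytes 10-20). The class and method names are hypothetical, and both the byte order (big-endian, unsigned 16-bit) and the signature encoding (US-ASCII) are assumptions; note that `new String(data)` without a charset uses the platform default.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical decoder for one message, mirroring the loop in main():
// bytes 0-1 hold the size, bytes 10-20 hold an 11-byte signature.
final class MessageDecoder {

    // Assumed layout: unsigned 16-bit size, big-endian (the byte order is a guess).
    static int readSize(byte[] message) {
        return ByteBuffer.wrap(message, 0, 2).order(ByteOrder.BIG_ENDIAN).getShort() & 0xFFFF;
    }

    // Assumed encoding: US-ASCII; an explicit charset avoids depending on the platform default.
    static String readSignature(byte[] message) {
        return new String(Arrays.copyOfRange(message, 10, 21), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] message = new byte[57];
        message[0] = 0x00;
        message[1] = 0x39; // size 57
        byte[] sig = "SIGNATURE01".getBytes(StandardCharsets.US_ASCII); // 11 bytes
        System.arraycopy(sig, 0, message, 10, sig.length);
        System.out.println("size " + readSize(message) + " signature " + readSignature(message));
    }
}
```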
