Kant,
We need to narrow it down to a reproducible code. You are using streaming
What is the content of ur streamed data. If u provide that I can run a
streaming programming that reads from a local dir and narrow down the
problem
I have seen similar error for doing something completely different. As u
say there might be problem with ur transformation coming from the structure
of the data. Send me a sample of the data you are streaming and I write a
small test case....kr

On 1 Dec 2016 9:44 am, "kant kodali" <kanth...@gmail.com> wrote:

> sorry for multiple emails. I just think more info is needed every time to
> address this problem
>
> My Spark Client program runs in a client mode and it runs on a node that
> has 2 vCPU's and 8GB RAM (m4.large)
> I have 2 Spark worker nodes and each have 4 vCPU's and 16GB RAM
>  (m3.xlarge for each spark worker instance)
>
>
>
> On Thu, Dec 1, 2016 at 12:55 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> My batch interval is 1s
>> slide interval is 1s
>> window interval is 1 minute
>>
>> I am using a standalone alone cluster. I don't have any storage layer
>> like HDFS.  so I dont know what is a connection between RDD and blocks (I
>> know that for every batch one RDD is produced)? what is a block in this
>> context? is it a disk block ? if so, what is it default size? and Finally,
>> why does the following error happens so often?
>>
>> java.lang.Exception: Could not compute split, block input-0-1480539568000
>> not found
>>
>>
>>
>> On Thu, Dec 1, 2016 at 12:42 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> I also use this super(StorageLevel.MEMORY_AND_DISK_2());
>>>
>>> inside my receiver
>>>
>>> On Wed, Nov 30, 2016 at 10:44 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Here is another transformation that might cause the error but it has to
>>>> be one of these two since I only have two transformations
>>>>
>>>> jsonMessagesDStream
>>>>         .window(new Duration(60000), new Duration(1000))
>>>>         .mapToPair(new PairFunction<String, String, Long>() {
>>>>             @Override
>>>>             public Tuple2<String, Long> call(String s) throws Exception {
>>>>                 //System.out.println(s + " *****************************");
>>>>                 JsonParser parser = new JsonParser();
>>>>                 JsonObject jsonObj = parser.parse(s).getAsJsonObject();
>>>>
>>>>                 if (jsonObj != null && jsonObj.has("var1")) {
>>>>                     JsonObject jsonObject = 
>>>> jsonObj.get("var1").getAsJsonObject();
>>>>                     if (jsonObject != null && jsonObject.has("var2") && 
>>>> jsonObject.get("var2").getAsBoolean() && jsonObject.has("var3") ) {
>>>>                         long num = jsonObject.get("var3").getAsLong();
>>>>
>>>>                         return new Tuple2<String, Long>("var3", num);
>>>>                     }
>>>>                 }
>>>>
>>>>                 return new Tuple2<String, Long>("var3", 0L);
>>>>             }
>>>>         }).reduceByKey(new Function2<Long, Long, Long>() {
>>>>             @Override
>>>>             public Long call(Long v1, Long v2) throws Exception {
>>>>                 return v1+v2;
>>>>          }
>>>>         }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
>>>>             @Override
>>>>             public void call(JavaPairRDD<String, Long> 
>>>> stringIntegerJavaPairRDD) throws Exception {
>>>>                 Map<String, Long> map = new HashMap<>();
>>>>                 Gson gson = new Gson();
>>>>                 stringIntegerJavaPairRDD
>>>>                     .collect()
>>>>                     .forEach((Tuple2<String, Long> KV) -> {
>>>>                             String status = KV._1();
>>>>                             Long count = KV._2();
>>>>                             map.put(status, count);
>>>>                         }
>>>>                     );
>>>>                 NSQReceiver.send(producer, "dashboard", 
>>>> gson.toJson(map).getBytes());
>>>>             }
>>>>         });
>>>>
>>>>
>>>> On Wed, Nov 30, 2016 at 10:40 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>>
>>>>> Here is what my code looks like
>>>>>
>>>>> Config config = new Config("hello");
>>>>> SparkConf sparkConf = config.buildSparkConfig();
>>>>> sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class));
>>>>> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
>>>>> Duration(config.getSparkStremingBatchInterval()));
>>>>> ssc.sparkContext().setLogLevel("ERROR");
>>>>>
>>>>>
>>>>> NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, 
>>>>> "input_test");
>>>>> JavaReceiverInputDStream<String> jsonMessagesDStream = 
>>>>> ssc.receiverStream(sparkStreamingReceiver);
>>>>>
>>>>>
>>>>> NSQProducer producer = new NSQProducer()
>>>>>         .addAddress(config.getServerConfig().getProperty("NSQD_IP"), 
>>>>> Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT")))
>>>>>         .start();
>>>>>
>>>>> jsonMessagesDStream
>>>>>         .mapToPair(new PairFunction<String, String, Integer>() {
>>>>>             @Override
>>>>>             public Tuple2<String, Integer> call(String s) throws 
>>>>> Exception {
>>>>>                 JsonParser parser = new JsonParser();
>>>>>                 JsonObject jsonObj = parser.parse(s).getAsJsonObject();
>>>>>                 if (jsonObj != null && jsonObj.has("var1") ) {
>>>>>                     JsonObject transactionObject = 
>>>>> jsonObj.get("var1").getAsJsonObject();
>>>>>                     if(transactionObject != null && 
>>>>> transactionObject.has("var2")) {
>>>>>                         String key = 
>>>>> transactionObject.get("var2").getAsString();
>>>>>                         return new Tuple2<>(key, 1);
>>>>>                     }
>>>>>                 }
>>>>>                 return new Tuple2<>("", 0);
>>>>>             }
>>>>>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>>>                 @Override
>>>>>                 public Integer call(Integer v1, Integer v2) throws 
>>>>> Exception {
>>>>>                     return v1+v2;
>>>>>                 }
>>>>>         }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
>>>>>                 @Override
>>>>>                 public void call(JavaPairRDD<String, Integer> 
>>>>> stringIntegerJavaPairRDD) throws Exception {
>>>>>                     Map<String, Integer> map = new HashMap<>();
>>>>>                     Gson gson = new Gson();
>>>>>                     stringIntegerJavaPairRDD
>>>>>                             .collect()
>>>>>                             .forEach((Tuple2<String, Integer> KV) -> {
>>>>>                                 String status = KV._1();
>>>>>                                 Integer count = KV._2();
>>>>>                                 map.put(status, count);
>>>>>                             }
>>>>>                     );
>>>>>                     NSQReceiver.send(producer, "output_777", 
>>>>> gson.toJson(map).getBytes());
>>>>>                 }
>>>>>         });
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> kant
>>>>>
>>>>>
>>>>> On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Could you paste reproducible snippet code?
>>>>>> Kr
>>>>>>
>>>>>> On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> I have lot of these exceptions happening
>>>>>>>
>>>>>>> java.lang.Exception: Could not compute split, block
>>>>>>> input-0-1480539568000 not found
>>>>>>>
>>>>>>>
>>>>>>> Any ideas what this could be?
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to