My batch interval is 1s
slide interval is 1s
window interval is 1 minute
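
As a rough illustration of what these three settings imply, the sketch below does the arithmetic: each window spans 60 batches, and Spark Streaming expects the window and slide durations to be multiples of the batch interval. `WindowMath` and its methods are made-up names for this example and are not part of Spark.

```java
// Hypothetical helper, not part of Spark: plain arithmetic on the intervals above.
final class WindowMath {

    // Number of batch RDDs that one window covers.
    static long batchesPerWindow(long windowMs, long batchMs) {
        if (batchMs <= 0 || windowMs <= 0 || windowMs % batchMs != 0) {
            throw new IllegalArgumentException(
                    "window must be a positive multiple of the batch interval");
        }
        return windowMs / batchMs;
    }

    // True when both window and slide are whole multiples of the batch interval.
    static boolean isValidWindow(long windowMs, long slideMs, long batchMs) {
        return batchMs > 0 && windowMs > 0 && slideMs > 0
                && windowMs % batchMs == 0 && slideMs % batchMs == 0;
    }

    public static void main(String[] args) {
        long batchMs = 1_000;   // batch interval: 1s
        long slideMs = 1_000;   // slide interval: 1s
        long windowMs = 60_000; // window interval: 1 minute

        System.out.println(isValidWindow(windowMs, slideMs, batchMs)); // true
        System.out.println(batchesPerWindow(windowMs, batchMs));       // 60
    }
}
```

With a 1s slide, each batch is read by the 60 windows that include it, so the data received for a batch has to remain available for about a minute after it arrives.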

I am using a standalone cluster. I don't have any storage layer like
HDFS, so I don't know what the connection between RDDs and blocks is (I know
that one RDD is produced for every batch). What is a block in this context?
Is it a disk block? If so, what is its default size? And finally, why does
the following error happen so often?

java.lang.Exception: Could not compute split, block input-0-1480539568000
not found
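
For reading the error message: Spark names receiver blocks `input-<streamId>-<uniqueId>`. The sketch below splits such an id and, assuming the unique id is an epoch-millisecond timestamp (the value here suggests this, but it is an assumption), converts it to a UTC instant. `BlockIdParts` is a made-up helper for this example, not a Spark class.

```java
import java.time.Instant;

// Hypothetical helper, not part of Spark: splits a receiver block id into its parts.
final class BlockIdParts {
    final int streamId;
    final long uniqueId;

    private BlockIdParts(int streamId, long uniqueId) {
        this.streamId = streamId;
        this.uniqueId = uniqueId;
    }

    // Parses ids of the form "input-<streamId>-<uniqueId>".
    static BlockIdParts parse(String blockId) {
        String[] parts = blockId.split("-");
        if (parts.length != 3 || !parts[0].equals("input")) {
            throw new IllegalArgumentException("not a receiver block id: " + blockId);
        }
        return new BlockIdParts(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
    }

    // Assumption: the unique id is milliseconds since the Unix epoch.
    Instant asInstant() {
        return Instant.ofEpochMilli(uniqueId);
    }

    public static void main(String[] args) {
        BlockIdParts id = BlockIdParts.parse("input-0-1480539568000");
        System.out.println(id.streamId);    // 0
        System.out.println(id.asInstant()); // 2016-11-30T20:59:28Z
    }
}
```

If the timestamp assumption holds, the block named in the error belongs to input stream 0 and was created at 20:59:28 UTC on 30 Nov 2016.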



On Thu, Dec 1, 2016 at 12:42 AM, kant kodali <kanth...@gmail.com> wrote:

> I also call super(StorageLevel.MEMORY_AND_DISK_2()); inside my receiver's
> constructor.
>
> On Wed, Nov 30, 2016 at 10:44 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Here is another transformation that might cause the error. It has to
>> be one of these two, since I only have two transformations.
>>
>> jsonMessagesDStream
>>         .window(new Duration(60000), new Duration(1000))
>>         .mapToPair(new PairFunction<String, String, Long>() {
>>             @Override
>>             public Tuple2<String, Long> call(String s) throws Exception {
>>                 //System.out.println(s + " *****************************");
>>                 JsonParser parser = new JsonParser();
>>                 JsonObject jsonObj = parser.parse(s).getAsJsonObject();
>>
>>                 if (jsonObj != null && jsonObj.has("var1")) {
>>                     JsonObject jsonObject = jsonObj.get("var1").getAsJsonObject();
>>                     if (jsonObject != null && jsonObject.has("var2")
>>                             && jsonObject.get("var2").getAsBoolean() && jsonObject.has("var3")) {
>>                         long num = jsonObject.get("var3").getAsLong();
>>
>>                         return new Tuple2<String, Long>("var3", num);
>>                     }
>>                 }
>>
>>                 return new Tuple2<String, Long>("var3", 0L);
>>             }
>>         }).reduceByKey(new Function2<Long, Long, Long>() {
>>             @Override
>>             public Long call(Long v1, Long v2) throws Exception {
>>                 return v1+v2;
>>             }
>>         }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
>>             @Override
>>             public void call(JavaPairRDD<String, Long> stringIntegerJavaPairRDD) throws Exception {
>>                 Map<String, Long> map = new HashMap<>();
>>                 Gson gson = new Gson();
>>                 stringIntegerJavaPairRDD
>>                     .collect()
>>                     .forEach((Tuple2<String, Long> KV) -> {
>>                             String status = KV._1();
>>                             Long count = KV._2();
>>                             map.put(status, count);
>>                         }
>>                     );
>>                 NSQReceiver.send(producer, "dashboard", gson.toJson(map).getBytes());
>>             }
>>         });
>>
>>
>> On Wed, Nov 30, 2016 at 10:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>>
>>> Here is what my code looks like
>>>
>>> Config config = new Config("hello");
>>> SparkConf sparkConf = config.buildSparkConfig();
>>> sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class));
>>> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>>>         new Duration(config.getSparkStremingBatchInterval()));
>>> ssc.sparkContext().setLogLevel("ERROR");
>>>
>>>
>>> NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, "input_test");
>>> JavaReceiverInputDStream<String> jsonMessagesDStream =
>>>         ssc.receiverStream(sparkStreamingReceiver);
>>>
>>>
>>> NSQProducer producer = new NSQProducer()
>>>         .addAddress(config.getServerConfig().getProperty("NSQD_IP"),
>>>                 Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT")))
>>>         .start();
>>>
>>> jsonMessagesDStream
>>>         .mapToPair(new PairFunction<String, String, Integer>() {
>>>             @Override
>>>             public Tuple2<String, Integer> call(String s) throws Exception {
>>>                 JsonParser parser = new JsonParser();
>>>                 JsonObject jsonObj = parser.parse(s).getAsJsonObject();
>>>                 if (jsonObj != null && jsonObj.has("var1") ) {
>>>                     JsonObject transactionObject = jsonObj.get("var1").getAsJsonObject();
>>>                     if (transactionObject != null && transactionObject.has("var2")) {
>>>                         String key = transactionObject.get("var2").getAsString();
>>>                         return new Tuple2<>(key, 1);
>>>                     }
>>>                 }
>>>                 return new Tuple2<>("", 0);
>>>             }
>>>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>                 @Override
>>>                 public Integer call(Integer v1, Integer v2) throws Exception {
>>>                     return v1+v2;
>>>                 }
>>>         }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
>>>                 @Override
>>>                 public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
>>>                     Map<String, Integer> map = new HashMap<>();
>>>                     Gson gson = new Gson();
>>>                     stringIntegerJavaPairRDD
>>>                             .collect()
>>>                             .forEach((Tuple2<String, Integer> KV) -> {
>>>                                 String status = KV._1();
>>>                                 Integer count = KV._2();
>>>                                 map.put(status, count);
>>>                             }
>>>                     );
>>>                     NSQReceiver.send(producer, "output_777", gson.toJson(map).getBytes());
>>>                 }
>>>         });
>>>
>>>
>>> Thanks,
>>>
>>> kant
>>>
>>>
>>> On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>>
>>>> Could you paste a reproducible code snippet?
>>>> Kr
>>>>
>>>> On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote:
>>>>
>>>>> I have a lot of these exceptions happening
>>>>>
>>>>> java.lang.Exception: Could not compute split, block input-0-1480539568000 not found
>>>>>
>>>>>
>>>>> Any ideas what this could be?
>>>>>
>>>>
>>>
>>
>
