Vamshi,

If not, try this approach:

Declare a static field in the operator class:
static <var-type> inputConfStream = null;

In your parseTuple(KeyValue<String, String> tuple) method:

Open the stream only if it has not been opened yet:
if (inputConfStream == null) {
        inputConfStream = getFS().open(new Path(getInputConfFile()));
}
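
For comparison, here is a minimal sketch of the byte-array approach Vlad describes in the quoted message below: read the configuration once, keep the bytes, and hand each new parser a fresh reader. The class name CachedConf and its methods are hypothetical, and UTF-8 is an assumed encoding for the configuration file. It uses only java.io, so it does not depend on the parser library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: caches a configuration file's bytes so that every
// tuple can be parsed with a fresh reader positioned at the start.
class CachedConf {
    private byte[] confBytes = new byte[0];

    // Reads the whole stream into memory, then closes it.
    void load(InputStream in) throws IOException {
        try (InputStream src = in) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = src.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            confBytes = buf.toByteArray();
        }
    }

    // Returns a new reader over the cached bytes (UTF-8 is an assumption).
    Reader newReader() {
        return new InputStreamReader(
                new ByteArrayInputStream(confBytes), StandardCharsets.UTF_8);
    }
}
```

Under these assumptions, setup() would call load(getFS().open(new Path(getInputConfFile()))) once, and parseTuple() would pass newReader() to the parser factory in place of new InputStreamReader(inputConfStream). Each parser then starts from the beginning of the configuration, regardless of whether an earlier parser read to EOF or closed its reader.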

Thanks,
Samba Surabhi.


> On Aug 2, 2016, at 5:48 PM, Vlad Rozov <[email protected] 
> <mailto:[email protected]>> wrote:
> 
> The problem is likely in the parser. Please check its documentation to see 
> if the parser can be reused to parse tuples. If not, and you need to construct a 
> new parser for each tuple, read the configuration file into a byte array and 
> construct a new input stream from the byte array.
> 
> Vlad
> 
> On 8/2/16 14:05, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>> Hi,
>>  
>> Below is the method that gets called inside the emit() method. I am using 
>> the input stream to parse each line. I want to create the highlighted 
>> inputStream (inputConfStream) only once, but the stream is getting closed 
>> if I create it inside the setup() or beginWindow() method.
>>  
>> private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
>>     KeyValue<String, String> newTuple = new KeyValue<String, String>();
>>     try {
>>         Parser parser;
>>         inputConfStream = getFS().open(new Path(getInputConfFile()));
>>         if (inputDelimiter != null) {
>>             LOG.debug("parseTuple:sourceId = {},delimiter = {},inputConf = {}",
>>                     getSourceId(), getInputDelimiter(), getInputConfFile());
>>             parser = DefaultParserFactory.getInstance().newDelimitedParser(
>>                     new InputStreamReader(inputConfStream),
>>                     new StringReader(tuple.value), getInputDelimiter().charAt(0), '"', false);
>>         } else {
>>             parser = DefaultParserFactory.getInstance().newFixedLengthParser(
>>                     new InputStreamReader(inputConfStream),
>>                     new StringReader(tuple.value));
>>         }
>>         parser.setIgnoreExtraColumns(true);
>>         final DataSet ds = parser.parse();
>>         if (ds == null || ds.getRowCount() == 0) {
>>             throw new RuntimeException("Could not parse record:" + tuple.value);
>>         }
>>  
>>         if (ds.next()) {
>>             StringBuilder sb = new StringBuilder();
>>             for (String col : ds.getColumns()) {
>>                 LOG.debug("parseTuple: Col: {}", col);
>>             }
>>             List<Field> fields = outputFields.getFields();
>>             String oldValue;
>>             String adjustedValue;
>>             for (Field field : fields) {
>>                 if (field.getValue().equals("")) {
>>                     oldValue = ds.getString(field.getName());
>>                     adjustedValue = Helper.adjustValue(oldValue, Integer.parseInt(field.getLength()),
>>                             outputDelimiter, inputDelimiter);
>>                     sb.append(adjustedValue);
>>                 } else {
>>                     oldValue = field.getValue();
>>                     adjustedValue = Helper.adjustValue(oldValue, Integer.parseInt(field.getLength()),
>>                             outputDelimiter, inputDelimiter);
>>                     sb.append(adjustedValue);
>>                 }
>>                 if (outputDelimiter != null) {
>>                     sb.append(outputDelimiter);
>>                 }
>>             }
>>             sb.append("\n");
>>             newTuple.key = tuple.key;
>>             newTuple.value = sb.toString();
>>         }
>>     } catch (Exception e) {
>>         LOG.error("parseTuple:error while parsing the sourceID : {},line:{},Error Message : {}",
>>                 getSourceId(), tuple.value, e.getMessage());
>>         e.printStackTrace();
>>         return new KeyValue<String, String>(tuple.key, null);
>>     }
>>     LOG.debug("parseTuple: The old tuple is:{} ## The new tuple is:{}", tuple, newTuple);
>>     return newTuple;
>> }
>>  
>> Regards,
>> Surya Vamshi
>>  
>> From: Mukkamula, Suryavamshivardhan (CWM-NR) 
>> [mailto:[email protected] 
>> <mailto:[email protected]>] 
>> Sent: 2016, August, 02 4:48 PM
>> To: [email protected] <mailto:[email protected]>
>> Subject: RE: Information Needed
>>  
>> Hi,
>>  
>> inputConfStream is used to parse the input line from the feed. It is used 
>> for all the lines from the feed. I am not sure why the stream is getting closed.
>>  
>> Regards,
>> Surya Vamshi
>>  
>> From: Vlad Rozov [mailto:[email protected] 
>> <mailto:[email protected]>] 
>> Sent: 2016, August, 02 4:26 PM
>> To: [email protected] <mailto:[email protected]>
>> Subject: Re: Information Needed
>>  
>> Both setup() and beginWindow() should work. It is more correct to open 
>> the configuration stream and parse the configuration file in setup(), as you 
>> tried in the initial implementation, as long as the configuration path does not 
>> depend on the window Id. Where is inputConfStream used? Most likely it 
>> reaches EOF unexpectedly.
>> 
>> Vlad
>> 
>>  
>> On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>> Hi Team,
>>  
>> While reading an input line from the feed, I read another configuration 
>> file from HDFS in order to parse the line. To avoid reading the 
>> configuration file for every line, I would like to read it in the 
>> beginWindow() method. But the input stream is getting closed, and the 
>> operator does not hold the stream for all the tuples.
>>  
>> Can I read the input stream once for all the tuples? (I tried in the setup() 
>> method as well, but no luck.)
>>  
>> @Override
>> public void beginWindow(long windowId)
>> {
>>     super.beginWindow(windowId);
>>     try {
>>         inputConfStream = getFS().open(new Path(getInputConfFile()));
>>     } catch (Exception e) {
>>         // TODO Auto-generated catch block
>>         e.printStackTrace();
>>         LOG.error("beginWindow: Error while streaming the input Configuration File = {}",
>>                 getInputConfFile());
>>     }
>> }
>>  
>> Regards,
>> Surya Vamshi
>>  
>> _______________________________________________________________________
>> 
>> If you received this email in error, please advise the sender (by return 
>> email or otherwise) immediately. You have consented to receive the attached 
>> electronically at the above-noted email address; please retain a copy of 
>> this confirmation for future reference.
>> 
>> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur 
>> immédiatement, par retour de courriel ou par un autre moyen. Vous avez 
>> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à 
>> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de cette 
>> confirmation pour les fins de reference future.
>> 
> 
> 
