Hi,

Below is the method that gets called inside the emit() method. I am using the 
input stream to parse each line. I would like to create inputConfStream (opened 
at the top of the try block) only once, but the stream ends up closed if I 
create it inside the setup() or beginWindow() method.

private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
    KeyValue<String, String> newTuple = new KeyValue<String, String>();
    try {
        Parser parser;
        // Opened for every tuple; ideally this stream would be created only once.
        inputConfStream = getFS().open(new Path(getInputConfFile()));
        if (inputDelimiter != null) {
            LOG.debug("parseTuple:sourceId = {},delimiter = {},inputConf = {}",
                getSourceId(), getInputDelimiter(), getInputConfFile());
            parser = DefaultParserFactory.getInstance().newDelimitedParser(
                new InputStreamReader(inputConfStream),
                new StringReader(tuple.value),
                getInputDelimiter().charAt(0), '"', false);
        } else {
            parser = DefaultParserFactory.getInstance().newFixedLengthParser(
                new InputStreamReader(inputConfStream),
                new StringReader(tuple.value));
        }
        parser.setIgnoreExtraColumns(true);
        final DataSet ds = parser.parse();
        if (ds == null || ds.getRowCount() == 0) {
            throw new RuntimeException("Could not parse record:" + tuple.value);
        }

        if (ds.next()) {
            StringBuilder sb = new StringBuilder();
            for (String col : ds.getColumns()) {
                LOG.debug("parseTuple: Col: {}", col);
            }
            List<Field> fields = outputFields.getFields();
            String oldValue;
            String adjustedValue;
            for (Field field : fields) {
                // An empty configured value means: take the value from the parsed row.
                if (field.getValue().equals("")) {
                    oldValue = ds.getString(field.getName());
                } else {
                    oldValue = field.getValue();
                }
                adjustedValue = Helper.adjustValue(oldValue,
                    Integer.parseInt(field.getLength()), outputDelimiter, inputDelimiter);
                sb.append(adjustedValue);
                if (outputDelimiter != null) {
                    sb.append(outputDelimiter);
                }
            }
            sb.append("\n");
            newTuple.key = tuple.key;
            newTuple.value = sb.toString();
        }
    } catch (Exception e) {
        LOG.error("parseTuple:error while parsing the sourceID : {},line:{},Error Message : {}",
            getSourceId(), tuple.value, e.getMessage());
        e.printStackTrace();
        return new KeyValue<String, String>(tuple.key, null);
    }
    LOG.debug("parseTuple: The old tuple is:{} ## The new tuple is:{}", tuple, newTuple);
    return newTuple;
}
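
One possible way to avoid reopening the configuration file for every tuple is 
to read its contents into memory once and hand the parser a fresh Reader per 
call. The sketch below uses only the JDK. CachedConfig, load() and newReader() 
are hypothetical names, not part of Apex or the parser library, and UTF-8 is an 
assumed encoding for the configuration file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: holds the configuration text so the underlying
// stream has to be opened and read only once.
final class CachedConfig {
    private final String text;

    private CachedConfig(String text) {
        this.text = text;
    }

    // Reads the whole stream into memory, then closes it.
    // Intended to be called once, for example from setup().
    static CachedConfig load(InputStream in) throws IOException {
        try (InputStream src = in) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = src.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            // Assumption: the configuration file is UTF-8 encoded.
            return new CachedConfig(new String(buf.toByteArray(), StandardCharsets.UTF_8));
        }
    }

    // Returns a new reader positioned at the start of the cached text.
    // Intended to be called once per tuple.
    Reader newReader() {
        return new StringReader(text);
    }
}
```

With something like this, setup() could call 
CachedConfig.load(getFS().open(new Path(getInputConfFile()))) and parseTuple() 
could pass newReader() where it currently passes 
new InputStreamReader(inputConfStream), since the parser factory methods in the 
code above already accept a Reader.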

Regards,
Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR) 
[mailto:[email protected]]
Sent: 2016, August, 02 4:48 PM
To: [email protected]
Subject: RE: Information Needed

Hi,

inputConfStream is used to parse each input line from the feed, so it is needed 
for every line. I am not sure why the stream is getting closed.

Regards,
Surya Vamshi

From: Vlad Rozov [mailto:[email protected]]
Sent: 2016, August, 02 4:26 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Information Needed

Both setup() and beginWindow() should work. It is more correct to open the 
configuration stream and parse the configuration file in setup(), as you tried 
in the initial implementation, as long as the configuration path does not 
depend on the window ID. Where is inputConfStream used? Most likely it reaches 
EOF unexpectedly.
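
The EOF hypothesis can be illustrated with plain JDK streams. This is a minimal 
sketch in which an in-memory ByteArrayInputStream stands in for the HDFS 
stream. EofDemo, drain() and readTwice() are hypothetical names, and the thread 
does not confirm whether the parser also closes the stream it is given.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: a stream that has been read to the end yields nothing
// when it is wrapped in a second reader.
final class EofDemo {
    // Reads until EOF and returns the number of characters consumed.
    static int drain(Reader reader) throws IOException {
        char[] buf = new char[1024];
        int total = 0;
        int n;
        while ((n = reader.read(buf)) != -1) {
            total += n;
        }
        return total;
    }

    // Wraps the same stream in two readers, one after the other, and
    // returns the character counts seen by each.
    static int[] readTwice(String content) throws IOException {
        InputStream shared = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
        int first = drain(new InputStreamReader(shared, StandardCharsets.UTF_8));
        int second = drain(new InputStreamReader(shared, StandardCharsets.UTF_8));
        return new int[] {first, second};
    }
}
```

Here readTwice("name,10\n") returns {8, 0}: the first reader consumes all eight 
characters and the second sees EOF immediately. A stream opened once in setup() 
and fully consumed while parsing the first tuple would behave the same way for 
every later tuple.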

Vlad


On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
Hi Team,

To parse each input line from the feed, I read another configuration file from 
HDFS. To avoid reading the configuration file for every line, I would like to 
read it in the beginWindow() method. But the input stream is getting closed, 
and the operator does not hold the stream for all the tuples.

Can I open the input stream once and use it for all the tuples? (I tried the 
setup() method as well, but no luck.)

@Override
public void beginWindow(long windowId)
{
    super.beginWindow(windowId);
    try {
        inputConfStream = getFS().open(new Path(getInputConfFile()));
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        LOG.error("beginWindow: Error while streaming the input Configuration File = {}",
            getInputConfFile());
    }
}

Regards,
Surya Vamshi


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email 
or otherwise) immediately. You have consented to receive the attached 
electronically at the above-noted email address; please retain a copy of this 
confirmation for future reference.

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur 
immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté 
de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse 
courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation 
pour les fins de reference future.

