The problem is likely in the parser. Please check its documentation to see whether a parser can be reused to parse multiple tuples. If it cannot, and you need to construct a new parser for each tuple, read the configuration file into a byte array once and construct a new input stream from that byte array each time.
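A minimal sketch of the byte-array approach, using only the JDK. The class name CachedConfig and its methods are hypothetical and not part of Apex or of the parser library, and UTF-8 is an assumption (the posted code relies on the platform default charset). The idea is to drain the configuration stream once, for example in setup(), and hand every new parser its own reader positioned at the start of the data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: holds the configuration file contents in memory.
public class CachedConfig {
    private final byte[] bytes;

    private CachedConfig(byte[] bytes) {
        this.bytes = bytes;
    }

    // Reads the whole stream once and closes it (e.g. called from setup()).
    public static CachedConfig load(InputStream in) throws IOException {
        try (InputStream src = in) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = src.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new CachedConfig(buf.toByteArray());
        }
    }

    // Returns a fresh reader over the cached bytes, starting at offset 0.
    // Assumption: the configuration file is UTF-8 encoded.
    public Reader newReader() {
        return new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8);
    }
}
```

In parseTuple(), the per-tuple getFS().open(...) call could then be replaced by passing cachedConfig.newReader() to the parser factory, so HDFS is read only once per operator instance.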

Vlad

On 8/2/16 14:05, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:

Hi,

Below is the method that gets called inside the emit() method. I am using the input stream to parse each line. I would like to create inputConfStream only once, but the stream gets closed if I create it inside the setup() or beginWindow() method.

private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
    KeyValue<String, String> newTuple = new KeyValue<String, String>();
    try {
        Parser parser;
        inputConfStream = getFS().open(new Path(getInputConfFile()));
        if (inputDelimiter != null) {
            LOG.debug("parseTuple:sourceId = {},delimiter = {},inputConf = {}", getSourceId(), getInputDelimiter(),
                getInputConfFile());
            parser = DefaultParserFactory.getInstance().newDelimitedParser(new InputStreamReader(inputConfStream),
                new StringReader(tuple.value), getInputDelimiter().charAt(0), '"', false);
        } else {
            parser = DefaultParserFactory.getInstance().newFixedLengthParser(new InputStreamReader(inputConfStream),
                new StringReader(tuple.value));
        }
        parser.setIgnoreExtraColumns(true);
        final DataSet ds = parser.parse();
        if (ds == null || ds.getRowCount() == 0) {
            throw new RuntimeException("Could not parse record:" + tuple.value);
        }
        if (ds.next()) {
            StringBuilder sb = new StringBuilder();
            for (String col : ds.getColumns()) {
                LOG.debug("parseTuple: Col: {}", col);
            }
            List<Field> fields = outputFields.getFields();
            String oldValue;
            String adjustedValue;
            for (Field field : fields) {
                if (field.getValue().equals("")) {
                    oldValue = ds.getString(field.getName());
                    adjustedValue = Helper.adjustValue(oldValue, Integer.parseInt(field.getLength()),
                        outputDelimiter, inputDelimiter);
                    sb.append(adjustedValue);
                } else {
                    oldValue = field.getValue();
                    adjustedValue = Helper.adjustValue(oldValue, Integer.parseInt(field.getLength()),
                        outputDelimiter, inputDelimiter);
                    sb.append(adjustedValue);
                }
                if (outputDelimiter != null) {
                    sb.append(outputDelimiter);
                }
            }
            sb.append("\n");
            newTuple.key = tuple.key;
            newTuple.value = sb.toString();
        }
    } catch (Exception e) {
        LOG.error("parseTuple:error while parsing the sourceID : {},line:{},Error Message : {}", getSourceId(),
            tuple.value, e.getMessage());
        e.printStackTrace();
        return new KeyValue<String, String>(tuple.key, null);
    }
    LOG.debug("parseTuple: The old tuple is:{} ## The new tuple is:{}", tuple, newTuple);
    return newTuple;
}

Regards,

Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkam...@rbc.com]
Sent: 2016, August, 02 4:48 PM
To: users@apex.apache.org
Subject: RE: Information Needed

Hi,

inputConfStream is used to parse each input line from the feed, and the same stream is used for all lines from the feed. I am not sure why the stream is getting closed.

Regards,

Surya Vamshi

From: Vlad Rozov [mailto:v.ro...@datatorrent.com]
Sent: 2016, August, 02 4:26 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Both setup() and beginWindow() should work. It is more correct to open the configuration stream and parse the configuration file in setup(), as you tried in the initial implementation, as long as the configuration path does not depend on the window ID. Where is inputConfStream used? Most likely it reaches EOF unexpectedly.
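To illustrate the EOF guess, here is a small JDK-only sketch (the class EofDemo and its methods are hypothetical, and it makes no claim about how the parser library itself behaves). A reader that has been fully consumed once has nothing left for a second consumer, which from the outside can look much like a closed stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: one shared reader consumed by two successive readers.
public class EofDemo {
    // Reads everything that remains and returns the number of characters seen.
    static int drain(Reader r) throws IOException {
        int count = 0;
        while (r.read() != -1) {
            count++;
        }
        return count;
    }

    // Returns {characters read on the first pass, characters read on the second pass}.
    public static int[] readTwice(String content) throws IOException {
        Reader shared = new InputStreamReader(
            new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
            StandardCharsets.UTF_8);
        int first = drain(shared);   // consumes the stream up to EOF
        int second = drain(shared);  // the stream is already at EOF
        return new int[] {first, second};
    }
}
```

If the first parser consumes the whole configuration stream in the same way, every later tuple would see an empty configuration, which is why the stream either has to be reopened or rebuilt from cached bytes for each new parser.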

Vlad

On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:

    Hi Team,

    When I read an input line from the feed, I parse it using a
    separate configuration file that I read from HDFS. To avoid
    reading the configuration file for every line, I would like to
    read it in the beginWindow() method. But the input stream is
    getting closed, and the operator does not hold the stream for
    all the tuples.

    Can I read the input stream once for all the tuples? (I tried the
    setup() method as well, but no luck.)

    @Override
    public void beginWindow(long windowId)
    {
        super.beginWindow(windowId);
        try {
            inputConfStream = getFS().open(new Path(getInputConfFile()));
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            LOG.error("beginWindow: Error while streaming the input Configuration File = {}", getInputConfFile());
        }
    }

    Regards,

    Surya Vamshi

    _______________________________________________________________________

    If you received this email in error, please advise the sender (by
    return email or otherwise) immediately. You have consented to
    receive the attached electronically at the above-noted email
    address; please retain a copy of this confirmation for future
    reference.

    Si vous recevez ce courriel par erreur, veuillez en aviser
    l'expéditeur immédiatement, par retour de courriel ou par un autre
    moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s)
    par voie électronique à l'adresse courriel indiquée ci-dessus;
    veuillez conserver une copie de cette confirmation pour les fins
    de reference future.
