The problem is likely in the parser. Please check it's documentation to
see if parser can be reused to parse tuples. If not and you need to
construct a new parser for each tuple, read configuration file into a
byte array and construct new input stream using the byte array.
Vlad
On 8/2/16 14:05, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
Hi,
Below is the method which gets called inside the emit() method. I am
using the input stream to parse each line. The highlighted inputStream
I would want to create only Once. But the stream is getting closed if
I create the inputStream inside setup() or beginWindow() method.
*private*KeyValue<String, String> parseTuple(KeyValue<String, String>
tuple) {
KeyValue<String, String> newTuple= *new*KeyValue<String, String>();
*try*{
Parser parser;
inputConfStream= getFS().open(*new*Path(getInputConfFile()));
*if*(inputDelimiter!= *null*) {
*/LOG/*.debug("parseTuple:sourceId = {},delimiter = {},inputConf =
{}", getSourceId(), getInputDelimiter(),
getInputConfFile());
parser=
DefaultParserFactory./getInstance/().newDelimitedParser(*new*InputStreamReader(inputConfStream),
*new*StringReader(tuple.value), getInputDelimiter().charAt(0), '"',
*false*);
} *else*{
parser=
DefaultParserFactory./getInstance/().newFixedLengthParser(*new*InputStreamReader(inputConfStream),
*new*StringReader(tuple.value));
}
parser.setIgnoreExtraColumns(*true*);
*final*DataSet ds= parser.parse();
*if*(ds== *null*|| ds.getRowCount() == 0) {
*throw**new*RuntimeException("Could not parse record:"+ tuple.value);
}
*if*(ds.next()) {
StringBuilder sb= *new*StringBuilder();
*for*(String col: ds.getColumns()) {
*/LOG/*.debug("parseTuple: Col: {}", col);
}
List<Field> fields= outputFields.getFields();
String oldValue;
String adjustedValue;
*for*(Field field: fields) {
*if*(field.getValue().equals("")) {
oldValue= ds.getString(field.getName());
adjustedValue= Helper./adjustValue/(oldValue,
Integer./parseInt/(field.getLength()),
outputDelimiter, inputDelimiter);
sb.append(adjustedValue);
} *else*{
oldValue= field.getValue();
adjustedValue= Helper./adjustValue/(oldValue,
Integer./parseInt/(field.getLength()),
outputDelimiter, inputDelimiter);
sb.append(adjustedValue);
}
*if*(outputDelimiter!= *null*) {
sb.append(outputDelimiter);
}
}
sb.append("\n");
newTuple.key= tuple.key;
newTuple.value= sb.toString();
}
} *catch*(Exception e) {
*/LOG/*.error("parseTuple:error while parsing the sourceID :
{},line:{},Error Message : {}", getSourceId(),
tuple.value, e.getMessage());
e.printStackTrace();
*return**new*KeyValue<String, String>(tuple.key, *null*);
}
*/LOG/*.debug("parseTuple: The old tuple is:{} ## The new tuple
is:{}", tuple, newTuple);
*return*newTuple;
}
Regards,
Surya Vamshi
*From:*Mukkamula, Suryavamshivardhan (CWM-NR)
[mailto:suryavamshivardhan.mukkam...@rbc.com]
*Sent:* 2016, August, 02 4:48 PM
*To:* users@apex.apache.org
*Subject:* RE: Information Needed
Hi,
inputConfStream is used to parse the input line from the feed. This is
used for all the lines from the feed. Not sure why the stream is
getting closed?
Regards,
Surya Vamshi
*From:*Vlad Rozov [mailto:v.ro...@datatorrent.com]
*Sent:* 2016, August, 02 4:26 PM
*To:* users@apex.apache.org <mailto:users@apex.apache.org>
*Subject:* Re: Information Needed
Both setup() and beginWindow() should work. It will be more correct to
open the configuration stream and parse the configuration file in
setup() as you tried in the initial implementation as long as
configuration path does not depend on window Id. Where the
inputConfStream is used? Most likely it reaches EOF unexpectedly.
Vlad
On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
Hi Team,
When I am trying to read input line from feed, to parse the line I
am reading another configuration file from HDFS. To avoid reading
the configuration file for every line I would like to read it in
the beginWindow() method. But the Input stream is getting closed
and operator is not holding the stream for all the tuples.
Can I read the input Stream Once for all the tuples? (I tried in
the setup() method as well , but no luck)
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
try {
inputConfStream = getFS().open(new Path(getInputConfFile()));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
LOG.error("beginWindow: Error while streaming the input
Configuration File = {}", getInputConfFile());
}
}
Regards,
Surya Vamshi
_______________________________________________________________________
If you received this email in error, please advise the sender (by
return email or otherwise) immediately. You have consented to
receive the attached electronically at the above-noted email
address; please retain a copy of this confirmation for future
reference.
Si vous recevez ce courriel par erreur, veuillez en aviser
l'expéditeur immédiatement, par retour de courriel ou par un autre
moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s)
par voie électronique à l'adresse courriel indiquée ci-dessus;
veuillez conserver une copie de cette confirmation pour les fins
de reference future.
_______________________________________________________________________
If you received this email in error, please advise the sender (by
return email or otherwise) immediately. You have consented to receive
the attached electronically at the above-noted email address; please
retain a copy of this confirmation for future reference.
Si vous recevez ce courriel par erreur, veuillez en aviser
l'expéditeur immédiatement, par retour de courriel ou par un autre
moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par
voie électronique à l'adresse courriel indiquée ci-dessus; veuillez
conserver une copie de cette confirmation pour les fins de reference
future.
_______________________________________________________________________
If you received this email in error, please advise the sender (by
return email or otherwise) immediately. You have consented to receive
the attached electronically at the above-noted email address; please
retain a copy of this confirmation for future reference.
Si vous recevez ce courriel par erreur, veuillez en aviser
l'expéditeur immédiatement, par retour de courriel ou par un autre
moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par
voie électronique à l'adresse courriel indiquée ci-dessus; veuillez
conserver une copie de cette confirmation pour les fins de reference
future.