Hi,
Below is the method that gets called inside emit(). I use the input stream to
parse each line. I would like to create inputConfStream (opened at the start of
the try block) only once, but the stream gets closed if I create it inside the
setup() or beginWindow() method.
private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
    KeyValue<String, String> newTuple = new KeyValue<String, String>();
    try {
        Parser parser;
        // The parser configuration is opened from HDFS here, i.e. once per tuple
        inputConfStream = getFS().open(new Path(getInputConfFile()));
        if (inputDelimiter != null) {
            LOG.debug("parseTuple:sourceId = {},delimiter = {},inputConf = {}",
                    getSourceId(), getInputDelimiter(), getInputConfFile());
            parser = DefaultParserFactory.getInstance().newDelimitedParser(
                    new InputStreamReader(inputConfStream),
                    new StringReader(tuple.value),
                    getInputDelimiter().charAt(0), '"', false);
        } else {
            parser = DefaultParserFactory.getInstance().newFixedLengthParser(
                    new InputStreamReader(inputConfStream),
                    new StringReader(tuple.value));
        }
        parser.setIgnoreExtraColumns(true);
        final DataSet ds = parser.parse();
        if (ds == null || ds.getRowCount() == 0) {
            throw new RuntimeException("Could not parse record:" + tuple.value);
        }
        if (ds.next()) {
            StringBuilder sb = new StringBuilder();
            for (String col : ds.getColumns()) {
                LOG.debug("parseTuple: Col: {}", col);
            }
            List<Field> fields = outputFields.getFields();
            String oldValue;
            String adjustedValue;
            for (Field field : fields) {
                if (field.getValue().equals("")) {
                    // No constant value configured: take it from the parsed record
                    oldValue = ds.getString(field.getName());
                    adjustedValue = Helper.adjustValue(oldValue,
                            Integer.parseInt(field.getLength()),
                            outputDelimiter, inputDelimiter);
                    sb.append(adjustedValue);
                } else {
                    // Constant value configured for this output field
                    oldValue = field.getValue();
                    adjustedValue = Helper.adjustValue(oldValue,
                            Integer.parseInt(field.getLength()),
                            outputDelimiter, inputDelimiter);
                    sb.append(adjustedValue);
                }
                if (outputDelimiter != null) {
                    sb.append(outputDelimiter);
                }
            }
            sb.append("\n");
            newTuple.key = tuple.key;
            newTuple.value = sb.toString();
        }
    } catch (Exception e) {
        LOG.error("parseTuple:error while parsing the sourceID : {},line:{},Error Message : {}",
                getSourceId(), tuple.value, e.getMessage());
        e.printStackTrace();
        return new KeyValue<String, String>(tuple.key, null);
    }
    LOG.debug("parseTuple: The old tuple is:{} ## The new tuple is:{}", tuple, newTuple);
    return newTuple;
}
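A possible explanation for the "closed" stream, sketched in plain Java: a stream can only be consumed once. If a single inputConfStream is opened in setup() and shared by every call to parseTuple(), the first parse reads the configuration to EOF (and closing the wrapping reader, if the parser does so, would also close the underlying stream), so every later tuple sees no configuration. The sketch assumes an in-memory ByteArrayInputStream and placeholder text in place of the HDFS stream and the real configuration file; no Hadoop or parser classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

public class SharedStreamDemo {

    // Reads everything a Reader has left, roughly what a parser does with its config.
    static String readAll(Reader reader) throws IOException {
        StringBuilder sb = new StringBuilder();
        char[] buf = new char[1024];
        int n;
        while ((n = reader.read(buf)) != -1) {
            sb.append(buf, 0, n);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for getFS().open(...): opened once and shared by all tuples.
        InputStream shared = new ByteArrayInputStream(
                "<config/>".getBytes(StandardCharsets.UTF_8));

        // "Tuple 1": the whole configuration is read.
        String first = readAll(new InputStreamReader(shared, StandardCharsets.UTF_8));
        // "Tuple 2": the same stream is already at EOF, so nothing is left.
        String second = readAll(new InputStreamReader(shared, StandardCharsets.UTF_8));

        System.out.println("first:  '" + first + "'");
        System.out.println("second: '" + second + "'");
    }
}
```

Running it prints the configuration for the first "tuple" and an empty string for the second, which matches the symptom of the configuration disappearing after the first line.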
Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR)
[mailto:[email protected]]
Sent: 2016, August, 02 4:48 PM
To: [email protected]
Subject: RE: Information Needed
Hi,
inputConfStream is used to parse each input line from the feed, so the same
stream is used for all the lines. I am not sure why the stream is getting closed.
Regards,
Surya Vamshi
From: Vlad Rozov [mailto:[email protected]]
Sent: 2016, August, 02 4:26 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Information Needed
Both setup() and beginWindow() should work. It would be more correct to open the
configuration stream and parse the configuration file in setup(), as you tried
in your initial implementation, as long as the configuration path does not
depend on the window ID. Where is inputConfStream used? Most likely it reaches
EOF unexpectedly.
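Following this suggestion, one way to avoid hitting EOF is to read the configuration file completely in setup(), keep it as a String, and give every tuple its own StringReader over that text. This is a minimal sketch under assumptions: the class ConfigCache and its methods are hypothetical stand-ins for the real operator, and the InputStream passed to load() stands in for getFS().open(new Path(getInputConfFile())). In the operator, newConfReader() would take the place of new InputStreamReader(inputConfStream) in parseTuple().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

public class ConfigCache {
    // Whole configuration file, read once; Strings are immutable, so sharing is safe.
    private String confText;

    // Hypothetical stand-in for setup(): read the stream fully, then close it.
    public void load(InputStream in) throws IOException {
        try (InputStream stream = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = stream.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            confText = new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Called once per tuple: a fresh Reader positioned at the start of the config.
    public Reader newConfReader() {
        if (confText == null) {
            throw new IllegalStateException("configuration not loaded");
        }
        return new StringReader(confText);
    }

    public static void main(String[] args) throws IOException {
        ConfigCache cache = new ConfigCache();
        cache.load(new ByteArrayInputStream("<config/>".getBytes(StandardCharsets.UTF_8)));
        // Two "tuples", each getting the full configuration.
        for (int tuple = 1; tuple <= 2; tuple++) {
            Reader r = cache.newConfReader();
            StringBuilder sb = new StringBuilder();
            int c;
            while ((c = r.read()) != -1) {
                sb.append((char) c);
            }
            System.out.println("tuple " + tuple + ": " + sb);
        }
    }
}
```

With this approach the HDFS stream is opened and closed exactly once, and a parser that reads or closes its Reader only affects that tuple's StringReader. In an Apex operator the cached field would likely be declared transient, so it is re-read in setup() after recovery rather than checkpointed.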
Vlad
On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
Hi Team,
When I read an input line from the feed, I also read another configuration file
from HDFS in order to parse the line. To avoid reading the configuration file
for every line, I would like to read it in the beginWindow() method. But the
input stream gets closed, and the operator does not keep the stream open for all
the tuples.
Can I read the input stream once for all the tuples? (I tried it in the setup()
method as well, but with no luck.)
@Override
public void beginWindow(long windowId)
{
    super.beginWindow(windowId);
    try {
        inputConfStream = getFS().open(new Path(getInputConfFile()));
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        LOG.error("beginWindow: Error while streaming the input Configuration File = {}",
                getInputConfFile());
    }
}
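If the file really has to be handled in beginWindow(), for example because the configuration path might change between windows, a variant is to re-read it only when the path changes and to close the stream after reading. The beginWindow() above opens a new stream in every window and never closes the previous one. This is a sketch under assumptions: WindowConfigLoader is a hypothetical class, the Function<String, InputStream> opener stands in for path -> getFS().open(new Path(path)), and the paths in main() are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class WindowConfigLoader {
    // Hypothetical opener standing in for path -> getFS().open(new Path(path)).
    private final Function<String, InputStream> opener;
    private String loadedPath;  // path whose contents are currently cached
    private String confText;    // cached configuration text
    private int loads;          // how many times a file was actually read

    public WindowConfigLoader(Function<String, InputStream> opener) {
        this.opener = opener;
    }

    // Stand-in for beginWindow(): re-read only if the configured path changed.
    public void beginWindow(String confPath) throws IOException {
        if (confPath.equals(loadedPath)) {
            return; // same file as the previous window, keep the cached text
        }
        try (InputStream in = opener.apply(confPath)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            confText = new String(out.toByteArray(), StandardCharsets.UTF_8);
            loadedPath = confPath;
            loads++;
        } // the stream is closed here, so nothing leaks between windows
    }

    public String getConfText() { return confText; }

    public int getLoads() { return loads; }

    public static void main(String[] args) throws IOException {
        WindowConfigLoader loader = new WindowConfigLoader(
                path -> new ByteArrayInputStream(("conf for " + path).getBytes(StandardCharsets.UTF_8)));
        loader.beginWindow("/conf/a.xml"); // window 1: reads the file
        loader.beginWindow("/conf/a.xml"); // window 2: cached, no read
        loader.beginWindow("/conf/b.xml"); // window 3: path changed, reads again
        System.out.println(loader.getLoads() + " " + loader.getConfText()); // prints "2 conf for /conf/b.xml"
    }
}
```

As with the setup() approach, each tuple would then build its parser from a new StringReader over getConfText() rather than from a shared stream.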
Regards,
Surya Vamshi
_______________________________________________________________________
If you received this email in error, please advise the sender (by return email
or otherwise) immediately. You have consented to receive the attached
electronically at the above-noted email address; please retain a copy of this
confirmation for future reference.