Vamshi,
If not, try this approach:
Declare a static field:
static <var-type> inputConfStream = null;
In your parseTuple(KeyValue<String, String> tuple) method, open the stream only when it has not been opened yet:
if (inputConfStream == null) {
    inputConfStream = getFS().open(new Path(getInputConfFile()));
}
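
Another option, following Vlad's suggestion quoted below, is to read the configuration file into a byte array once (for example in setup()) and build a new reader over those bytes for every tuple. Here is a minimal sketch using only java.io. CachedConfig and newReader() are hypothetical names, UTF-8 is an assumed encoding, and in the operator the constructor argument would presumably be getFS().open(new Path(getInputConfFile())).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: caches the configuration bytes so the file is read only once.
final class CachedConfig {
    private final byte[] confBytes;

    // Reads the whole stream into memory, then closes it.
    CachedConfig(InputStream in) throws IOException {
        try (InputStream src = in) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = src.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            confBytes = buf.toByteArray();
        }
    }

    // Returns a fresh reader positioned at the start of the cached bytes.
    // Assumption: the configuration file is UTF-8 encoded.
    Reader newReader() {
        return new InputStreamReader(new ByteArrayInputStream(confBytes), StandardCharsets.UTF_8);
    }
}
```

In parseTuple() you would then pass conf.newReader() wherever new InputStreamReader(inputConfStream) is passed today, so each parser gets its own stream and none of them can hit EOF because of an earlier tuple.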
Thanks,
Samba Surabhi.
> On Aug 2, 2016, at 5:48 PM, Vlad Rozov wrote:
>
> The problem is likely in the parser. Please check its documentation to see
> if the parser can be reused to parse tuples. If not, and you need to construct
> a new parser for each tuple, read the configuration file into a byte array and
> construct a new input stream from the byte array each time.
>
> Vlad
>
> On 8/2/16 14:05, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>> Hi,
>>
>> Below is the method that gets called inside the emit() method. I am using
>> the input stream to parse each line. I want to create the inputConfStream
>> only once, but the stream gets closed if I create it inside the setup() or
>> beginWindow() method.
>>
>> private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
>>
>>     KeyValue<String, String> newTuple = new KeyValue<String, String>();
>>     try {
>>         Parser parser;
>>         inputConfStream = getFS().open(new Path(getInputConfFile()));
>>         if (inputDelimiter != null) {
>>             LOG.debug("parseTuple:sourceId = {},delimiter = {},inputConf = {}",
>>                 getSourceId(), getInputDelimiter(), getInputConfFile());
>>             parser = DefaultParserFactory.getInstance().newDelimitedParser(
>>                 new InputStreamReader(inputConfStream),
>>                 new StringReader(tuple.value),
>>                 getInputDelimiter().charAt(0), '"', false);
>>         } else {
>>             parser = DefaultParserFactory.getInstance().newFixedLengthParser(
>>                 new InputStreamReader(inputConfStream),
>>                 new StringReader(tuple.value));
>>         }
>>         parser.setIgnoreExtraColumns(true);
>>         final DataSet ds = parser.parse();
>>         if (ds == null || ds.getRowCount() == 0) {
>>             throw new RuntimeException("Could not parse record:" + tuple.value);
>>         }
>>
>>         if (ds.next()) {
>>             StringBuilder sb = new StringBuilder();
>>             for (String col : ds.getColumns()) {
>>                 LOG.debug("parseTuple: Col: {}", col);
>>             }
>>             List<Field> fields = outputFields.getFields();
>>             String oldValue;
>>             String adjustedValue;
>>             for (Field field : fields) {
>>                 if (field.getValue().equals("")) {
>>                     oldValue = ds.getString(field.getName());
>>                     adjustedValue = Helper.adjustValue(oldValue,
>>                         Integer.parseInt(field.getLength()), outputDelimiter, inputDelimiter);
>>                     sb.append(adjustedValue);
>>                 } else {
>>                     oldValue = field.getValue();
>>                     adjustedValue = Helper.adjustValue(oldValue,
>>                         Integer.parseInt(field.getLength()), outputDelimiter, inputDelimiter);
>>                     sb.append(adjustedValue);
>>                 }
>>                 if (outputDelimiter != null) {
>>                     sb.append(outputDelimiter);
>>                 }
>>             }
>>             sb.append("\n");
>>             newTuple.key = tuple.key;
>>             newTuple.value = sb.toString();
>>         }
>>     } catch (Exception e) {
>>         LOG.error("parseTuple:error while parsing the sourceID : {},line:{},Error Message : {}",
>>             getSourceId(), tuple.value, e.getMessage());
>>         e.printStackTrace();
>>         return new KeyValue<String, String>(tuple.key, null);
>>     }
>>     LOG.debug("parseTuple: The old tuple is:{} ## The new tuple is:{}", tuple, newTuple);
>>     return newTuple;
>> }
>>
>> Regards,
>> Surya Vamshi
>>
>> From: Mukkamula, Suryavamshivardhan (CWM-NR)
>> Sent: 2016, August, 02 4:48 PM
>> To: [email protected]
>> Subject: RE: Information Needed
>>
>> Hi,
>>
>> inputConfStream is used to parse the input line from the feed. It is used
>> for all the lines from the feed. I am not sure why the stream is getting closed.
>>
>> Regards,
>> Surya Vamshi
>>
>> From: Vlad Rozov
>> Sent: 2016, August, 02 4:26 PM
>> To: [email protected]
>> Subject: Re: Information Needed
>>
>> Both setup() and beginWindow() should work. It is more correct to open
>> the configuration stream and parse the configuration file in setup(), as you
>> tried in the initial implementation, as long as the configuration path does
>> not depend on the window ID. Where is inputConfStream used? Most likely it
>> reaches EOF unexpectedly.
>>
>> Vlad
>>
>>
>> On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>> Hi Team,
>>
>> When I read an input line from the feed, I read another configuration file
>> from HDFS in order to parse the line. To avoid reading the configuration
>> file for every line, I would like to read it in the beginWindow() method.
>> But the input stream is getting closed, and the operator is not holding the
>> stream for all the tuples.
>>
>> Can I read the input stream once for all the tuples? (I tried the setup()
>> method as well, but no luck.)
>>
>> @Override
>> public void beginWindow(long windowId)
>> {
>>     super.beginWindow(windowId);
>>     try {
>>         inputConfStream = getFS().open(new Path(getInputConfFile()));
>>     } catch (Exception e) {
>>         // TODO Auto-generated catch block
>>         e.printStackTrace();
>>         LOG.error("beginWindow: Error while streaming the input Configuration File = {}",
>>             getInputConfFile());
>>     }
>> }
>>
>> Regards,
>> Surya Vamshi
>>
>> _______________________________________________________________________
>>
>> If you received this email in error, please advise the sender (by return
>> email or otherwise) immediately. You have consented to receive the attached
>> electronically at the above-noted email address; please retain a copy of
>> this confirmation for future reference.
>>
>>
>
>