Hello together,
I need your help because I won't get the iterative file reader working.
The probelm is still the cumulativ arriving of the data in the
downstream activies. I mean if I divide my file i.e. into parts of size
50, the downstream
activities get first 50 then 100, 150, ... elements.
The output ports were declared as shown in:
http://code.google.com/p/taverna-plugins/source/browse/trunk/sequencefile/sequencefile-activity/src/main/java/net/sf/taverna/t2/activities/sequencefile/SequenceFileActivity.java
The activity classes are attached.
I will be very grateful if someone could help me.
Regards,
Andreas
Andreas Truszkowski schrieb:
Hi,
David Withers schrieb:
Have you set the _granular_ depth of your output port to be less that the depth?
That is my output port declaration:
addOutput(this.RESULT_PORTS[0], 1, 0);
But there is a second port without defining a granular depth because it
should only be updated at the end of the task. Could it be the problem?
addOutput(this.COMMENT_PORT, 1);
Regards,
Andreas
On 8 Jul 2010, at 17:29, Andreas Truszkowski wrote:
Hello David,
Thanks for your help so far. But I have problem with the activity
behaviour. The data read from the iterative reading activity arrives at
the downstream activities accumulative.
I mean if I divide my file i.e. into parts of size 50 the downstream
activities get then first 50 then 100, 150, ... elements. Have you an
idea what is the mistake I have made?
Furthermore the complete workbench freezes after a while.
Regards
Andreas
David Withers schrieb:
On 06/07/2010 15:21, Andreas Truszkowski wrote:
David Withers schrieb:
On 06/07/2010 14:41, Andreas Truszkowski wrote:
David Withers schrieb:
On 05/07/2010 18:33, Andreas Truszkowski wrote:
Hi!
I have to write an iterative filereader activitie. Is there
any documentation about writing iterative activies or can
anyone give me a short description?
Hi Andreas,
There is a tutorial for writing activities here:
http://www.mygrid.org.uk/dev/wiki/display/developer/Tutorial+-+Service+invocation+plugin
David.
Hi David,
writing an activitie is not the problem. I need to develope an
activitie which is able to read i.e. a big file iterative.
However that is not described in the wiki documentation.
Hi Andreas,
I don't understand what you mean by iterative in this context. What
type of file do you want to read? Do you need to process the file
stream as it's being read?
David.
The file should be read in small pieces so that every piece is
processed separatly within the underlying workflow. If my file ist
several GBs big or maybe a database it is not possible to load the
data into the memory at once. I have read somewhere that the API of
taverna is be able to do so but I can't find any longer the source.
Your activity needs to read a file stream and output a list where each
element of the list is part of the file. There's an example of an
activity that does this at
http://code.google.com/p/taverna-plugins/source/browse/trunk/sequencefile/sequencefile-activity/src/main/java/net/sf/taverna/t2/activities/sequencefile/SequenceFileActivity.java
If you look at lines 126 to 132 of the file, callback.receiveResult is
called for each part of the input file. (Note that the complete list is
also output at line 140).
You also need to specify that the activity output port has a granular
depth of 0 (see line 161).
David.
------------------------------------------------------------------------------
This SF.net email is sponsored by Sprint
What will you do first with EVO, the first 4G phone?
Visit sprint.com/first -- http://p.sf.net/sfu/sprint-com-first
_______________________________________________
taverna-hackers mailing list
[email protected]
Web site: http://www.taverna.org.uk
Mailing lists: http://www.taverna.org.uk/about/contact-us/
Developers Guide: http://www.taverna.org.uk/developers/
------------------------------------------------------------------------------
This SF.net email is sponsored by Sprint
What will you do first with EVO, the first 4G phone?
Visit sprint.com/first -- http://p.sf.net/sfu/sprint-com-first
_______________________________________________
taverna-hackers mailing list
[email protected]
Web site: http://www.taverna.org.uk
Mailing lists: http://www.taverna.org.uk/about/contact-us/
Developers Guide: http://www.taverna.org.uk/developers/
/*
* Copyright (C) 2010 by Andreas Truszkowski <[email protected]>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1
* of the License, or (at your option) any later version.
* All we ask is that proper credit is given for our work, which includes
* - but is not limited to - adding the above copyright notice to the beginning
* of your source code files, and to any copyright notice that you may
distribute
* with programs based on this work.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
package org.openscience.cdk.applications.taverna;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.reference.T2Reference;
import
net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
import
net.sf.taverna.t2.workflowmodel.processor.activity.ActivityConfigurationException;
import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
import
net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
/**
* Abstract class to describe an Taverna 2 activity.
*
* @author Andreas Truszkowski
*
*/
public abstract class AbstractCDKActivity extends
AbstractAsynchronousActivity<CDKActivityConfigurationBean> implements
AsynchronousActivity<CDKActivityConfigurationBean> {
protected String[] INPUT_PORTS;
protected String[] RESULT_PORTS;
protected final String COMMENT_PORT = "Comment";
protected List<String> comment = new ArrayList<String>();
private CDKActivityConfigurationBean configBean;
public AbstractCDKActivity() {
// empty
}
@Override
public void configure(CDKActivityConfigurationBean configBean) throws
ActivityConfigurationException {
// Any pre-config sanity checks
if (configBean.getActivityName().equals("invalidExample")) {
throw new ActivityConfigurationException("Example
string can't be 'invalidExample'");
}
this.configBean = configBean;
configurePorts();
}
/**
* Do port configuration.
*/
protected void configurePorts() {
// In case we are being reconfigured - remove existing ports
first
// to avoid duplicates
removeInputs();
removeOutputs();
this.addInputPorts();
this.addOutputPorts();
// Add always comment port
this.addOutput(this.COMMENT_PORT, 1);
}
/**
* Use command addInput() here to add and configure input ports.
*/
protected abstract void addInputPorts();
/**
* Use command addOutput() here to add and configure output ports.
*/
protected abstract void addOutputPorts();
@Override
public void executeAsynch(final Map<String, T2Reference> inputs, final
AsynchronousActivityCallback callback) {
// Don't execute service directly now, request to be run ask to
be run
// from thread pool and return asynchronously
callback.requestRun(new Runnable() {
public void run() {
// Do work
Map<String, T2Reference> outputs = null;
InvocationContext context =
callback.getContext();
ReferenceService referenceService =
context.getReferenceService();
try {
outputs =
AbstractCDKActivity.this.work(inputs, callback);
} catch (CDKTavernaException e) {
e.printStackTrace();
comment.add(e.getMessage());
callback.fail(e.getMessage());
}
if (outputs == null) {
outputs = new HashMap<String,
T2Reference>();
}
T2Reference containerRef =
referenceService.register(AbstractCDKActivity.this.comment, 1, true, context);
outputs.put(AbstractCDKActivity.this.COMMENT_PORT, containerRef);
// return map of output data, with empty index
array as this is
// the only and final result (this index
parameter is used if
// pipelining output)
callback.receiveResult(outputs, new int[0]);
}
});
}
/**
* Abstract method with does the work.
*/
public abstract Map<String, T2Reference> work(final Map<String,
T2Reference> inputs,
final AsynchronousActivityCallback callback) throws
CDKTavernaException;
@Override
public CDKActivityConfigurationBean getConfiguration() {
return this.configBean;
}
/**
* @return The name of the activity.
*/
public abstract String getActivityName();
/**
* @return The folder of the activity.
*/
public abstract String getFolderName();
/**
* @return The description of the activity.
*/
public abstract String getDescription();
/**
* @return Additional properties.
*/
public abstract HashMap<String, Object> getAdditionalProperties();
/**
* @return Input port names/identifiers.
*/
public String[] getINPUT_PORTS() {
return INPUT_PORTS;
}
/**
* @return Result port names/identifiers.
*/
public String[] getRESULT_PORTS() {
return RESULT_PORTS;
}
/**
* @return Comment port name/identifier.
*/
public String getCOMMENT_PORT() {
return COMMENT_PORT;
}
}
package org.openscience.cdk.applications.taverna.iterativeio;
import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.reference.T2Reference;
import
net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
import org.openscience.cdk.applications.taverna.AbstractCDKActivity;
import org.openscience.cdk.applications.taverna.CDKTavernaConstants;
import org.openscience.cdk.applications.taverna.CDKTavernaException;
import org.openscience.cdk.applications.taverna.CMLChemFile;
import org.openscience.cdk.applications.taverna.basicutilities.CDKObjectHandler;
import
org.openscience.cdk.applications.taverna.basicutilities.CMLChemFileWrapper;
import org.openscience.cdk.applications.taverna.interfaces.IFileReader;
import org.openscience.cdk.io.MDLV2000Reader;
public class IterativeSDFileReaderActivity extends AbstractCDKActivity
implements IFileReader {
public static final String ITERATIVE_SD_FILE_READER_ACTIVITY =
"Iterative SDfile Reader";
public IterativeSDFileReaderActivity() {
this.RESULT_PORTS = new String[] { "Structures" };
}
@Override
protected void addInputPorts() {
// Nothing to add
}
@Override
protected void addOutputPorts() {
addOutput(this.RESULT_PORTS[0], 1, 0);
}
@Override
public String getActivityName() {
return
IterativeSDFileReaderActivity.ITERATIVE_SD_FILE_READER_ACTIVITY;
}
@Override
public HashMap<String, Object> getAdditionalProperties() {
HashMap<String, Object> properties = new HashMap<String,
Object>();
properties.put(CDKTavernaConstants.PROPERTY_FILE_EXTENSION,
".sdf");
properties.put(CDKTavernaConstants.PROPERTY_FILE_EXTENSION_DESCRIPTION, "MDL
SDFile");
properties.put(CDKTavernaConstants.PROPERTY_ITERATIVE_READ_SIZE, 50);
return properties;
}
@Override
public String getDescription() {
return "Description: " +
IterativeSDFileReaderActivity.ITERATIVE_SD_FILE_READER_ACTIVITY;
}
@Override
public String getFolderName() {
return CDKTavernaConstants.ITERATIVE_IO_FOLDER_NAME;
}
@Override
public Map<String, T2Reference> work(Map<String, T2Reference> inputs,
AsynchronousActivityCallback callback)
throws CDKTavernaException {
int readSize = (Integer)
this.getConfiguration().getAdditionalProperty(CDKTavernaConstants.PROPERTY_ITERATIVE_READ_SIZE);
Map<String, T2Reference> outputs = new HashMap<String,
T2Reference>();
InvocationContext context = callback.getContext();
ReferenceService referenceService =
context.getReferenceService();
List<CMLChemFile> cmlChemFileList = null;
List<byte[]> dataList = new ArrayList<byte[]>();
// Read SDfile
File file = (File)
this.getConfiguration().getAdditionalProperty(CDKTavernaConstants.PROPERTY_FILE);
if (file == null) {
throw new CDKTavernaException(this.getActivityName(),
"Error, no file chosen!");
}
List<T2Reference> outputList = new ArrayList<T2Reference>();
try {
LineNumberReader lineReader = new LineNumberReader(new
FileReader(file));
String line;
String SDFilePart = "";
int counter = 0;
int index = 0;
while (true) {
line = lineReader.readLine();
if (line != null) {
SDFilePart += line + "\n";
if (line.contains("$$$$")) {
counter++;
}
}
if (line == null || counter >= readSize) {
CMLChemFile cmlChemFile = new
CMLChemFile();
MDLV2000Reader tmpMDLReader = new
MDLV2000Reader(new StringReader(SDFilePart));
tmpMDLReader.read(cmlChemFile);
cmlChemFileList =
CMLChemFileWrapper.wrapInChemModelList(cmlChemFile);
// Congfigure output
for (CMLChemFile c : cmlChemFileList) {
dataList.add(CDKObjectHandler.getBytes(c));
}
T2Reference containerRef =
referenceService.register(dataList, 1, true, context);
outputList.add(index, containerRef);
outputs.put(this.RESULT_PORTS[0],
containerRef);
callback.receiveResult(outputs, new
int[] { index });
index++;
counter = 0;
SDFilePart = "";
}
if (line == null) {
break;
}
}
T2Reference containerRef =
referenceService.register(outputList, 1, true, context);
outputs.put(this.RESULT_PORTS[0], containerRef);
} catch (Exception e) {
throw new CDKTavernaException(this.getActivityName(),
"Error reading SDF file!");
}
comment.add("done");
// Return results
return outputs;
}
}
------------------------------------------------------------------------------
Sell apps to millions through the Intel(R) Atom(Tm) Developer Program
Be part of this innovative community and reach millions of netbook users
worldwide. Take advantage of special opportunities to increase revenue and
speed time-to-market. Join now, and jumpstart your future.
http://p.sf.net/sfu/intel-atom-d2d
_______________________________________________
taverna-hackers mailing list
[email protected]
Web site: http://www.taverna.org.uk
Mailing lists: http://www.taverna.org.uk/about/contact-us/
Developers Guide: http://www.taverna.org.uk/developers/