Hello all,

I need your help because I can't get the iterative file reader working.
The problem is still that the data arrives cumulatively at the downstream activities. That is, if I divide my file into parts of, e.g., 50 elements, the downstream
activities first get 50, then 100, 150, ... elements.

The output ports were declared as shown in:

http://code.google.com/p/taverna-plugins/source/browse/trunk/sequencefile/sequencefile-activity/src/main/java/net/sf/taverna/t2/activities/sequencefile/SequenceFileActivity.java

The activity classes are attached.

I would be very grateful if someone could help me.

Regards,
Andreas

Andreas Truszkowski wrote:
Hi,

David Withers wrote:
Have you set the _granular_ depth of your output port to be less than the depth?
This is my output port declaration:
addOutput(this.RESULT_PORTS[0], 1, 0);
There is also a second port declared without a granular depth, because it should only be updated at the end of the task. Could that be the problem?
addOutput(this.COMMENT_PORT, 1);

Regards,
Andreas
On 8 Jul 2010, at 17:29, Andreas Truszkowski wrote:

Hello David,

Thanks for your help so far. However, I have a problem with the activity's behaviour. The data read by the iterative reading activity arrives cumulatively at the downstream activities. That is, if I divide my file into parts of, e.g., 50 elements, the downstream activities first get 50, then 100, 150, ... elements. Do you have any idea what mistake I have made?
Furthermore, the complete workbench freezes after a while.

Regards
Andreas

David Withers wrote:
On 06/07/2010 15:21, Andreas Truszkowski wrote:

David Withers wrote:

On 06/07/2010 14:41, Andreas Truszkowski wrote:

David Withers wrote:

On 05/07/2010 18:33, Andreas Truszkowski wrote:

Hi!

I have to write an iterative file reader activity. Is there
any documentation about writing iterative activities, or can
anyone give me a short description?



Hi Andreas,

There is a tutorial for writing activities here: http://www.mygrid.org.uk/dev/wiki/display/developer/Tutorial+-+Service+invocation+plugin




David.

Hi David,

Writing an activity is not the problem. I need to develop an
activity which is able to read, e.g., a big file iteratively.
However, that is not described in the wiki documentation.


Hi Andreas,

I don't understand what you mean by iterative in this context. What
type of file do you want to read? Do you need to process the file
stream as it's being read?

David.

The file should be read in small pieces so that every piece is
processed separately within the underlying workflow. If my file is
several GB in size, or the source is a database, it is not possible to
load the data into memory at once. I have read somewhere that the Taverna
API is able to do this, but I can no longer find the source.

Your activity needs to read a file stream and output a list where each
element of the list is part of the file. There's an example of an
activity that does this at
http://code.google.com/p/taverna-plugins/source/browse/trunk/sequencefile/sequencefile-activity/src/main/java/net/sf/taverna/t2/activities/sequencefile/SequenceFileActivity.java

If you look at lines 126 to 132 of the file, callback.receiveResult is
called for each part of the input file. (Note that the complete list is
also output at line 140).

You also need to specify that the activity output port has a granular
depth of 0 (see line 161).

David.
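
The pattern described above (read the stream, emit each part with its own index, then finish) can be sketched without any Taverna dependency. This is only an illustration under stated assumptions: `PartSink` is a hypothetical stand-in for the `callback.receiveResult(outputs, index)` call, the `$$$$` record terminator is borrowed from the SD file reader attached to this thread, and the class and method names are invented for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Sketch only; not the SequenceFileActivity implementation. */
class ChunkedReaderSketch {

    /** Hypothetical stand-in for AsynchronousActivityCallback.receiveResult(outputs, index). */
    interface PartSink {
        void receive(List<String> part, int[] index);
    }

    /**
     * Reads records terminated by a "$$$$" line and hands them to the sink in
     * parts of at most partSize records. Text after the last terminator is
     * ignored. Returns the number of parts emitted.
     */
    static int streamParts(Reader source, int partSize, PartSink sink) {
        List<String> part = new ArrayList<>();
        StringBuilder record = new StringBuilder();
        int index = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                record.append(line).append('\n');
                if (line.startsWith("$$$$")) {
                    part.add(record.toString());
                    record.setLength(0);
                    if (part.size() == partSize) {
                        sink.receive(part, new int[] { index++ });
                        part = new ArrayList<>(); // fresh list for the next part
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!part.isEmpty()) {
            sink.receive(part, new int[] { index++ }); // last, shorter part
        }
        return index;
    }

    public static void main(String[] args) {
        StringBuilder file = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            file.append("mol").append(i).append("\n$$$$\n");
        }
        int parts = streamParts(new StringReader(file.toString()), 2,
                (part, index) -> System.out.println("index " + index[0] + ": " + part.size() + " record(s)"));
        System.out.println(parts + " part(s) emitted");
    }
}
```

Each part is handed over in a list of its own, so a part never contains records from an earlier part.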


------------------------------------------------------------------------------
This SF.net email is sponsored by Sprint
What will you do first with EVO, the first 4G phone?
Visit sprint.com/first -- http://p.sf.net/sfu/sprint-com-first
_______________________________________________
taverna-hackers mailing list
[email protected]
Web site: http://www.taverna.org.uk
Mailing lists: http://www.taverna.org.uk/about/contact-us/
Developers Guide: http://www.taverna.org.uk/developers/



/*
 * Copyright (C) 2010 by Andreas Truszkowski <[email protected]>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1
 * of the License, or (at your option) any later version.
 * All we ask is that proper credit is given for our work, which includes
 * - but is not limited to - adding the above copyright notice to the beginning
 * of your source code files, and to any copyright notice that you may distribute
 * with programs based on this work.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
package org.openscience.cdk.applications.taverna;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityConfigurationException;
import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;

/**
 * Abstract class to describe a Taverna 2 activity.
 * 
 * @author Andreas Truszkowski
 * 
 */
public abstract class AbstractCDKActivity extends AbstractAsynchronousActivity<CDKActivityConfigurationBean>
                implements AsynchronousActivity<CDKActivityConfigurationBean> {

        protected String[] INPUT_PORTS;
        protected String[] RESULT_PORTS;
        protected final String COMMENT_PORT = "Comment";

        protected List<String> comment = new ArrayList<String>();

        private CDKActivityConfigurationBean configBean;

        public AbstractCDKActivity() {
                // empty
        }

        @Override
        public void configure(CDKActivityConfigurationBean configBean) throws ActivityConfigurationException {
                // Any pre-config sanity checks
                if (configBean.getActivityName().equals("invalidExample")) {
                        throw new ActivityConfigurationException("Example string can't be 'invalidExample'");
                }

                this.configBean = configBean;
                configurePorts();
        }

        /**
         * Do port configuration.
         */
        protected void configurePorts() {
                // In case we are being reconfigured - remove existing ports first
                // to avoid duplicates
                removeInputs();
                removeOutputs();
                this.addInputPorts();
                this.addOutputPorts();
                // Always add the comment port
                this.addOutput(this.COMMENT_PORT, 1);
        }

        /**
         * Use command addInput() here to add and configure input ports.
         */
        protected abstract void addInputPorts();

        /**
         * Use command addOutput() here to add and configure output ports.
         */
        protected abstract void addOutputPorts();

        @Override
        public void executeAsynch(final Map<String, T2Reference> inputs, final AsynchronousActivityCallback callback) {
                // Don't execute the service directly now; request to be run
                // from the thread pool and return asynchronously
                callback.requestRun(new Runnable() {

                        public void run() {
                                // Do work
                                Map<String, T2Reference> outputs = null;
                                InvocationContext context = callback.getContext();
                                ReferenceService referenceService = context.getReferenceService();
                                try {
                                        outputs = AbstractCDKActivity.this.work(inputs, callback);
                                } catch (CDKTavernaException e) {
                                        e.printStackTrace();
                                        comment.add(e.getMessage());
                                        callback.fail(e.getMessage());
                                }
                                if (outputs == null) {
                                        outputs = new HashMap<String, T2Reference>();
                                }
                                T2Reference containerRef = referenceService.register(AbstractCDKActivity.this.comment, 1, true, context);
                                outputs.put(AbstractCDKActivity.this.COMMENT_PORT, containerRef);
                                // return map of output data, with empty index array as this is
                                // the only and final result (this index parameter is used if
                                // pipelining output)
                                callback.receiveResult(outputs, new int[0]);
                        }
                });
        }

        /**
         * Abstract method which does the work.
         */
        public abstract Map<String, T2Reference> work(final Map<String, T2Reference> inputs,
                        final AsynchronousActivityCallback callback) throws CDKTavernaException;

        @Override
        public CDKActivityConfigurationBean getConfiguration() {
                return this.configBean;
        }

        /**
         * @return The name of the activity.
         */
        public abstract String getActivityName();

        /**
         * @return The folder of the activity.
         */
        public abstract String getFolderName();

        /**
         * @return The description of the activity.
         */
        public abstract String getDescription();

        /**
         * @return Additional properties.
         */
        public abstract HashMap<String, Object> getAdditionalProperties();

        /**
         * @return Input port names/identifiers.
         */
        public String[] getINPUT_PORTS() {
                return INPUT_PORTS;
        }

        /**
         * @return Result port names/identifiers.
         */
        public String[] getRESULT_PORTS() {
                return RESULT_PORTS;
        }

        /**
         * @return Comment port name/identifier.
         */
        public String getCOMMENT_PORT() {
                return COMMENT_PORT;
        }

}
package org.openscience.cdk.applications.taverna.iterativeio;

import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;

import org.openscience.cdk.applications.taverna.AbstractCDKActivity;
import org.openscience.cdk.applications.taverna.CDKTavernaConstants;
import org.openscience.cdk.applications.taverna.CDKTavernaException;
import org.openscience.cdk.applications.taverna.CMLChemFile;
import org.openscience.cdk.applications.taverna.basicutilities.CDKObjectHandler;
import org.openscience.cdk.applications.taverna.basicutilities.CMLChemFileWrapper;
import org.openscience.cdk.applications.taverna.interfaces.IFileReader;
import org.openscience.cdk.io.MDLV2000Reader;

public class IterativeSDFileReaderActivity extends AbstractCDKActivity implements IFileReader {

        public static final String ITERATIVE_SD_FILE_READER_ACTIVITY = "Iterative SDfile Reader";

        public IterativeSDFileReaderActivity() {
                this.RESULT_PORTS = new String[] { "Structures" };
        }

        @Override
        protected void addInputPorts() {
                // Nothing to add
        }

        @Override
        protected void addOutputPorts() {
                addOutput(this.RESULT_PORTS[0], 1, 0);
        }

        @Override
        public String getActivityName() {
                return IterativeSDFileReaderActivity.ITERATIVE_SD_FILE_READER_ACTIVITY;
        }

        @Override
        public HashMap<String, Object> getAdditionalProperties() {
                HashMap<String, Object> properties = new HashMap<String, Object>();
                properties.put(CDKTavernaConstants.PROPERTY_FILE_EXTENSION, ".sdf");
                properties.put(CDKTavernaConstants.PROPERTY_FILE_EXTENSION_DESCRIPTION, "MDL SDFile");
                properties.put(CDKTavernaConstants.PROPERTY_ITERATIVE_READ_SIZE, 50);
                return properties;
        }

        @Override
        public String getDescription() {
                return "Description: " + IterativeSDFileReaderActivity.ITERATIVE_SD_FILE_READER_ACTIVITY;
        }

        @Override
        public String getFolderName() {
                return CDKTavernaConstants.ITERATIVE_IO_FOLDER_NAME;
        }

        @Override
        public Map<String, T2Reference> work(Map<String, T2Reference> inputs, AsynchronousActivityCallback callback)
                        throws CDKTavernaException {
                int readSize = (Integer) this.getConfiguration().getAdditionalProperty(CDKTavernaConstants.PROPERTY_ITERATIVE_READ_SIZE);
                Map<String, T2Reference> outputs = new HashMap<String, T2Reference>();
                InvocationContext context = callback.getContext();
                ReferenceService referenceService = context.getReferenceService();
                List<CMLChemFile> cmlChemFileList = null;
                List<byte[]> dataList = new ArrayList<byte[]>();
                // Read SDfile
                File file = (File) this.getConfiguration().getAdditionalProperty(CDKTavernaConstants.PROPERTY_FILE);
                if (file == null) {
                        throw new CDKTavernaException(this.getActivityName(), "Error, no file chosen!");
                }
                List<T2Reference> outputList = new ArrayList<T2Reference>();
                try {
                        LineNumberReader lineReader = new LineNumberReader(new FileReader(file));
                        String line;
                        String SDFilePart = "";
                        int counter = 0;
                        int index = 0;
                        while (true) {
                                line = lineReader.readLine();
                                if (line != null) {
                                        SDFilePart += line + "\n";
                                        if (line.contains("$$$$")) {
                                                counter++;
                                        }
                                }
                                if (line == null || counter >= readSize) {
                                        CMLChemFile cmlChemFile = new CMLChemFile();
                                        MDLV2000Reader tmpMDLReader = new MDLV2000Reader(new StringReader(SDFilePart));
                                        tmpMDLReader.read(cmlChemFile);
                                        cmlChemFileList = CMLChemFileWrapper.wrapInChemModelList(cmlChemFile);
                                        // Configure output
                                        for (CMLChemFile c : cmlChemFileList) {
                                                dataList.add(CDKObjectHandler.getBytes(c));
                                        }
                                        T2Reference containerRef = referenceService.register(dataList, 1, true, context);
                                        outputList.add(index, containerRef);
                                        outputs.put(this.RESULT_PORTS[0], containerRef);
                                        callback.receiveResult(outputs, new int[] { index });
                                        index++;
                                        counter = 0;
                                        SDFilePart = "";
                                }
                                if (line == null) {
                                        break;
                                }
                        }
                        T2Reference containerRef = referenceService.register(outputList, 1, true, context);
                        outputs.put(this.RESULT_PORTS[0], containerRef);
                } catch (Exception e) {
                        throw new CDKTavernaException(this.getActivityName(), "Error reading SDF file!");
                }
                comment.add("done");
                // Return results
                return outputs;
        }

}
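
A possible cause of the cumulative 50, 100, 150, ... behaviour, judging only from the attached work() method: `dataList` is created once before the read loop and is never cleared, so every registered chunk also contains the structures of all earlier chunks. The plain-Java sketch below reproduces just that effect. The class and method names are invented for the illustration, and it is an assumption that this is the only cause; the port depth settings may also need checking.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: simulates the chunk loop without any Taverna or CDK classes. */
class CumulativeChunkSketch {

    /**
     * Returns the size of the list emitted for each chunk.
     * shareList = true mirrors the attached code (one list, never cleared);
     * shareList = false starts every chunk with a new, empty list.
     */
    static List<Integer> emittedSizes(int total, int readSize, boolean shareList) {
        List<Integer> sizes = new ArrayList<>();
        List<String> dataList = new ArrayList<>(); // created once, outside the loop
        for (int start = 0; start < total; start += readSize) {
            if (!shareList) {
                dataList = new ArrayList<>(); // fresh list per chunk
            }
            int end = Math.min(start + readSize, total);
            for (int i = start; i < end; i++) {
                dataList.add("structure-" + i);
            }
            sizes.add(dataList.size()); // what a downstream activity would receive
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println("shared list: " + emittedSizes(150, 50, true));
        System.out.println("fresh list:  " + emittedSizes(150, 50, false));
    }
}
```

With a shared list the emitted sizes grow with every chunk; with a fresh list per chunk each emitted list holds only that chunk. In the attached code the equivalent change would be to create (or clear) `dataList` inside the `if (line == null || counter >= readSize)` block before filling it.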