This should work, and I don’t see anything obviously wrong.

Where is KeyValue<T1, T2> defined, and are you sure the same type is used in 
both places?
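
To illustrate why I ask: if the two operators import KeyValue from different packages, the ports look identical in the error message (it shows only the simple name KeyValue) but are unrelated types, and addStream cannot find a T that fits both. Below is a minimal, self-contained sketch of that situation. It does not use the Apex API: PortTypeMismatchSketch, LibA, LibB, OutputPort, InputPort and sameType are hypothetical stand-ins made up for illustration, and addStream copies only the generic shape of the signature in your error message. I am assuming, not asserting, that this is the cause in your project.

```java
public class PortTypeMismatchSketch {

    // Two unrelated classes sharing the simple name KeyValue; they stand in
    // for same-named classes that live in two different packages.
    static class LibA {
        static class KeyValue<K, V> { }
    }

    static class LibB {
        static class KeyValue<K, V> { }
    }

    // Hypothetical stand-ins for the port types (not the real Apex classes).
    static class OutputPort<T> { }

    static class InputPort<T> { }

    // Same generic shape as the addStream signature quoted in the error.
    // Returns the number of sinks so the call has an observable result.
    @SafeVarargs
    static <T> int addStream(String id, OutputPort<? extends T> source,
                             InputPort<? super T>... sinks) {
        return sinks.length;
    }

    // True only when both arguments are the very same class object.
    static boolean sameType(Class<?> a, Class<?> b) {
        return a == b;
    }

    public static void main(String[] args) {
        OutputPort<LibA.KeyValue<String, byte[]>> output = new OutputPort<>();
        InputPort<LibA.KeyValue<String, byte[]>> matching = new InputPort<>();
        InputPort<LibB.KeyValue<String, byte[]>> mismatched = new InputPort<>();

        // Compiles: both ports are parameterized with LibA.KeyValue.
        addStream("ok", output, matching);

        // Rejected by the compiler if uncommented: no T is both a supertype
        // of LibA.KeyValue<...> and a subtype of LibB.KeyValue<...>.
        // addStream("broken", output, mismatched);

        // The fully qualified names expose the difference that the simple
        // name KeyValue hides.
        System.out.println(LibA.KeyValue.class.getName());
        System.out.println(LibB.KeyValue.class.getName());
        System.out.println(sameType(LibA.KeyValue.class, LibB.KeyValue.class)); // prints false
    }
}
```

If this is what is happening, comparing the import lines for KeyValue in InputFileReader and GroupByteArrayParser should show two different packages.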

 

From: "Mukkamula, Suryavamshivardhan (CWM-NR)" 
<[email protected]>
Reply-To: <[email protected]>
Date: Monday, August 15, 2016 at 10:40 AM
To: "[email protected]" <[email protected]>
Subject: dag.addStream Error

 

Hi,

 

Can you please help me resolve the error below, raised at the dag.addStream 
call? The port types match, but I still get the error. Please let me know if I 
am missing anything.

 

######################Error Message###########################

The method addStream(String, Operator.OutputPort<? extends T>, 
Operator.InputPort<? super T>...) in the type DAG is not applicable for the 
arguments (String, DefaultOutputPort<KeyValue<String,byte[]>>, 
DefaultInputPort<KeyValue<String,byte[]>>)

 

#######################Application Dag ##################################

 

InputFileReader inputFileStreamReader = dag.addOperator("inputFileStreamReader", InputFileReader.class);
GroupByteArrayParser silverParser = dag.addOperator("silverParser", GroupByteArrayParser.class);

dag.addStream("file stream to silver parser", inputFileStreamReader.output, silverParser.inputPort);

 

############################Oper1 code#######################

public class InputFileReader extends BaseOperator {
    private static Logger LOG = LoggerFactory.getLogger(InputFileReader.class);

    public transient DefaultOutputPort<KeyValue<String, byte[]>> output = new DefaultOutputPort<KeyValue<String, byte[]>>();

 

 

########################oper2 code#####################

public class GroupByteArrayParser extends BaseOperator {

    public final transient DefaultInputPort<KeyValue<String, byte[]>> inputPort = new DefaultInputPort<KeyValue<String, byte[]>>() {
        @Override
        public void process(KeyValue<String, byte[]> tuple) {

 

