This should work and I don’t see anything obviously wrong.
Where is KeyValue<T1, T2> defined and are you sure the same type is used in both places? From: "Mukkamula, Suryavamshivardhan (CWM-NR)" <[email protected]> Reply-To: <[email protected]> Date: Monday, August 15, 2016 at 10:40 AM To: "[email protected]" <[email protected]> Subject: dag.addStream Error Hi, Can you please help me resolve the below error at the highlighted portion? The port types are matching but still I get the below error, let me know If I am missing anything? ######################Error Message########################### The method addStream(String, Operator.OutputPort<? extends T>, Operator.InputPort<? super T>...) in the type DAG is not applicable for the arguments (String, DefaultOutputPort<KeyValue<String,byte[]>>, DefaultInputPort<KeyValue<String,byte[]>>) #######################Application Dag ################################## InputFileReader inputFileStreamReader = dag.addOperator("inputFileStreamReader", InputFileReader.class); GroupByteArrayParser silverParser = dag.addOperator("silverParser", GroupByteArrayParser.class); dag.addStream("file stream to silver parser", inputFileStreamReader.output, silverParser.inputPort); ############################Oper1 code####################### public class InputFileReader extends BaseOperator{ private static Logger LOG = LoggerFactory.getLogger(InputFileReader.class); public transient DefaultOutputPort<KeyValue<String, byte[]>> output = new DefaultOutputPort<KeyValue<String, byte[]>>(); ########################oper2 code##################### public class GroupByteArrayParser extends BaseOperator { public final transient DefaultInputPort<KeyValue<String, byte[]>> inputPort = new DefaultInputPort<KeyValue<String, byte[]>>() { @Override public void process(KeyValue<String, byte[]> tuple) { _______________________________________________________________________ If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference. Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation pour les fins de reference future.
