Is there only one KeyValue class in the classpath? Regards, Ashwin.
On Mon, Aug 15, 2016 at 11:38 AM, Sanjay Pujare <[email protected]> wrote: > This should work and I don’t see anything obviously wrong. > > > > Where is KeyValue<T1, T2> defined and are you sure the same type is used > in both places? > > > > *From: *"Mukkamula, Suryavamshivardhan (CWM-NR)" < > [email protected]> > *Reply-To: *<[email protected]> > *Date: *Monday, August 15, 2016 at 10:40 AM > *To: *"[email protected]" <[email protected]> > *Subject: *dag.addStream Error > > > > Hi, > > > > Can you please help me resolve the below error at the highlighted portion? > The port types are matching but still I get the below error, let me know If > I am missing anything? > > > > ######################Error Message########################### > > The method addStream(String, Operator.OutputPort<? extends T>, > Operator.InputPort<? super T>...) in the type DAG is not applicable for the > arguments (String, DefaultOutputPort<KeyValue<String,byte[]>>, > DefaultInputPort<KeyValue<String,byte[]>>) > > > > #######################Application Dag ################################## > > > > InputFileReader inputFileStreamReader = dag.addOperator(" > inputFileStreamReader", InputFileReader.*class*); > > GroupByteArrayParser silverParser = dag.addOperator("silverParser", > GroupByteArrayParser.*class*); > > > > dag.addStream("file stream to silver parser", > inputFileStreamReader.output, silverParser.inputPort); > > > > ############################Oper1 code####################### > > *public* *class* InputFileReader *extends* BaseOperator{ > > *private* *static* Logger *LOG* = LoggerFactory.*getLogger*(InputF > ileReader.*class*); > > > > *public* *transient* DefaultOutputPort<KeyValue<String, *byte*[]>> > output = *new* DefaultOutputPort<KeyValue<String, *byte*[]>>(); > > > > > > ########################oper2 code##################### > > *public* *class* GroupByteArrayParser *extends* BaseOperator { > > > > > > *public* *final* *transient* DefaultInputPort<KeyValue<String, *byte*[]>> > inputPort = *new* DefaultInputPort<KeyValue<String, *byte*[]>>() { > > > > @Override > > *public* *void* process(KeyValue<String, *byte*[]> tuple) { > > > > _______________________________________________________________________ > > If you received this email in error, please advise the sender (by return > email or otherwise) immediately. You have consented to receive the attached > electronically at the above-noted email address; please retain a copy of > this confirmation for future reference. > > Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur > immédiatement, par retour de courriel ou par un autre moyen. Vous avez > accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à > l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de > cette confirmation pour les fins de reference future. > -- Regards, Ashwin.
