Is there only one KeyValue class in the classpath?
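
To illustrate why I ask, here is a minimal sketch of how two classes that share the simple name KeyValue can produce this kind of "not applicable for the arguments" error even though the printed types look identical. PkgA, PkgB, OutputPort, InputPort and this addStream are hypothetical stand-ins I made up for the example; they are not the real Apex classes, and I am only assuming your project might have a similar clash.

```java
public class KeyValueClash {
    // Hypothetical stand-ins for two different packages that each define KeyValue.
    static class PkgA { static class KeyValue<K, V> { K key; V value; } }
    static class PkgB { static class KeyValue<K, V> { K key; V value; } }

    // Simplified port types for illustration only (not the Apex port classes).
    static class OutputPort<T> { }
    static class InputPort<T> { }

    // Same generic shape as the addStream signature quoted in the error message.
    @SafeVarargs
    static <T> int addStream(String id, OutputPort<? extends T> source,
                             InputPort<? super T>... sinks) {
        return sinks.length;
    }

    // True only when both references point at the very same class.
    static boolean sameClass(Class<?> a, Class<?> b) {
        return a == b;
    }

    public static void main(String[] args) {
        OutputPort<PkgA.KeyValue<String, byte[]>> output = new OutputPort<>();
        InputPort<PkgA.KeyValue<String, byte[]>> matching = new InputPort<>();
        InputPort<PkgB.KeyValue<String, byte[]>> mismatched = new InputPort<>();

        // Compiles: both ports use PkgA.KeyValue.
        addStream("ok", output, matching);

        // Does not compile if uncommented: PkgA.KeyValue and PkgB.KeyValue are
        // unrelated types, although a compiler message that prints only simple
        // names would show KeyValue<String,byte[]> on both sides.
        // addStream("broken", output, mismatched);

        // The simple names match, but the fully qualified names differ.
        System.out.println(PkgA.KeyValue.class.getSimpleName());
        System.out.println(PkgB.KeyValue.class.getSimpleName());
        System.out.println(PkgA.KeyValue.class.getName());
        System.out.println(PkgB.KeyValue.class.getName());
    }
}
```

If something like this is the cause, comparing the import statements for KeyValue in the two operator classes should show the mismatch.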

Regards,
Ashwin.

On Mon, Aug 15, 2016 at 11:38 AM, Sanjay Pujare <[email protected]>
wrote:

> This should work and I don’t see anything obviously wrong.
>
>
>
> Where is KeyValue<T1, T2> defined and are you sure the same type is used
> in both places?
>
>
>
> From: "Mukkamula, Suryavamshivardhan (CWM-NR)" <[email protected]>
> Reply-To: <[email protected]>
> Date: Monday, August 15, 2016 at 10:40 AM
> To: "[email protected]" <[email protected]>
> Subject: dag.addStream Error
>
>
>
> Hi,
>
>
>
> Can you please help me resolve the error below, which occurs at the
> highlighted portion? The port types match, but I still get the error.
> Please let me know if I am missing anything.
>
>
>
> ######################Error Message###########################
>
> The method addStream(String, Operator.OutputPort<? extends T>,
> Operator.InputPort<? super T>...) in the type DAG is not applicable for the
> arguments (String, DefaultOutputPort<KeyValue<String,byte[]>>,
> DefaultInputPort<KeyValue<String,byte[]>>)
>
>
>
> #######################Application Dag ##################################
>
>
>
> InputFileReader inputFileStreamReader = dag.addOperator("inputFileStreamReader", InputFileReader.class);
>
> GroupByteArrayParser silverParser = dag.addOperator("silverParser", GroupByteArrayParser.class);
>
>
>
> dag.addStream("file stream to silver parser", inputFileStreamReader.output, silverParser.inputPort);
>
>
>
> ############################Oper1 code#######################
>
> public class InputFileReader extends BaseOperator {
>
>     private static Logger LOG = LoggerFactory.getLogger(InputFileReader.class);
>
>
>
>     public transient DefaultOutputPort<KeyValue<String, byte[]>> output = new DefaultOutputPort<KeyValue<String, byte[]>>();
>
>
>
>
>
> ########################oper2 code#####################
>
> public class GroupByteArrayParser extends BaseOperator {
>
>
>
>     public final transient DefaultInputPort<KeyValue<String, byte[]>> inputPort = new DefaultInputPort<KeyValue<String, byte[]>>() {
>
>
>
>         @Override
>         public void process(KeyValue<String, byte[]> tuple) {
>
>
>
>



