Following up: here’s the JIRA ticket for improving the POJO data type 
documentation - https://issues.apache.org/jira/browse/FLINK-7614.

- Gordon


On 11 September 2017 at 10:31:23 AM, Sridhar Chellappa (flinken...@gmail.com) 
wrote:

That fixed my issue. Thanks. I also agree we need to fix the Documentation

On Thu, Sep 7, 2017 at 6:15 PM, Timo Walther <twal...@apache.org> wrote:
Hi Sridhar,

according to the exception, your "meEvents" stream is not POJO. You can check 
that by printing "meEvents.getType()". In general, you can always check the log 
for such problems. There should be something like:

14:40:57,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor             
- class org.apache.flink.streaming.examples.wordcount.WordCount$MyEvent does 
not contain a setter for field responseTime
14:40:57,083 INFO org.apache.flink.api.java.typeutils.TypeExtractor             
- class org.apache.flink.streaming.examples.wordcount.WordCount$MyEvent is not 
a valid POJO type because not all fields are valid POJO fields.

The problem is that your setters have a return type. A POJO setter usually 
should have a void return type. But I agree that this should be mentioned in 
the documentation.

Regards,
Timo


Am 07.09.17 um 05:20 schrieb Sridhar Chellappa:

I am trying to use the KeyBy operator as follows :


    Pattern<MyEvent, ?> myEventsCEPPattern =
Pattern.<MyEvent>begin("FirstEvent")
                            .subtype(MyEvent.class)
                            .next("SecondEvent")
                            .subtype(MyEvent.class)
                            .within(Time.hours(1));



        PatternStream<MyEvent> myEventsPatternStream =
                CEP.pattern(
                        meEvents.keyBy("field1", "field6"),
                        myEventsCEPPattern
                );



When I run the program, I get the following exception:

The program finished with the following exception:

This type (GenericType<com.events.MyEvent>) cannot be used as key.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)


MyEvent is a POJO. What is that I am doing wrong?


Here is the relevant code :

public abstract class AbstractEvent {
    private String field1;
    private String field2;
    private String field3;
    private String field4;
    private Timestamp eventTimestmp;

    public AbstractEvent(String field1, String field2, String field3, String 
field4, Timestamp eventTimestmp) {
        this.field1 = field1;
        this.field2 = field2;
        this.field3 = field3;
        this.field4 = field4;
        this.eventTimestmp = eventTimestmp;
    }

    public AbstractEvent() {
    }

    public String getField1() {
        return field1;
    }

    public AbstractEvent setField1(String field1) {
        this.field1 = field1;
        return this;
    }

    public String getField2() {
        return field2;
    }

    public AbstractEvent setField2(String field2) {
        this.field2 = field2;
        return this;
    }

    public String getField3() {
        return field3;
    }

    public AbstractEvent setField3(String field3) {
        this.field3 = field3;
        return this;
    }

    public String getField4() {
        return field4;
    }

    public AbstractEvent setField4(String field4) {
        this.field4 = field4;
        return this;
    }

    public Timestamp getEventTimestmp() {
        return eventTimestmp;
    }

    public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) {
        this.eventTimestmp = eventTimestmp;
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AbstractEvent)) {
            return false;
        }

        AbstractEvent that = (AbstractEvent) o;

        if (!getField1().equals(that.getField1())) {
            return false;
        }
        if (!getField2().equals(that.getField2())) {
            return false;
        }
        if (!getField3().equals(that.getField3())) {
            return false;
        }
        if (!getField4().equals(that.getField4())) {
            return false;
        }
        return getEventTimestmp().equals(that.getEventTimestmp());
    }

    @SuppressWarnings({"MagicNumber"})
    @Override
    public int hashCode() {
        int result = getField1().hashCode();
        result = 31 * result + getField2().hashCode();
        result = 31 * result + getField3().hashCode();
        result = 31 * result + getField4().hashCode();
        result = 31 * result + getEventTimestmp().hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "AbstractEvent{"
                + "field1='" + field1 + '\''
                + ", field2='" + field2 + '\''
                + ", field3='" + field3 + '\''
                + ", field4='" + field4 + '\''
                + ", eventTimestmp=" + eventTimestmp
                + '}';
    }
}


public class MyEvent extends AbstractEvent {

    private Timestamp responseTime;
    private String field5;
    private String field6;
    private int field7;
    private String field8;
    private String field9;

    public MyEvent(String field1, String field2, String field3, String field4, 
Timestamp eventTimestmp,
                                           Timestamp responseTime, String 
field5, String field6, int field7, String field8,
                                           String field9) {
        super(field1, field2, field3, field4, eventTimestmp);
        this.responseTime = responseTime;
        this.field5 = field5;
        this.field6 = field6;
        this.field7 = field7;
        this.field8 = field8;
        this.field9 = field9;
    }

    public MyEvent() {
        super();
    }

    public int getField7() {
        return field7;
    }

    public String getField8() {
        return field8;
    }

    public String getField9() {
        return field9;
    }

    public String getField5() {
        return field5;
    }

    public String getField6() {
        return field6;
    }

    public Timestamp getResponseTime() {
        return responseTime;
    }

    public MyEvent setResponseTime(Timestamp responseTime) {
        this.responseTime = responseTime;
        return this;
    }

    public MyEvent setField7(int field7) {
        this.field7 = field7;
        return this;
    }

    public MyEvent setField8(String field8) {
        this.field8 = field8;
        return this;
    }

    public MyEvent setField9(String field9) {
        this.field9 = field9;
        return this;
    }

    public MyEvent setField5(String field5) {
        this.field5 = field5;
        return this;
    }

    public MyEvent setField6(String field6) {
        this.field6 = field6;
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MyEvent)) return false;
        if (!super.equals(o)) return false;

        MyEvent that = (MyEvent) o;

        if (getField7() != that.getField7()) return false;
        if (!getResponseTime().equals(that.getResponseTime())) return false;
        if (!getField5().equals(that.getField5())) return false;
        if (!getField6().equals(that.getField6())) return false;
        if (!getField8().equals(that.getField8())) return false;
        return getField9().equals(that.getField9());
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + getResponseTime().hashCode();
        result = 31 * result + getField5().hashCode();
        result = 31 * result + getField6().hashCode();
        result = 31 * result + getField7();
        result = 31 * result + getField8().hashCode();
        result = 31 * result + getField9().hashCode();
        return result;
    }
}








Reply via email to