Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/22#discussion_r151840400
  
    --- Diff: 
flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
 ---
    @@ -0,0 +1,404 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.siddhi;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.flink.api.common.functions.InvalidTypesException;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.api.java.tuple.Tuple5;
    +import 
org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
    +import 
org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
    +import org.apache.flink.streaming.siddhi.source.Event;
    +import org.apache.flink.streaming.siddhi.source.RandomEventSource;
    +import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
    +import org.apache.flink.streaming.siddhi.source.RandomWordSource;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    +import org.apache.flink.streaming.api.operators.StreamMap;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Flink-siddhi library integration test cases
    + */
    +public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase 
implements Serializable {
    +
    +    @Rule
    +    public transient TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +    @Test
    +    public void testSimpleWriteAndRead() throws Exception {
    +        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +        DataStream<Event> input = env.fromElements(
    +            Event.of(1, "start", 1.0),
    +            Event.of(2, "middle", 2.0),
    +            Event.of(3, "end", 3.0),
    +            Event.of(4, "start", 4.0),
    +            Event.of(5, "middle", 5.0),
    +            Event.of(6, "end", 6.0)
    +        );
    +
    +        String path = tempFolder.newFile().toURI().toString();
    +        input.transform("transformer", TypeInformation.of(Event.class), 
new StreamMap<>(new MapFunction<Event, Event>() {
    +            @Override
    +            public Event map(Event event) throws Exception {
    +                return event;
    +            }
    +        })).writeAsText(path);
    +        env.execute();
    +        Assert.assertEquals(6, getLineCount(path));
    +    }
    +
    +    @Test
    +    public void testSimplePojoStreamAndReturnPojo() throws Exception {
    +        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +        DataStream<Event> input = env.fromElements(
    +            Event.of(1, "start", 1.0),
    +            Event.of(2, "middle", 2.0),
    +            Event.of(3, "end", 3.0),
    +            Event.of(4, "start", 4.0),
    +            Event.of(5, "middle", 5.0),
    +            Event.of(6, "end", 6.0)
    +        );
    +
    +        DataStream<Event> output = SiddhiCEP
    +            .define("inputStream", input, "id", "name", "price")
    +            .cql("from inputStream insert into  outputStream")
    +            .returns("outputStream", Event.class);
    +        String path = tempFolder.newFile().toURI().toString();
    +        output.print();
    +        env.execute();
    +        // Assert.assertEquals(6, getLineCount(path));
    --- End diff --
    
    This looks like it was accidentally commented out.


---

Reply via email to