Hi Godfrey, Thanks a lot for your response. I just tried it with env.execute("simple job") but I still get the same error message.
Kant On Sat, Jan 18, 2020 at 6:26 PM godfrey he <godfre...@gmail.com> wrote: > hi kant, > > > 1) The Documentation says full outer join is supported however the below > code just exits with value 1. No error message. > if you have converted Table to DataStream, please execute it > with StreamExecutionEnvironment ( call env.execute("simple job") ) > > > 2) If I am using a blink planner should I use TableEnvironment or > StreamTableEnvironment ? > for streaming job, both Environment can be used. the difference is: > TableEnvironment will optimize multiple queries into one DAG when > executing, while StreamTableEnvironment will independent optimize each > query. > StreamTableEnvironment supports convert from/to DataStream, > while TableEnvironment does not support it. > StreamTableEnvironment supports register TableFunction > and AggregateFunction, while TableEnvironment does not support it now. > > for batch job, only TableEnvironment is the only choice, because > DataStream does not support batch job now. > > > 3) Why flink current stable documentation(1.9) recommends (old planner)? > any rough timeline on when we would be able to use blink planner in > production? perhaps 1.10 or 1.11? > 1.9 is blink planner's first version, and it is unstable. In 1.10, blink > planner is more statable, we are switching the blink planner to the default > step by step [0]. > > [0] > http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E > > kant kodali <kanth...@gmail.com> 于2020年1月18日周六 下午5:40写道: > >> Hi All, >> >> 1) The Documentation says full outer join is supported however the below >> code just exits with value 1. No error message. >> >> import org.apache.flink.api.common.serialization.SimpleStringSchema; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >> import org.apache.flink.table.api.*; >> import org.apache.flink.table.api.java.StreamTableEnvironment; >> import org.apache.flink.types.Row; >> >> import java.util.Properties; >> >> public class Test { >> >> public static void main(String... args) throws Exception { >> >> EnvironmentSettings bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> StreamTableEnvironment bsTableEnv = >> StreamTableEnvironment.create(env, bsSettings); >> >> Properties properties = new Properties(); >> properties.setProperty("bootstrap.servers", "localhost:9092"); >> properties.setProperty("group.id", "test"); >> >> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>( >> java.util.regex.Pattern.compile("test-topic1"), >> new SimpleStringSchema(), >> properties); >> FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>( >> java.util.regex.Pattern.compile("test-topic2"), >> new SimpleStringSchema(), >> properties); >> >> DataStream<String> stream1 = env.addSource(consumer1); >> DataStream<String> stream2 = env.addSource(consumer2); >> >> bsTableEnv.registerDataStream("sample1", stream1); >> bsTableEnv.registerDataStream("sample2", stream2); >> >> Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL OUTER >> JOIN sample2 on sample1.f0=sample2.f0"); >> result.printSchema(); >> >> bsTableEnv.toAppendStream(result, Row.class).print(); >> bsTableEnv.execute("sample job"); >> } >> } >> >> >> 2) If I am using a blink planner should I use TableEnvironment or >> StreamTableEnvironment ? >> >> 3) Why flink current stable documentation(1.9) recommends (old planner)? >> any rough timeline on when we would be able to use blink planner in >> production? perhaps 1.10 or 1.11? >> >> Thanks! >> >> >>