?? ??????????  ????????



------------------ ???????? ------------------
??????:&nbsp;"Benchao Li"<[email protected]&gt;;
????????:&nbsp;2020??5??29??(??????) ????5:17
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming 
topology



??????????????????????????????????????????????
&nbsp;List<Row&amp;gt; rowList =&nbsp; TableUtils.collectToList(table);
&nbsp;System.out.println(rowList);
??????????????????????????????SQL????????????????????
&nbsp;tableEnv.execute("test");
????????????????????????

?????????????? <[email protected]&gt; ??2020??5??29?????? ????3:48??????

&gt; ??????????????error??????Exception in thread "main" 
java.lang.IllegalStateException:
&gt; No operators defined in streaming topology. Cannot 
execute????insert??????????hive
&gt; cli????????select????????????????????????????????????????????????????batch 
table?? EnvironmentSettings
&gt; settings =
&gt; EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
&gt;&nbsp; TableEnvironment tableEnv = TableEnvironment.create(settings);
&gt;
&gt;&nbsp; String 
name&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; = 
"myhive";
&gt;&nbsp; String defaultDatabase = "situation";
&gt;&nbsp; String hiveConfDir&nbsp;&nbsp;&nbsp;&nbsp; = 
"/load/data/hive/hive-conf"; // a local path
&gt;&nbsp; String version&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; = 
"1.2.1";
&gt;&nbsp; String CATALOG_NAME = "myhive";
&gt;
&gt;&nbsp; HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,
&gt; hiveConfDir, version);
&gt;&nbsp; hiveCatalog.open();
&gt;&nbsp; tableEnv.registerCatalog(CATALOG_NAME, hiveCatalog);
&gt;
&gt;&nbsp; Optional<Catalog&amp;gt; myHive = tableEnv.getCatalog(CATALOG_NAME);
&gt;&nbsp; ObjectPath myTablePath = new ObjectPath("situation", "flink_test");
&gt;&nbsp; System.out.println(myHive.get().getTable(myTablePath).getSchema());
&gt;
&gt;
&gt;&nbsp; //????Hive????????
&gt; tableEnv.loadModule("hiveModule",new HiveModule(version));
&gt;
&gt;&nbsp; tableEnv.useCatalog(CATALOG_NAME);
&gt;
&gt;&nbsp; tableEnv.sqlUpdate("insert into situation.flink_test values 
(3,'kcz3')");
&gt;&nbsp; Table table = tableEnv.sqlQuery(" select * from 
situation.flink_test");
&gt;&nbsp; List<Row&amp;gt; rowList =&nbsp; TableUtils.collectToList(table);
&gt;&nbsp; System.out.println(rowList);
&gt;
&gt;
&gt;&nbsp; tableEnv.execute("test");



-- 

Best,
Benchao Li

回复