?? ?????????? ????????
------------------ ???????? ------------------ ??????: "Benchao Li"<[email protected]>; ????????: 2020??5??29??(??????) ????5:17 ??????: "user-zh"<[email protected]>; ????: Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology ?????????????????????????????????????????????? List<Row&gt; rowList = TableUtils.collectToList(table); System.out.println(rowList); ??????????????????????????????SQL???????????????????? tableEnv.execute("test"); ???????????????????????? ?????????????? <[email protected]> ??2020??5??29?????? ????3:48?????? > ??????????????error??????Exception in thread "main" java.lang.IllegalStateException: > No operators defined in streaming topology. Cannot execute????insert??????????hive > cli????????select????????????????????????????????????????????????????batch table?? EnvironmentSettings > settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > > String name = "myhive"; > String defaultDatabase = "situation"; > String hiveConfDir = "/load/data/hive/hive-conf"; // a local path > String version = "1.2.1"; > String CATALOG_NAME = "myhive"; > > HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, > hiveConfDir, version); > hiveCatalog.open(); > tableEnv.registerCatalog(CATALOG_NAME, hiveCatalog); > > Optional<Catalog&gt; myHive = tableEnv.getCatalog(CATALOG_NAME); > ObjectPath myTablePath = new ObjectPath("situation", "flink_test"); > System.out.println(myHive.get().getTable(myTablePath).getSchema()); > > > //????Hive???????? > tableEnv.loadModule("hiveModule",new HiveModule(version)); > > tableEnv.useCatalog(CATALOG_NAME); > > tableEnv.sqlUpdate("insert into situation.flink_test values (3,'kcz3')"); > Table table = tableEnv.sqlQuery(" select * from situation.flink_test"); > List<Row&gt; rowList = TableUtils.collectToList(table); > System.out.println(rowList); > > > tableEnv.execute("test"); -- Best, Benchao Li
