你好:
    我现在在使用flink 1.11.2版本 hive1.1.0 版本;
    当我在使用flink hive streaming的时候发现,按照示例写数据到hive,可以看到指定目录下已经生成了文件,但是使用hive查询没有数据;
    好像是分区信息没有提交到hive  meta store;但是官网已经说实现了这个功能;我操作却不行
    下面是我的代码
     object StreamMain {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
    
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE)
    
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(20))

    val dataStream = streamEnv.addSource(new MySource)

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName,              // catalog name
      "yutest",                // default database
      "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive config 
(hive-site.xml) directory
      "1.1.0"                   // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)

    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("yutest")


    tableEnv.createTemporaryView("users", dataStream)
    tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
    //      如果hive中已经存在了相应的表,则这段代码省略
    val hiveSql = """CREATE external TABLE fs_table (
                    user_id STRING,
                    order_amount DOUBLE
                  )
                  partitioned by(
                  dt string,
                  h string,
                  m string) stored as parquet
                  TBLPROPERTIES (
                    'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
                    'sink.partition-commit.delay'='0s',
                    'sink.partition-commit.trigger'='partition-time',
                    'sink.partition-commit.policy.kind'='metastore,success-file'
                  )""".stripMargin
    tableEnv.executeSql(hiveSql)

    val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
FROM users"
    tableEnv.executeSql(insertSql)
  }
}
/**
 * Unbounded test source that emits one random {@link UserInfo} every 100 ms
 * until {@link #cancel()} is called.
 */
public class MySource implements SourceFunction<UserInfo> {
    // volatile so the cancel() flag written by another thread is seen by run().
    private volatile boolean run = true;

    String userids[] = {
            "4760858d-2bec-483c-a535-291de04b2247",
            "67088699-d4f4-43f2-913c-481bff8a2dc5",
            "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
            "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
            "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
            "3218bbb9-5874-4d37-a82d-3e35e52d1702",
            "3ebfb9602ac07779||3ebfe9612a007979",
            "aec20d52-c2eb-4436-b121-c29ad4097f6c",
            "e7e896cd939685d7||e7e8e6c1930689d7",
            "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
    };

    @Override
    public void run(SourceFunction.SourceContext<UserInfo> sourceContext)
            throws Exception {
        while (run) {
            // BUG FIX: the original used (userids.length - 1) as the multiplier,
            // which makes the last array element unreachable. Math.random() is in
            // [0, 1), so multiplying by the full length yields a uniform index
            // in [0, length - 1].
            String userid = userids[(int) (Math.random() * userids.length)];
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(userid);
            userInfo.setAmount(Math.random() * 100);
            userInfo.setTs(new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(userInfo);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}
public class UserInfo implements Serializable {
    private String userId;
    private Double amount;
    private Timestamp ts;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }
}

hive (yutest)>
             >
             > show partitions fs_table;
OK
partition
Time taken: 20.214 seconds



[email protected]

回复