Flink SQL 写入Hive问题请教
我们在开发一个Flink SQL 框架,在从kafka读取数据加工写入到Hive时一直不成功,sql脚本如下: CREATE TABLE hive_table_from_kafka ( collect_time STRING, content1 STRING, content2 STRING ) PARTITIONED BY ( dt STRING,hr STRING ) TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file' ); 然后代码中对于创建表的sql做如下的处理 private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) { String ddl = cmdCall.operands[0]; if (ddl.contains("hive_table")) { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } else { tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); } try { tableEnv.executeSql(ddl); } catch (SqlParserException e) { throw new RuntimeException("SQL execute failed:\n" + ddl + "\n", e); } }在执行上面的SQL语句时,总是报没有设置connector:Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector yinghua...@163.com
bahir-flink 的 flink-connector-kudu 能做批读取么?
各位大佬 我看flink-connector-kudu的例子都是DataStream,但是我想用DataSet 进行点查。 看着提示好像不支持。 有什么办法处理么? 代码如下: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv); String KUDU_MASTERS="192.168.248.4:7051"; KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS); tEnv2.registerCatalog("kudu", catalog); tEnv2.useCatalog("kudu"); oldBatchTableEnv.registerCatalog("kudu", catalog); oldBatchTableEnv.useCatalog("kudu"); Table odlTable = oldBatchTableEnv.sqlQuery("select * from users"); DataSet dsRow = oldBatchTableEnv.toDataSet(odlTable, Row.class); dsRow.print(); Table table = tEnv2.sqlQuery("select * from users"); tEnv2.toAppendStream(table, Row.class).print(); 报错如下: ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2 Exception in thread "main" org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment. at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537) at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101) at com.hujiang.bi.order.OrderMasterJob.main(OrderMasterJob.java:44) 看着应该是 flink-connector-kudu不支持batch读了。。。
??????flinksql1.11????hive??ClassNotFoundException: org.apache.hadoop.fs.PathHandle
---- ??: "Presley"
flinksql1.11????hive??ClassNotFoundException: org.apache.hadoop.fs.PathHandle
flinksql1.11hiveidea??Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.PathHandle_(:??)_ flink1.11??org.apache.flink.table.catalog.hive.HiveCatalog#HiveCatalog(java.lang.String, java.lang.String, java.lang.String, java.lang.String)hadoopflinksql??ideaflinkhadoop??hadoopflinksql
Re: Flink standalone模式如何区分各个任务的日志?
Hi, 这样体验上还是不太友好,如果能做成spark那种每个Job独立记录日志就好了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
SQL作业中使用python udfs语法检查报错,作业提交缺没问题
报错内容:Python callback server start failed!
Re: 自定义partition,使用遇到问题,附代码
Hi! Optional.of(new customPartitioner()) Ye Chen wrote > 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。 > //自定义partition > public class customPartitioner extends FlinkKafkaPartitioner > > { > @Override > public int partition(String record, byte[] key, byte[] value, String > targetTopic, int[] partitions) { > return 0; > } > } > > > DataStream > > stream = 。。。 > FlinkKafkaProducer > > myProducer = new FlinkKafkaProducer<>( > "test_topic", > new SimpleStringSchema(), > properties, > new customPartitioner() > ); > stream.addSink(myProducer); > > > //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: > 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】 > //去掉new > customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数 > > > > > 查看FlinkKafkaProducer源码如下,我上面的写法有问题么? > public FlinkKafkaProducer( > String topicId, > SerializationSchema > > serializationSchema, > Properties producerConfig, > OptionalFlinkKafkaPartitionerIN> customPartitioner) { > this( > topicId, > serializationSchema, > producerConfig, > customPartitioner.orElse(null), > Semantic.AT_LEAST_ONCE, > DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); > } -- Sent from: http://apache-flink.147419.n8.nabble.com/
??????Re:SqlValidatorException: No match found for function signature prod()
_(:??)_ ---- ??: "user-zh"
??????flink??????????
??,,??,flinkjoin??,hbase,?? ---- ??: "user-zh"
Re:Re:SqlValidatorException: No match found for function signature prod()
应该是继承scalaFunction ? 在 2021-02-22 10:25:31,"xiaoyue" <18242988...@163.com> 写道: >捞一下自己,在线等大佬们的回复 _(:з」∠)_ > > > > > > > >在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道: > >我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function >signature prod(),请求大佬帮忙看看_(:з」∠)_ > >以下是代码: >- >... > stableEnv.createTemporarySystemFunction("prod", > ProductAggregateFunction.class); > Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as > yldrate from queryData group by pf_id"); >... >- >@FunctionHint( >input = @DataTypeHint("Double"), >output = @DataTypeHint("Double") >) >public class ProductAggregateFunction extends AggregateFunctionProduct> { > > >@Override >public Double getValue(Product acc) { >return acc.prod; >} >@Override >public Product createAccumulator() { >return new Product(); >} >public void accumulate(Product acc, Double iValue) { >acc.prod *= iValue; >} >public void retract(Product acc, Double iValue) { >acc.prod /= iValue; >} >public void merge(Product acc, Iterable it) { >for (Product p : it) { >accumulate(acc, p.prod); >} >} >public void resetAccumulator(Product acc) { >acc.prod = 1D; >} >} > > > > > >
Re:flink生成大宽表
可以用多流join,但是数据延迟会导致join不上,可以侧输出处理下,看业务需求 在 2021-02-22 11:05:46,"liujian" <13597820...@qq.com> 写道: >Hi: > 大家好,有3张实时的表,相互关联可以形成大宽表,如何一张都会更新,那么我该如何实现流处理,我目标表放到kudu上 > > 我的理解: > 直接使用jdbc-connecter将三张表读取,然后join,再写入,会不会有什么问题
flink??????????
Hi: ??,??3??,??,,??,kudu?? : jdbc-connecter,join,??,
Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?
不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。 yidan zhao 于2021年2月22日周一 上午10:31写道: > 只有最后一个keyBy有效。 > > Hongyuan Ma 于2021年2月21日周日 下午10:59写道: > >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, >> 还是在前一次keyby的基础上生成m*n个窗口? >> >> >> 像下面这样写, 最后的窗口是只按area划分的吗? >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 >> stream.keyby("id") >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state >> .assignTime() // 修改轨迹eventTime为预测出的时间 >> .keyby("area") >> .window() // 根据区域划分窗口 >> .process() // 统计各个区域内的轨迹 >> >>
Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?
只有最后一个keyBy有效。 Hongyuan Ma 于2021年2月21日周日 下午10:59写道: > 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, > 还是在前一次keyby的基础上生成m*n个窗口? > > > 像下面这样写, 最后的窗口是只按area划分的吗? > // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 > stream.keyby("id") > .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state > .assignTime() // 修改轨迹eventTime为预测出的时间 > .keyby("area") > .window() // 根据区域划分窗口 > .process() // 统计各个区域内的轨迹 > >
Re:SqlValidatorException: No match found for function signature prod()
捞一下自己,在线等大佬们的回复 _(:з」∠)_ 在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道: 我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function signature prod(),请求大佬帮忙看看_(:з」∠)_ 以下是代码: - ... stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class); Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id"); ... - @FunctionHint( input = @DataTypeHint("Double"), output = @DataTypeHint("Double") ) public class ProductAggregateFunction extends AggregateFunction { @Override public Double getValue(Product acc) { return acc.prod; } @Override public Product createAccumulator() { return new Product(); } public void accumulate(Product acc, Double iValue) { acc.prod *= iValue; } public void retract(Product acc, Double iValue) { acc.prod /= iValue; } public void merge(Product acc, Iterable it) { for (Product p : it) { accumulate(acc, p.prod); } } public void resetAccumulator(Product acc) { acc.prod = 1D; } }
pyflink对Redis sink的支持
各位大佬, 请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!
pyflink对Redis sink的支持
各位大佬, 请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!
大佬们, keyby()两次, 然后再window(), 会有几个窗口?
大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, 还是在前一次keyby的基础上生成m*n个窗口? 像下面这样写, 最后的窗口是只按area划分的吗? // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 stream.keyby("id") .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state .assignTime() // 修改轨迹eventTime为预测出的时间 .keyby("area") .window() // 根据区域划分窗口 .process() // 统计各个区域内的轨迹