Flink SQL 写入Hive问题请教

2021-02-21 文章 yinghua...@163.com
我们在开发一个Flink SQL 框架,在从kafka读取数据加工写入到Hive时一直不成功,sql脚本如下:
CREATE TABLE hive_table_from_kafka (
   collect_time STRING,
   content1 STRING, 
   content2 STRING
) PARTITIONED BY (
  dt STRING,hr STRING
) TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

然后代码中对于创建表的sql做如下的处理
private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
  String ddl = cmdCall.operands[0];
  if (ddl.contains("hive_table")) {
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  } else {
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
  }
  try {
tableEnv.executeSql(ddl);
  } catch (SqlParserException e) {
throw new RuntimeException("SQL execute failed:\n" + ddl + "\n", e);
  }
}在执行上面的SQL语句时,总是报没有设置connector:Caused by: 
org.apache.flink.table.api.ValidationException: Table options do not contain an 
option key 'connector' for discovering a connector



yinghua...@163.com


bahir-flink 的 flink-connector-kudu 能做批读取么?

2021-02-21 文章 Haseo Chen
各位大佬

我看flink-connector-kudu的例子都是DataStream,但是我想用DataSet 进行点查。 看着提示好像不支持。 有什么办法处理么?

代码如下:


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env);

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);


String KUDU_MASTERS="192.168.248.4:7051";
KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tEnv2.registerCatalog("kudu", catalog);
tEnv2.useCatalog("kudu");

oldBatchTableEnv.registerCatalog("kudu", catalog);
oldBatchTableEnv.useCatalog("kudu");

Table odlTable = oldBatchTableEnv.sqlQuery("select * from users");

DataSet dsRow = oldBatchTableEnv.toDataSet(odlTable, Row.class);
dsRow.print();


Table table = tEnv2.sqlQuery("select * from users");

tEnv2.toAppendStream(table, Row.class).print();

报错如下:
ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user programmatically 
provided configurations. Set system property 'log4j2.debug' to show Log4j 2 
internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions 
on how to configure Log4j 2
Exception in thread "main" org.apache.flink.table.api.TableException: Only 
BatchTableSource and InputFormatTableSource are supported in 
BatchTableEnvironment.
at 
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
at 
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
at com.hujiang.bi.order.OrderMasterJob.main(OrderMasterJob.java:44)


看着应该是 flink-connector-kudu不支持batch读了。。。


??????flinksql1.11????hive??ClassNotFoundException: org.apache.hadoop.fs.PathHandle

2021-02-21 文章 Presley
----
??: 
   "Presley"



flinksql1.11????hive??ClassNotFoundException: org.apache.hadoop.fs.PathHandle

2021-02-21 文章 Presley
flinksql1.11hiveidea??Caused by: 
java.lang.ClassNotFoundException: 
org.apache.hadoop.fs.PathHandle_(:??)_


flink1.11??org.apache.flink.table.catalog.hive.HiveCatalog#HiveCatalog(java.lang.String,
 java.lang.String, java.lang.String, 
java.lang.String)hadoopflinksql??ideaflinkhadoop??hadoopflinksql

Re: Flink standalone模式如何区分各个任务的日志?

2021-02-21 文章 xingoo
Hi, 

这样体验上还是不太友好,如果能做成spark那种每个Job独立记录日志就好了



--
Sent from: http://apache-flink.147419.n8.nabble.com/

SQL作业中使用python udfs语法检查报错,作业提交缺没问题

2021-02-21 文章 zilong xiao
报错内容:Python callback server start failed!


Re: 自定义partition,使用遇到问题,附代码

2021-02-21 文章 冯嘉伟
Hi!

Optional.of(new customPartitioner())



Ye Chen wrote
> 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
> //自定义partition
> public class customPartitioner extends FlinkKafkaPartitioner
> 
>  {
> @Override
> public int partition(String record, byte[] key, byte[] value, String
> targetTopic, int[] partitions) {
> return 0;
> }
> }
> 
> 
> DataStream
> 
>  stream = 。。。
> FlinkKafkaProducer
> 
>  myProducer = new FlinkKafkaProducer<>(
> "test_topic",
> new SimpleStringSchema(),
> properties,
> new customPartitioner()
> );
> stream.addSink(myProducer);
> 
> 
> //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java:
> 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
> //去掉new
> customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数
> 
> 
> 
> 
> 查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
> public FlinkKafkaProducer(
> String topicId,
> SerializationSchema
> 
>  serializationSchema,
> Properties producerConfig,
> OptionalFlinkKafkaPartitionerIN> customPartitioner) {
> this(
> topicId,
> serializationSchema,
> producerConfig,
> customPartitioner.orElse(null),
> Semantic.AT_LEAST_ONCE,
> DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }





--
Sent from: http://apache-flink.147419.n8.nabble.com/


??????Re:SqlValidatorException: No match found for function signature prod()

2021-02-21 文章 Presley
_(:??)_




----
??: 
   "user-zh"



??????flink??????????

2021-02-21 文章 liujian
??,,??,flinkjoin??,hbase,??




----
??: 
   "user-zh"



Re:Re:SqlValidatorException: No match found for function signature prod()

2021-02-21 文章 Ye Chen
应该是继承scalaFunction ?

















在 2021-02-22 10:25:31,"xiaoyue" <18242988...@163.com> 写道:
>捞一下自己,在线等大佬们的回复 _(:з」∠)_
>
>
>
>
>
>
>
>在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道:
>
>我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function 
>signature prod(),请求大佬帮忙看看_(:з」∠)_
>
>以下是代码:
>-
>...
>  stableEnv.createTemporarySystemFunction("prod", 
> ProductAggregateFunction.class);
>  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
> yldrate from queryData group by pf_id");
>...
>-
>@FunctionHint(
>input = @DataTypeHint("Double"),
>output = @DataTypeHint("Double")
>)
>public class ProductAggregateFunction extends AggregateFunctionProduct> {
>
>
>@Override
>public Double getValue(Product acc) {
>return acc.prod;
>}
>@Override
>public Product createAccumulator() {
>return new Product();
>}
>public void accumulate(Product acc, Double iValue) {
>acc.prod *= iValue;
>}
>public void retract(Product acc, Double iValue) {
>acc.prod /= iValue;
>}
>public void merge(Product acc, Iterable it) {
>for (Product p : it) {
>accumulate(acc, p.prod);
>}
>}
>public void resetAccumulator(Product acc) {
>acc.prod = 1D;
>}
>}
>
>
>
>
>
> 


Re:flink生成大宽表

2021-02-21 文章 Ye Chen
可以用多流join,但是数据延迟会导致join不上,可以侧输出处理下,看业务需求

















在 2021-02-22 11:05:46,"liujian" <13597820...@qq.com> 写道:
>Hi:
>  大家好,有3张实时的表,相互关联可以形成大宽表,如何一张都会更新,那么我该如何实现流处理,我目标表放到kudu上
> 
>  我的理解:
>   直接使用jdbc-connecter将三张表读取,然后join,再写入,会不会有什么问题


flink??????????

2021-02-21 文章 liujian
Hi:
  
??,??3??,??,,??,kudu??
 
  :
   
jdbc-connecter,join,??,

Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-21 文章 yidan zhao
不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。

yidan zhao  于2021年2月22日周一 上午10:31写道:

> 只有最后一个keyBy有效。
>
> Hongyuan Ma  于2021年2月21日周日 下午10:59写道:
>
>> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口,
>> 还是在前一次keyby的基础上生成m*n个窗口?
>>
>>
>> 像下面这样写, 最后的窗口是只按area划分的吗?
>> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
>> stream.keyby("id")
>> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
>> .assignTime() // 修改轨迹eventTime为预测出的时间
>> .keyby("area")
>> .window() // 根据区域划分窗口
>> .process() // 统计各个区域内的轨迹
>>
>>


Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-21 文章 yidan zhao
只有最后一个keyBy有效。

Hongyuan Ma  于2021年2月21日周日 下午10:59写道:

> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口,
> 还是在前一次keyby的基础上生成m*n个窗口?
>
>
> 像下面这样写, 最后的窗口是只按area划分的吗?
> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
> stream.keyby("id")
> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
> .assignTime() // 修改轨迹eventTime为预测出的时间
> .keyby("area")
> .window() // 根据区域划分窗口
> .process() // 统计各个区域内的轨迹
>
>


Re:SqlValidatorException: No match found for function signature prod()

2021-02-21 文章 xiaoyue
捞一下自己,在线等大佬们的回复 _(:з」∠)_







在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道:

我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function 
signature prod(),请求大佬帮忙看看_(:з」∠)_

以下是代码:
-
...
  stableEnv.createTemporarySystemFunction("prod", 
ProductAggregateFunction.class);
  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
yldrate from queryData group by pf_id");
...
-
@FunctionHint(
input = @DataTypeHint("Double"),
output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction {


@Override
public Double getValue(Product acc) {
return acc.prod;
}
@Override
public Product createAccumulator() {
return new Product();
}
public void accumulate(Product acc, Double iValue) {
acc.prod *= iValue;
}
public void retract(Product acc, Double iValue) {
acc.prod /= iValue;
}
public void merge(Product acc, Iterable it) {
for (Product p : it) {
accumulate(acc, p.prod);
}
}
public void resetAccumulator(Product acc) {
acc.prod = 1D;
}
}





 

pyflink对Redis sink的支持

2021-02-21 文章 whh_960101
各位大佬,
请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!




 

pyflink对Redis sink的支持

2021-02-21 文章 whh_960101
各位大佬,
请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!

大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-21 文章 Hongyuan Ma
大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, 还是在前一次keyby的基础上生成m*n个窗口?


像下面这样写, 最后的窗口是只按area划分的吗?
// 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
stream.keyby("id")
.flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
.assignTime() // 修改轨迹eventTime为预测出的时间
.keyby("area")
.window() // 根据区域划分窗口
.process() // 统计各个区域内的轨迹