A question about writing to Hive with Flink SQL

2021-02-21 Thread yinghua...@163.com
We are building a Flink SQL framework. Reading data from Kafka, transforming it and writing it into Hive keeps failing. The SQL script is as follows:
CREATE TABLE hive_table_from_kafka (
   collect_time STRING,
   content1 STRING, 
   content2 STRING
) PARTITIONED BY (
  dt STRING,hr STRING
) TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

The code then handles the CREATE TABLE SQL like this:
private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
  String ddl = cmdCall.operands[0];
  if (ddl.contains("hive_table")) {
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  } else {
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
  }
  try {
    tableEnv.executeSql(ddl);
  } catch (SqlParserException e) {
    throw new RuntimeException("SQL execute failed:\n" + ddl + "\n", e);
  }
}

When executing the SQL above, it always fails complaining that no connector was set: Caused by: 
org.apache.flink.table.api.ValidationException: Table options do not contain an 
option key 'connector' for discovering a connector
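
One possible cause (only a guess, since the rest of the framework is not shown): if no HiveCatalog is registered, the CREATE TABLE lands in Flink's default in-memory catalog, where the planner expects a 'connector' option; a Hive table needs to live in a HiveCatalog. A minimal sketch of registering one before running the Hive-dialect DDL; the catalog name, database and hive-conf path below are placeholders:

// untested sketch; "myhive", "default" and the conf dir are placeholders
String catalogName = "myhive";
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, "default", "/opt/hive/conf");
tableEnv.registerCatalog(catalogName, hiveCatalog);
tableEnv.useCatalog(catalogName);

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(ddl); // the CREATE TABLE hive_table_from_kafka statement above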



yinghua...@163.com


Can bahir-flink's flink-connector-kudu do batch reads?

2021-02-21 Thread Haseo Chen
Hi all,

The flink-connector-kudu examples all use DataStream, but I want to use DataSet for point lookups. From the error it looks unsupported. Is there any way around this?

The code is as follows:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env);

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);


String KUDU_MASTERS="192.168.248.4:7051";
KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tEnv2.registerCatalog("kudu", catalog);
tEnv2.useCatalog("kudu");

oldBatchTableEnv.registerCatalog("kudu", catalog);
oldBatchTableEnv.useCatalog("kudu");

Table odlTable = oldBatchTableEnv.sqlQuery("select * from users");

DataSet<Row> dsRow = oldBatchTableEnv.toDataSet(odlTable, Row.class);
dsRow.print();


Table table = tEnv2.sqlQuery("select * from users");

tEnv2.toAppendStream(table, Row.class).print();

The error is:
ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user programmatically 
provided configurations. Set system property 'log4j2.debug' to show Log4j 2 
internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions 
on how to configure Log4j 2
Exception in thread "main" org.apache.flink.table.api.TableException: Only 
BatchTableSource and InputFormatTableSource are supported in 
BatchTableEnvironment.
at 
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
at 
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
at com.hujiang.bi.order.OrderMasterJob.main(OrderMasterJob.java:44)


So it looks like flink-connector-kudu does not support batch reads...
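
For what it's worth, this particular exception comes from the legacy BatchTableEnvironment (old planner), which only accepts BatchTableSource and InputFormatTableSource. A rough, unverified sketch of trying the Blink planner's unified TableEnvironment in batch mode instead; whether the Kudu catalog's tables work on that code path is an open question:

// untested sketch, Flink 1.11/1.12-era API
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment batchTEnv = TableEnvironment.create(settings);

batchTEnv.registerCatalog("kudu", new KuduCatalog("192.168.248.4:7051"));
batchTEnv.useCatalog("kudu");

batchTEnv.executeSql("SELECT * FROM users").print();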


Re: Configure operator based on key

2021-02-21 Thread yidan zhao
You can define it yourself using keyedStream.window(GlobalWindows.create()
).trigger(yourSelfDefinedTrigger).
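
For example, a rough sketch (not tested) of a count trigger whose threshold depends on the key, assuming the key is also carried inside the element as event.getKey(); the MyEvent type is made up, and 100 / 200 stand in for the X and Y counts from the question:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class KeyDependentCountTrigger extends Trigger<MyEvent, GlobalWindow> {

    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("count", Long.class);

    @Override
    public TriggerResult onElement(MyEvent event, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> countState = ctx.getPartitionedState(countDesc);
        long count = countState.value() == null ? 0L : countState.value();
        count++;

        // per-key window size: 100 and 200 are arbitrary stand-ins for X and Y
        long threshold = "abcd".equals(event.getKey()) ? 100 : 200;
        if (count >= threshold) {
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        countState.update(count);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }
}

// made-up event type that carries its own key
class MyEvent {
    private final String key;
    MyEvent(String key) { this.key = key; }
    String getKey() { return key; }
}

The stream would then be wired up as keyedStream.window(GlobalWindows.create()).trigger(new KeyDependentCountTrigger()).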

Abhinav Sharma  wrote on Sunday, February 21, 2021 at 3:57 PM:

> Hi,
>
> Is there some way that I can configure an operator based on the key in a
> stream?
> Eg: If the key is 'abcd', then create a window of size X counts, if the
> key is 'bfgh', then create a window of size Y counts.
>
> Is this scenario possible in Flink?
>
>


Re: Flink SQL 1.11 writing to Hive: ClassNotFoundException: org.apache.hadoop.fs.PathHandle

2021-02-21 Thread Presley

Flink SQL 1.11 writing to Hive: ClassNotFoundException: org.apache.hadoop.fs.PathHandle

2021-02-21 Thread Presley
Writing to Hive with Flink SQL 1.11 and running from IDEA fails with: Caused by: 
java.lang.ClassNotFoundException: 
org.apache.hadoop.fs.PathHandle _(:з」∠)_


In Flink 1.11 I am using the org.apache.flink.table.catalog.hive.HiveCatalog#HiveCatalog(java.lang.String,
 java.lang.String, java.lang.String, java.lang.String) constructor. When the Flink SQL job
runs from IDEA, which Hadoop dependencies does it need so that the Hadoop classes are found?

Re: How can the logs of different jobs be separated in Flink standalone mode?

2021-02-21 Thread xingoo
Hi, 

That is still not a great experience. It would be much better if each job wrote its own log, the way Spark does.




SQL job using Python UDFs: the syntax check fails, but submitting the job works fine

2021-02-21 Thread zilong xiao
Error message: Python callback server start failed!


Re: Run the code in the UI

2021-02-21 Thread Tzu-Li (Gordon) Tai
Hi,

Could you re-elaborate what exactly you mean?

If you wish to run a Flink job within the IDE, but also have the web UI
running for it, you can use
`StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(Configuration)`
to create the execution environment.
The default port 8081 will be used unless specified via `rest.port` in the
configuration.
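
A minimal sketch of that (assuming the flink-runtime-web dependency is on the classpath so the UI is actually served):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082); // optional; 8081 is the default

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // a finite job like this exits quickly; a long-running job keeps the UI around
        env.fromElements(1, 2, 3).print();
        env.execute("local-job-with-web-ui");
    }
}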

Cheers,
Gordon





Re: [Statefun] Dynamic behavior

2021-02-21 Thread Tzu-Li (Gordon) Tai
Hi,

FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
primitive in StateFun:
https://issues.apache.org/jira/browse/FLINK-16319

This is probably what you are looking for. And I do agree, in the case that
the control stream (which updates the application logic) is high volume,
redeploying functions may not work well.

I don't think there really is a "recommended" way of doing the "broadcast
control stream, join with main stream" pattern with StateFun at the moment,
at least without FLINK-16319.
On the other hand, it could be possible to use stateful functions to
implement a pub-sub model in user space for the time being. I've actually
left some ideas for implementing that in the comments of FLINK-16319.
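
As a rough sketch of that user-space approach with the embedded Java SDK (untested; the Subscribe/RuleUpdate message types, the function types and the class name are all made up for illustration), a "registry" function can remember subscriber ids and fan control messages out to them:

import java.util.HashSet;

import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class BroadcastRegistryFn implements StatefulFunction {
  public static final FunctionType TYPE = new FunctionType("example", "registry");
  // function type of the subscribers; made up for this sketch
  static final FunctionType FRAUD_CHECK = new FunctionType("example", "fraud-check");

  // illustrative message types
  public static class Subscribe { public String subscriberId; }
  public static class RuleUpdate { public String rule; }

  @Persisted
  private final PersistedValue<HashSet> subscribers =
      PersistedValue.of("subscribers", HashSet.class);

  @SuppressWarnings("unchecked")
  @Override
  public void invoke(Context context, Object input) {
    HashSet<String> ids = subscribers.get();
    if (ids == null) {
      ids = new HashSet<>();
    }
    if (input instanceof Subscribe) {
      // remember who wants control updates
      ids.add(((Subscribe) input).subscriberId);
      subscribers.set(ids);
    } else if (input instanceof RuleUpdate) {
      // fan the control message out to every registered subscriber
      for (String id : ids) {
        context.send(new Address(FRAUD_CHECK, id), input);
      }
    }
  }
}

Each fraud-check function instance would send a Subscribe message with its own id to the registry the first time it is invoked, after which RuleUpdate messages routed to the registry reach all of them.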

Cheers,
Gordon


On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo  wrote:

> Hi everyone,
>
> What is the recommended way of achieving the equivalent of a broadcast in
> Flink when using Stateful Functions?
>
> For instance, assume we are implementing something similar to Flink's
> demo fraud detection
>  but
> in Stateful Functions - how can one dynamically update the application's
> logic then?
> There was a similar question in this mailing list in the past where it was 
> recommended
> moving the dynamic logic to a remote function
> 
>  so
> that one could achieve that by deploying a new container. I think that's
> not very realistic as updates might happen with a frequency that's not
> compatible with that approach (e.g., sticking to the fraud detection
> example, updating fraud detection rules every hour is not unusual), nor
> should one be deploying a new container when data (not code) changes.
>
> Is there a way of, for example, modifying FunctionProviders
> 
> on the fly?
>
> Thanks,
> Miguel
>


Re: Custom partitioner: problem when using it, code attached

2021-02-21 Thread 冯嘉伟
Hi!

Optional.of(new customPartitioner())
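
That is, the four-argument constructor in the quoted source below takes an Optional<FlinkKafkaPartitioner<IN>>, so the producer construction in the quoted code would become roughly:

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "test_topic",
        new SimpleStringSchema(),
        properties,
        Optional.of(new customPartitioner())); // java.util.Optional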



Ye Chen wrote
> Hi all, I want to implement a custom partitioner. After extending
> FlinkKafkaPartitioner, using it fails to compile. Simplified code below.
> // custom partitioner
> public class customPartitioner extends FlinkKafkaPartitioner<String> {
>     @Override
>     public int partition(String record, byte[] key, byte[] value, String
> targetTopic, int[] partitions) {
>         return 0;
>     }
> }
> 
> 
> DataStream<String> stream = ...;
> FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
>         "test_topic",
>         new SimpleStringSchema(),
>         properties,
>         new customPartitioner()
> );
> stream.addSink(myProducer);
> 
> 
> // The code above does not compile: the IDE flags FlinkKafkaProducer with
> // [Error:(55, 49) java: cannot infer type arguments for
> // org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>]
> // If I remove "new customPartitioner()" and do not use the custom partitioner,
> // FlinkKafkaProducer compiles fine. It feels like no constructor matches, but the
> // source code does contain this constructor.
> 
> 
> The FlinkKafkaProducer source is shown below; is there anything wrong with how I wrote it above?
> public FlinkKafkaProducer(
>         String topicId,
>         SerializationSchema<IN> serializationSchema,
>         Properties producerConfig,
>         Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>     this(
>         topicId,
>         serializationSchema,
>         producerConfig,
>         customPartitioner.orElse(null),
>         Semantic.AT_LEAST_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }







Re: Re: SqlValidatorException: No match found for function signature prod()

2021-02-21 Thread Presley
_(:з」∠)_





Re: Generating a wide table with Flink

2021-02-21 Thread liujian
… a Flink join …, HBase, …





Re:Re:SqlValidatorException: No match found for function signature prod()

2021-02-21 Thread Ye Chen
Maybe it should extend ScalarFunction?

On 2021-02-22 10:25:31, "xiaoyue" <18242988...@163.com> wrote:
>Bumping my own thread, waiting online for a reply _(:з」∠)_
>
>
>
>
>
>
>
>On 2021-02-20 13:14:18, "xiaoyue" <18242988...@163.com> wrote:
>
>I am getting SqlValidatorException: No match found for function 
>signature prod() when using a UDAF with Flink SQL 1.11. Could someone please take a look? _(:з」∠)_
>
>The code is below:
>-
>...
>  stableEnv.createTemporarySystemFunction("prod", 
> ProductAggregateFunction.class);
>  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
> yldrate from queryData group by pf_id");
>...
>-
>@FunctionHint(
>input = @DataTypeHint("Double"),
>output = @DataTypeHint("Double")
>)
>public class ProductAggregateFunction extends AggregateFunction<Double, Product> {
>
>
>@Override
>public Double getValue(Product acc) {
>return acc.prod;
>}
>@Override
>public Product createAccumulator() {
>return new Product();
>}
>public void accumulate(Product acc, Double iValue) {
>acc.prod *= iValue;
>}
>public void retract(Product acc, Double iValue) {
>acc.prod /= iValue;
>}
>public void merge(Product acc, Iterable<Product> it) {
>for (Product p : it) {
>accumulate(acc, p.prod);
>}
>}
>public void resetAccumulator(Product acc) {
>acc.prod = 1D;
>}
>}
>
>
>
>
>
> 


Re: Generating a wide table with Flink

2021-02-21 Thread Ye Chen
You can use a multi-stream join. Late data may fail to join, though; you can handle that with a side output, depending on the business requirements.
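
As a rough sketch of the SQL route (untested; the table and column names and the Kudu sink definition are placeholders that would come from CREATE TABLE statements), a regular streaming join keeps the wide table up to date whenever any of the three inputs changes:

// tableEnv is a streaming TableEnvironment; t1/t2/t3 and wide_table are placeholder tables
// consider bounding join state, e.g. via table.exec.state.ttl / idle state retention
tableEnv.executeSql(
    "INSERT INTO wide_table " +
    "SELECT t1.id, t1.a, t2.b, t3.c " +
    "FROM t1 " +
    "JOIN t2 ON t1.id = t2.id " +
    "JOIN t3 ON t1.id = t3.id");

The result of such a join is an updating stream, so the sink (Kudu here) needs to support upserts.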

On 2021-02-22 11:05:46, "liujian" <13597820...@qq.com> wrote:
>Hi:
>  Hi all, I have three real-time tables that can be joined into one wide table. Any of
> them may be updated, so how should I implement this as a streaming job? My target table is in Kudu.
> 
>  My understanding:
>   Read the three tables directly with the jdbc connector, join them, and write the
> result; would there be any problem with that?


Generating a wide table with Flink

2021-02-21 Thread liujian
Hi:
  
  Hi all, I have three real-time tables that can be joined into one wide table. Any of them may be updated, so how should I implement this as a streaming job? My target table is in Kudu.
 
  My understanding:
   Read the three tables directly with the jdbc connector, join them, and write the result; would there be any problem with that?

Re: If I keyBy() twice and then call window(), how many windows will there be?

2021-02-21 Thread yidan zhao
Correction, I had read your description but not the code. Written the way your code is, there are two separate keyings: because you apply flatMap after the first keyBy, the second keyBy starts a new keyed partitioning.

yidan zhao  wrote on Monday, February 22, 2021 at 10:31 AM:

> Only the last keyBy takes effect.
>
Hongyuan Ma  wrote on Sunday, February 21, 2021 at 10:59 PM:
>
>> If I keyBy twice and then call window(), are n windows created based only on the key of the last keyBy,
>> or m*n windows on top of the previous keyBy?
>>
>>
>> Written like the following, are the final windows partitioned by area only?
>> // I want to predict the trajectory of each vehicle (id), then aggregate the trajectories per region (area)
>> stream.keyby("id")
>> .flatmap() // predict the trajectory per id, using keyed state inside
>> .assignTime() // set the trajectory's eventTime to the predicted time
>> .keyby("area")
>> .window() // windows per region
>> .process() // aggregate the trajectories within each region
>>
>>


Re: If I keyBy() twice and then call window(), how many windows will there be?

2021-02-21 Thread yidan zhao
Only the last keyBy takes effect.

Hongyuan Ma  wrote on Sunday, February 21, 2021 at 10:59 PM:

> If I keyBy twice and then call window(), are n windows created based only on the key of the last keyBy,
> or m*n windows on top of the previous keyBy?
>
>
> Written like the following, are the final windows partitioned by area only?
> // I want to predict the trajectory of each vehicle (id), then aggregate the trajectories per region (area)
> stream.keyby("id")
> .flatmap() // predict the trajectory per id, using keyed state inside
> .assignTime() // set the trajectory's eventTime to the predicted time
> .keyby("area")
> .window() // windows per region
> .process() // aggregate the trajectories within each region
>
>


Re:SqlValidatorException: No match found for function signature prod()

2021-02-21 Thread xiaoyue
Bumping my own thread, waiting online for a reply _(:з」∠)_







On 2021-02-20 13:14:18, "xiaoyue" <18242988...@163.com> wrote:

I am getting SqlValidatorException: No match found for function 
signature prod() when using a UDAF with Flink SQL 1.11. Could someone please take a look? _(:з」∠)_

The code is below:
-
...
  stableEnv.createTemporarySystemFunction("prod", 
ProductAggregateFunction.class);
  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
yldrate from queryData group by pf_id");
...
-
@FunctionHint(
input = @DataTypeHint("Double"),
output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction<Double, Product> {


@Override
public Double getValue(Product acc) {
return acc.prod;
}
@Override
public Product createAccumulator() {
return new Product();
}
public void accumulate(Product acc, Double iValue) {
acc.prod *= iValue;
}
public void retract(Product acc, Double iValue) {
acc.prod /= iValue;
}
public void merge(Product acc, Iterable<Product> it) {
for (Product p : it) {
accumulate(acc, p.prod);
}
}
public void resetAccumulator(Product acc) {
acc.prod = 1D;
}
}





 

pyflink support for a Redis sink

2021-02-21 Thread whh_960101
Hi all,
Does pyflink currently support a Redis sink? Is there a demo I could refer to? Thanks!




 


[Statefun] Dynamic behavior

2021-02-21 Thread Miguel Araújo
Hi everyone,

What is the recommended way of achieving the equivalent of a broadcast in
Flink when using Stateful Functions?

For instance, assume we are implementing something similar to Flink's demo
fraud detection
 but
in Stateful Functions - how can one dynamically update the application's
logic then?
There was a similar question in this mailing list in the past where it
was recommended
moving the dynamic logic to a remote function

so
that one could achieve that by deploying a new container. I think that's
not very realistic as updates might happen with a frequency that's not
compatible with that approach (e.g., sticking to the fraud detection
example, updating fraud detection rules every hour is not unusual), nor
should one be deploying a new container when data (not code) changes.

Is there a way of, for example, modifying FunctionProviders

on the fly?

Thanks,
Miguel


Union fields with time attributes have different types

2021-02-21 Thread Sebastián Magrí
I'm using a query like this

WITH aggs_1m AS (
  SELECT
`evt`,
`startts`,
`endts`,
SUM(`value`) AS `value`
  FROM aggregates_per_minute
), aggs_3m AS (
  SELECT
`evt`,
TUMBLE_START(`endts`, INTERVAL '3' MINUTE) AS `startts`,
TUMBLE_END(`endts`, INTERVAL '3' MINUTE) AS `endts`,
SUM(`c`) AS `value`
  FROM aggregates_per_minute
  GROUP BY t, TUMBLE(`endts`, INTERVAL '3' MINUTE)
)
SELECT `evt`, `value`, `startts`, `endts`
FROM aggs_1m
UNION
SELECT `evt`, `value`, `startts`, `endts`
FROM aggs_3m

But it's throwing this exception

org.apache.flink.table.api.ValidationException: Union fields with time
attributes have different types.

Doesn't TUMBLE_START(somets, ...) return a TIMESTAMP of the same type?

-- 
Sebastián Ramírez Magrí


If I keyBy() twice and then call window(), how many windows will there be?

2021-02-21 Thread Hongyuan Ma
If I keyBy twice and then call window(), are n windows created based only on the key of the last keyBy, or m*n windows on top of the previous keyBy?


Written like the following, are the final windows partitioned by area only?
// I want to predict the trajectory of each vehicle (id), then aggregate the trajectories per region (area)
stream.keyby("id")
.flatmap() // predict the trajectory per id, using keyed state inside
.assignTime() // set the trajectory's eventTime to the predicted time
.keyby("area")
.window() // windows per region
.process() // aggregate the trajectories within each region



Datastream Lag Windowing function

2021-02-21 Thread s_penakalap...@yahoo.com
Hi All,
I am using Flink1.12, I am trying to read realtime data from Kafka topic and as 
per the requirement I need to implement windowing LAG function. Approach I 
followed is below:
DataStream vData = env.addSource(...)vData.keyBy(Id)
createTemperoryViewthen apply flink sql.
My sample data is like below, vTime field contains the timestamp when the even 
was generated and vNumSeq is the unique number for particular group Id.
I tried Lag function by ordering by vSeq field (long datatype), Job failed with 
"OVER windows' ordering in stream mode must be defined on a time attribute". 
I even tried by using vTime field (eventTS is also long datatype). I tried 
converting this field to sql.Timestamp, still no luck Job failed with above 
error.
When I referred few documents solution provided was to use proctime/rowtime. So 
I modified the query to use proctime() Job succeeded but with wrong results.
Kindly help with simple example badly stuck. I am ok to use even Datastream API 
to implement lag functionality.
Lag Query:select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 
as vSeq, vdata.f4 as currentSal, LAG(vdata.f4,1,0) OVER ( partition BY vdata.f0 
ORDER BY proctime()) AS prevSal from VData vData 

Wrong output :

Expected:
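
A rough, untested sketch of one way around the "ordering must be defined on a time attribute" error: assign watermarks on the stream and declare vTime as a rowtime attribute when registering the view, then let the OVER window order by that attribute instead of proctime(). The Tuple5 layout below and the assumption that vTime (f2) is epoch milliseconds are guesses based on the vdata.f0..f4 fields above.

import static org.apache.flink.table.api.Expressions.$;
// also needed: WatermarkStrategy, Duration, Tuple5, StreamTableEnvironment

DataStream<Tuple5<Integer, String, Long, Long, Double>> vData = env
        .addSource(...)  // the Kafka source as before
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple5<Integer, String, Long, Long, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((row, ts) -> row.f2));  // f2 = vTime in epoch millis

tEnv.createTemporaryView("VData", vData,
        $("f0").as("id"), $("f1").as("name"),
        $("f2").rowtime().as("vTime"),  // event-time attribute backed by the watermarks above
        $("f3").as("vSeq"), $("f4").as("currentSal"));

tEnv.executeSql(
        "SELECT id, name, vTime, vSeq, currentSal, "
        + " LAG(currentSal, 1, 0) OVER (PARTITION BY id ORDER BY vTime) AS prevSal "
        + " FROM VData").print();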

Regards, Sunitha.