Re: FlinkSQL ES7连接器无法使用
这是个依赖问题,你检查下你环境中是否只使用sql connector 的jar,即 flink-sql-connector-elasticsearch7, 如果不是 datastream 作业是不需要 flink-connector-elasticsearch7 这个 jar包的。如果不是这个问题,你可以分析下你作业里使用的 es 相关依赖,可以参考异常栈确定类再去确定jar包,看下是不是多加了一些无用的jar。 祝好, Leonard > 在 2021年11月22日,12:30,mispower 写道: > > 你好,咨询一下后续你这个问题是如何解决的? > > > > > > > > > > > > At 2021-06-10 10:15:58, "mokaful" <649713...@qq.com> wrote: >> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot >> instantiate user function. >> at >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:181) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181] >> Caused by: java.io.InvalidClassException: >> org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink$AuthRestClientFactory; >> local class incompatible: stream classdesc serialVersionUID = >> -2564582543942331131, local class serialVersionUID = -2353232579685349916 >> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >> ~[?:1.8.0_181] >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >> ~[?:1.8.0_181] >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >> ~[?:1.8.0_181] >>
Re: 如何实现event triggered window?
Hi Pinjie, 如果是需要 event triggered 的累計統計更新的話,可以考慮使用 SQL over aggregation [1]。例如文件中提供的如下範例,計算當前 row 往前一小時內的加總結果。 > SELECT order_id, order_time, amount, > SUM(amount) OVER ( > PARTITION BY product > ORDER BY order_time > RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW > ) AS one_hour_prod_amount_sumFROM Orders > > 但是這種作法只能根據收到的事件來觸發,無法根據處理時間。換句話說,如果 t=X 沒有數據進來的話,就不會有 t=(X-1) ~ X 的累計統計輸出。 考慮更複雜的情況需要結合事件和處理時間來觸發的話,需要透過 Process Function API 或者用 DataStream API 自定義 Trigger 的方式實現。 best regards, [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/ tison 於 2021年11月23日 週二 下午2:03寫道: > 如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window > > Best, > tison. > > > tison 于2021年11月23日周二 下午2:00写道: > > > 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。 > > > > Best, > > tison. > > > > > > tison 于2021年11月23日周二 下午1:59写道: > > > >> > >> > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/ > >> > >> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。= > >> > >> Best, > >> tison. > >> > >> > >> Pinjie Huang 于2021年11月23日周二 > 下午1:18写道: > >> > >>> Hi Yidan, > >>> > >>> Tumbling window 只有 > >>> t=0~1h > >>> t=1~2h > >>> 等等的window > >>> > >>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如 > >>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window > >>> > >>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao > wrote: > >>> > >>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 > >>> > > >>> > zhiyuan su 于2021年11月22日周一 下午4:59写道: > >>> > > >>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 > >>> > > > >>> > > > >>> > > >>> > https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 > >>> > > 具体在第2章第一节 > >>> > > > >>> > > Pinjie Huang 于2021年11月22日周一 > >>> > 下午3:52写道: > >>> > > > >>> > > > Hi friends, > >>> > > > > >>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event > triggerred。 > >>> > > > > >>> > > > 比如说 想知道过去1小时event A trigger的次数, > >>> > > > > >>> > > > 如果使用tumbling window和1h window > >>> > > > |1h | 1h | > >>> > > > t=0 > >>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 > >>> > > > > >>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 > >>> > > > > >>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? > >>> > > > > >>> > > > >>> > > >>> > >>> > >>> -- > >>> Thanks, > >>> Pinjie Huang > >>> > >> >
Re: 如何实现event triggered window?
如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window Best, tison. tison 于2021年11月23日周二 下午2:00写道: > 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。 > > Best, > tison. > > > tison 于2021年11月23日周二 下午1:59写道: > >> >> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/ >> >> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。= >> >> Best, >> tison. >> >> >> Pinjie Huang 于2021年11月23日周二 下午1:18写道: >> >>> Hi Yidan, >>> >>> Tumbling window 只有 >>> t=0~1h >>> t=1~2h >>> 等等的window >>> >>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如 >>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window >>> >>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao wrote: >>> >>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 >>> > >>> > zhiyuan su 于2021年11月22日周一 下午4:59写道: >>> > >>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 >>> > > >>> > > >>> > >>> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 >>> > > 具体在第2章第一节 >>> > > >>> > > Pinjie Huang 于2021年11月22日周一 >>> > 下午3:52写道: >>> > > >>> > > > Hi friends, >>> > > > >>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 >>> > > > >>> > > > 比如说 想知道过去1小时event A trigger的次数, >>> > > > >>> > > > 如果使用tumbling window和1h window >>> > > > |1h | 1h | >>> > > > t=0 >>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 >>> > > > >>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 >>> > > > >>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? >>> > > > >>> > > >>> > >>> >>> >>> -- >>> Thanks, >>> Pinjie Huang >>> >>
Re: 如何实现event triggered window?
你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。 Best, tison. tison 于2021年11月23日周二 下午1:59写道: > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/ > > 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。= > > Best, > tison. > > > Pinjie Huang 于2021年11月23日周二 下午1:18写道: > >> Hi Yidan, >> >> Tumbling window 只有 >> t=0~1h >> t=1~2h >> 等等的window >> >> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如 >> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window >> >> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao wrote: >> >> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 >> > >> > zhiyuan su 于2021年11月22日周一 下午4:59写道: >> > >> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 >> > > >> > > >> > >> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 >> > > 具体在第2章第一节 >> > > >> > > Pinjie Huang 于2021年11月22日周一 >> > 下午3:52写道: >> > > >> > > > Hi friends, >> > > > >> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 >> > > > >> > > > 比如说 想知道过去1小时event A trigger的次数, >> > > > >> > > > 如果使用tumbling window和1h window >> > > > |1h | 1h | >> > > > t=0 >> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 >> > > > >> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 >> > > > >> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? >> > > > >> > > >> > >> >> >> -- >> Thanks, >> Pinjie Huang >> >
Re: 如何实现event triggered window?
哦哦,懂了,那还有另一个问题。你什么时候需要知道呢? 是只有半小时时刻吗,还是随时随刻都可能,如果是随时随刻都希望能拿到过去1h,这个本身就没意义,比如1s分成1000ms,每个ms你都希望拿到过去1h的数据,只能按照sliding window做,而且这个性能消耗很高,取决于你究竟多久需要拿到一次。 如果你只是固定的不希望用0-1,1-2,而是需要0.5-1.5,1.5-2.5这样的话使用offset就可以实现。 Pinjie Huang 于2021年11月23日周二 下午1:18写道: > Hi Yidan, > > Tumbling window 只有 > t=0~1h > t=1~2h > 等等的window > > 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如 > t=1.5h 时刻 我需要 t=0.5~1.5h 这个window > > On Tue, Nov 23, 2021 at 12:32 PM yidan zhao wrote: > > > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 > > > > zhiyuan su 于2021年11月22日周一 下午4:59写道: > > > > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 > > > > > > > > > https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 > > > 具体在第2章第一节 > > > > > > Pinjie Huang 于2021年11月22日周一 > > 下午3:52写道: > > > > > > > Hi friends, > > > > > > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 > > > > > > > > 比如说 想知道过去1小时event A trigger的次数, > > > > > > > > 如果使用tumbling window和1h window > > > > |1h | 1h | > > > > t=0 > > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 > > > > > > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 > > > > > > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? > > > > > > > > > > > > -- > Thanks, > Pinjie Huang >
Re: 如何实现event triggered window?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/ 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。= Best, tison. Pinjie Huang 于2021年11月23日周二 下午1:18写道: > Hi Yidan, > > Tumbling window 只有 > t=0~1h > t=1~2h > 等等的window > > 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如 > t=1.5h 时刻 我需要 t=0.5~1.5h 这个window > > On Tue, Nov 23, 2021 at 12:32 PM yidan zhao wrote: > > > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 > > > > zhiyuan su 于2021年11月22日周一 下午4:59写道: > > > > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 > > > > > > > > > https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 > > > 具体在第2章第一节 > > > > > > Pinjie Huang 于2021年11月22日周一 > > 下午3:52写道: > > > > > > > Hi friends, > > > > > > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 > > > > > > > > 比如说 想知道过去1小时event A trigger的次数, > > > > > > > > 如果使用tumbling window和1h window > > > > |1h | 1h | > > > > t=0 > > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 > > > > > > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 > > > > > > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? > > > > > > > > > > > > -- > Thanks, > Pinjie Huang >
Re: 如何实现event triggered window?
Hi Yidan, Tumbling window 只有 t=0~1h t=1~2h 等等的window 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如 t=1.5h 时刻 我需要 t=0.5~1.5h 这个window On Tue, Nov 23, 2021 at 12:32 PM yidan zhao wrote: > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 > > zhiyuan su 于2021年11月22日周一 下午4:59写道: > > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 > > > > > https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 > > 具体在第2章第一节 > > > > Pinjie Huang 于2021年11月22日周一 > 下午3:52写道: > > > > > Hi friends, > > > > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 > > > > > > 比如说 想知道过去1小时event A trigger的次数, > > > > > > 如果使用tumbling window和1h window > > > |1h | 1h | > > > t=0 > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 > > > > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 > > > > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? > > > > > > -- Thanks, Pinjie Huang
Re: 如何实现event triggered window?
其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。 zhiyuan su 于2021年11月22日周一 下午4:59写道: > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 > > https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 > 具体在第2章第一节 > > Pinjie Huang 于2021年11月22日周一 下午3:52写道: > > > Hi friends, > > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 > > > > 比如说 想知道过去1小时event A trigger的次数, > > > > 如果使用tumbling window和1h window > > |1h | 1h | > > t=0 > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 > > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 > > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? > > >
flink1.12 ??????????????state.checkpoints.num-retained????????????
??flink1.12??flink-conf.yaml??state.checkpoints.num-retained: 3checkpoint??1checkpoint??on yarn
Re: jdk11创建hive catalog抛错
Hi! 这是从 hive 里产生的错误。据我所知,hive 对 Java 11 的支持仍在建设中 [1],因此还是建议使用 Java 8。 [1] https://issues.apache.org/jira/browse/HIVE-22415 aiden <18765295...@163.com> 于2021年11月22日周一 下午12:00写道: > 求助,jdk从8升级到11后使用hive作为flink > table的catalog抛错,排查是bsTableEnv.registerCatalog(catalogName, catalog) > 抛错,具体异常为: > 11:55:22.343 [main] ERROR hive.log - Got exception: > java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to > class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in > module java.base of loader 'bootstrap') > java.lang.ClassCastException: class [Ljava.lang.Object; cannot be cast to > class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in > module java.base of loader 'bootstrap') > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:274) > [hive-exec-2.1.1.jar:2.1.1] > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:210) > [hive-exec-2.1.1.jar:2.1.1] > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:?] > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > [?:?] > at > java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > [?:?] > at > java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) > [?:?] > at > org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1652) > [hive-exec-2.1.1.jar:2.1.1] > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:80) > [hive-exec-2.1.1.jar:2.1.1] > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:130) > [hive-exec-2.1.1.jar:2.1.1] > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:115) > [hive-exec-2.1.1.jar:2.1.1] > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:?] > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:?] > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:?] > at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] > at > org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:54) > [flink-connector-hive_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277) > [flink-connector-hive_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:78) > [flink-connector-hive_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:68) > [flink-connector-hive_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32) > [flink-connector-hive_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296) > [flink-connector-hive_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195) > [flink-table-api-java-1.14.0.jar:1.14.0] > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373) > [flink-table-api-java-1.14.0.jar:1.14.0] > at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27) > [classes/:?] > at catalogTest.test.main(test.java:11) [classes/:?] > 11:55:22.348 [main] ERROR hive.log - Converting exception to MetaException > Exception in thread "main" > org.apache.flink.table.catalog.exceptions.CatalogException: Failed to > create Hive Metastore client > at > org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:61) > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277) > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:78) > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:68) > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32) > at > org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296) > at > org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373) > at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27) > at catalogTest.test.main(test.java:11) > Caused by: java.lang.reflect.InvocationTargetException
关于flink plugins目录不生效的疑问
Hi, 环境:flink-1.14.0,单节点standalone https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/ 参考官方文档,执行下面命令: mkdir plugins/s3-fs-hadoop cp opt/flink-s3-fs-hadoop-1.14.0.jar plugins/s3-fs-hadoop/ 在flink-conf中配置了hadoop的路径(s3使用了hadoop的配置文件) env.hadoop.conf.dir: /data/hadoop/s3 然后启动集群,成功启动后,执行./bin/sql-client.sh,用SQL测试读取s3数据, 出现连接超时报错 但是把flink-s3-fs-hadoop-1.14.0.jar挪到flink的lib下,重启集群,重新执行同样的测试,就可以读取到数据了,其余的配置都没有修改 所以感觉这个plugins目录没有生效?这个plugins和lib目录的区别在哪里,应该如何使用?? 附上报错信息: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.util.SerializedThrowable: connect timed out Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint: at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:100) at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:70) at com.amazonaws.internal.InstanceMetadataServiceResourceFetcher.readResource(InstanceMetadataServiceResourceFetcher.java:75) at com.amazonaws.internal.EC2ResourceFetcher.readResource(EC2ResourceFetcher.java:66) at com.amazonaws.auth.InstanceMetadataServiceCredentialsFetcher.getCredentialsEndpoint(InstanceMetadataServiceCredentialsFetcher.java:58) at com.amazonaws.auth.InstanceMetadataServiceCredentialsFetcher.getCredentialsResponse(InstanceMetadataServiceCredentialsFetcher.java:46) at com.amazonaws.auth.BaseCredentialsFetcher.fetchCredentials(BaseCredentialsFetcher.java:112) at com.amazonaws.auth.BaseCredentialsFetcher.getCredentials(BaseCredentialsFetcher.java:68) at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:165) at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137) ... 49 more Caused by: java.net.SocketTimeoutException: connect timed out at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at sun.net.NetworkClient.doConnect(NetworkClient.java:175) at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) at sun.net.www.http.HttpClient.(HttpClient.java:242) at sun.net.www.http.HttpClient.New(HttpClient.java:339) at sun.net.www.http.HttpClient.New(HttpClient.java:357) at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226) at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205) at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056) at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990) at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52) at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80) ... 58 more
flink1.12 请教下如何配置多hadoop参数,s3使用问题
hi, 环境: 1. flink-1.12,版本可以升级 2. flink-conf中配置了env.hadoop.conf.dir,路径下有hdfs集群的core-site.xml和hdfs-site.xml, state.backend保存在该HDFS上 3. flink的部署模式是K8S+session 需求: 需要从一个s3协议的分布式文件系统中读取文件,处理完写到mysql中 问题: s3配置采用hadoop的配置方式,保存为一个新的core-site.xml文件,参考的 https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A 按照官网说明文档中,需要 修改hadoop的环境变量,但是就和以前的core-site.xml冲突了,无法同时配置2个hadoop路径 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/ 或者 在flink-conf.yaml中添加一堆s3配置,这样又写死了,再新增一个s3集群的时候如何处理? 所以请教下如何解决这类问题(可以修改代码)?如何配置多个hadoop配置(比如从第一个文件系统(s3协议)读数据,写到第二个文件系统中(s3协议))?
?????? ??????????????downloads/setup-pyflink-virtual-env.sh????
??venv.zip??1.14.0?? ---- ??: "user-zh" https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable ??1.14.0??-pyclientexec venv.zip/venv/bin/python On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalidgt; wrote: gt; ??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?? gt; jobmanagerNo module named pyflinkjobmanageryarn gt; ?? gt; gt; gt; amp;gt; LogType:jobmanager.out gt; amp;gt; Log Upload Time:?? ?? 18 20:48:45 +0800 2021 gt; amp;gt; LogLength:37 gt; amp;gt; Log Contents: gt; amp;gt; /bin/python: No module named pyflink gt; gt; gt; gt; gt; --amp;nbsp;amp;nbsp;-- gt; ??: gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; "user-zh" gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; < gt; dian0511...@gmail.comamp;gt;; gt; :amp;nbsp;2021??11??19??(??) 9:38 gt; ??:amp;nbsp;"user-zh"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client gt
询问下Flink分布式缓存的问题
Flink分布式缓存怎么没有效果呢,这里不好贴代码,可以看这个链接下 https://issues.apache.org/jira/browse/FLINK-24973 麻烦解答下,是我用的不对还是理解不对.
Re: 回复:flink1.13.1 sql client connect hivecatalog 报错
感谢,我已经解决了。 更换了jdk 版本,重新替换了插件包 RS 于2021年11月22日周一 下午1:44写道: > 图片看不到的,尽量不要发图片,你可以复制文字出来并说明下, > > > > > > > > > > > > > > > > > 在 2021-11-22 13:14:13,"zhiyuan su" 写道: > > 我使用的是上面的jar 包。从1.13的文档处获取的,但维标注flink 版本,我理解应该是flink1.13版本编译的。 > > > > 这个是yaml文件,我直接在sql 客户端,通过DDL 的方式去编写的话,也是如下报错: > Caused by: java.util.ServiceConfigurationError: > org.apache.flink.table.factories.Factory: > org.apache.flink.table.module.hive.HiveModuleFactory not a subtype >
Re: 如何实现event triggered window?
感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足 https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1 具体在第2章第一节 Pinjie Huang 于2021年11月22日周一 下午3:52写道: > Hi friends, > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。 > > 比如说 想知道过去1小时event A trigger的次数, > > 如果使用tumbling window和1h window > |1h | 1h | > t=0 > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。 > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。 > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window? >