Re: flink 1.16 kafka 流和自定义流collect后,watermark 消失

2022-11-14 文章 Tony Wei
Hi Peihui, 確認下你想調用的方法是不是 connect?因為看起來 stream1.collect(stream2) 不像是 DataStream 支援的 API 如果是的話,想請問你 ConfigSource() 有沒有配置 WatermarkStrategy?connect 後的算子是透過上游兩個算子的 watermark 取最小作為輸出。 因此,如果只定義其中一邊的 WatermarkStrategy 會導致這個算子的 watermark 無法推進。 詳細可以參考這個章節

Re: Re: flink1.14 注册mysql connector报错

2022-02-24 文章 Tony Wei
; dmo_index_code string, " + > " index_value string" + > ") with (" + > " 'connector' = 'jdbc', " + > " 'username' = 'root', " + > " 'password' = 'x

Re: flink1.14 注册mysql connector报错

2022-02-24 文章 Tony Wei
Hi xiaoyue, 請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件? 我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。 public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings Settings =

Re: flink 不触发checkpoint

2022-02-20 文章 Tony Wei
Hi, 有考慮升級 1.14 嗎?Flink 1.14 支持了 FLIP-147,讓 Flink 在 task 為 finished 狀態時仍能觸發 checkpoint [1, 2]。 [1] https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams [2]

Re: flink sql lookup join中维表不可以是视图吗?

2021-12-01 文章 Tony Wei
Hi, 如果兩次 left join 的話是否滿足你的需求呢? 然後在取 temporal table 的字段時,用 IF 去判斷取值。參考 SQL 如下 SELECT c.mer_cust_id, *IF(k.mer_cust_id IS NOT NULL AND a.mercust_id IS NOT NULL AND k.mer_cust_id <> '', k.update_time, NULL) AS update_time* FROM charge_log as c LEFT JOIN ka_mer_info FOR SYSTEM_TIME AS OF

Re: FlinkSql回撤流

2021-11-24 文章 Tony Wei
AST_VALUE(num) as num, LAST_VALUE(flag) as flag > from tmpTable, > group by user, ord > ) t1 > group by user best regards, Tony Wei 於 2021年11月25日 週四 上午11:01寫道: > Hi, > > 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為: > > +--+---+ >

Re: FlinkSql回撤流

2021-11-24 文章 Tony Wei
Hi, 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為: +--+---+ | user | num | +--+---+ | b | 20| +--+---+ 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。 或許可以考慮把 sql 寫法改為這樣試試? select user, sum(num) as num > from ( > select

Re: 如何实现event triggered window?

2021-11-22 文章 Tony Wei
Hi Pinjie, 如果是需要 event triggered 的累計統計更新的話,可以考慮使用 SQL over aggregation [1]。例如文件中提供的如下範例,計算當前 row 往前一小時內的加總結果。 > SELECT order_id, order_time, amount, > SUM(amount) OVER ( > PARTITION BY product > ORDER BY order_time > RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW > )

Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 文章 Tony Wei
Hi yidan, 你可以試試 SQL Hints [1]. [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/ yidan zhao 於 2021年11月2日 週二 下午1:03寫道: > 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。 >

Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-12 文章 Tony Wei
Hi 從代碼上來看是使用了 regular join 關聯了 kafka source 和 hbase source,hbase connector 目前是不支持流式數據源的 你可以從任務儀表板上確認下提交的任務,hbase source 的部分應該在執行一段時間後狀態會變更為 FINISHED,目前 flink checkpoint 還不支持在 FINISHED task 上執行 你可以考慮改寫 sql 使用 processing time temporal join [1] 的方式來關聯 hbase table,從 kafka 消費的數據會實時的去查 hbase table

Re: 请问同一个flink history server能够支持多个flink application cluster吗?

2021-08-20 文章 Tony Wei
Hi Chenyu, 這確實是目前尚未解決的一個問題,相關的 jira issue 可以看這 [1]。 jira issue 底下的討論串有提到一個替代方案是:使用 -D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-") 主動為 application 模式的任務產生隨機的 jobid。 但由於此配置參數屬於 flink 內部參數,可能不保證未來任何改動後的向後兼容性,請謹慎考慮後再使用。 [1]

Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Tony Wei
Hi Caizhi, 我測試了 sink.rolling-policy.rollover-interval 這個配置,並且改使用 csv hive table 作為 sink table,結果是符合預期的。再次謝謝你的幫忙。 Tony Wei 於 2021年8月5日 週四 上午10:40寫道: > Hi, > > 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是 > 單純因為這個配置比較好觀察結果。 > 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。 >

Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Tony Wei
Hi, 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是 單純因為這個配置比較好觀察結果。 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。 Caizhi Weng 於 2021年8月5日 週四 上午10:36寫道: > Hi! > > 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。 > > Tony Wei 于2021年8月5日周四 上午10:21写道: > > > Hi Experts, > >

sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Tony Wei
Hi Experts, 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度) 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。 同時我也確認了

Re: 回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 Tony Wei
你好, 如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧? 從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現, 或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下: insert into t select a, last_value(b ignore nulls) as b, last_value(c > ignore nulls) as c from $(original_select_statement)

Re: prometheus metric中如何设置label

2021-05-11 文章 Tony Wei
Hi, 1、是否是这种方式增加label > 是的,MetricGroup#addGroup(key, value) 的設計其中一個目的就是為了支援 prometheus 的 label 。 > 2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.' 可以透過配置 filterLabelValueCharacters: false 來關閉過濾功能 [1],但使用者需要自行確保不會有非法字元混入 label value 之中。詳細可參考文檔說明。 [1]

Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-11 文章 Tony Wei
會是 merge function 去把三者合併。 Best Tony Wei Px New <15701181132mr@gmail.com> 於 2019年11月10日 週日 上午10:58寫道: > [image: image.png]建议深入解下 keyWindow,NoKeyWindow 与Assigner TimeWindow > And WindowsFunction  > > Yuan,Youjun 于2019年11月9日周六 下午7:46写道: > >> 1, 是 >> 2,没有标准答案,

如何讓兩個 SQL 使用相同的 KafkaTableSource

2019-08-08 文章 Tony Wei
kafka operator 。先謝謝各位的幫助。 Best, Tony Wei

請問在 Flink SQL 上能不能指定 WHERE 裡的判斷式的執行順序?

2019-07-26 文章 Tony Wei
and UDF_NEED_TO_QUERY_DB(user) 謝謝大家的幫忙。 Best regards, Tony Wei

Re: 關於如何在流數據上計算 Top K 的應用問題

2019-07-11 文章 Tony Wei
,雖然沒有很有把握但或許可以根據你的想法實現一個專門針對我們應用情境的優化。 撇開上述特殊的情況,我另外好奇的是第一點中維護的 map state 要記錄精確的排名這件事的細 節,想知道如果更新是循序變化的,如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要 加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下 游? Best Regards, Tony Wei Caizhi Weng 於 2019年7月12日 週五 上午11:36寫道: > Hi Tony! > > 其实 Flink 对 Top-N 问题并没有很