JDBC connector support for JSON

2021-03-30 Thread Fanbin Bu
Hi, For a streaming job that uses the Kafka connector, this doc https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#format-options shows that we can parse the JSON data format. However, the Flink JDBC connector does not seem to support the JSON data type, at least

Re: latency monitoring

2020-12-10 Thread Fanbin Bu
timestamp and report the latency. > Julian > From: Fanbin Bu > Date: Thursday, December 10, 2020 at 3:41 PM > To: user > Subject: latency monitoring > Hi, > I would like to monitor the pipeline latenc

latency monitoring

2020-12-10 Thread Fanbin Bu
Hi, I would like to monitor the pipeline latency, measured as the timestamp when writing the output to the sink minus the timestamp when ingested from the source. I'm already able to get the timestamp when writing to the sink, since the sink implements a RichSinkFunction and I can report a gauge there [1]. But I
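
A minimal sketch of the gauge-in-sink approach described above, assuming an event type that carries its source-ingestion timestamp (Event, ingestTs, and the metric name are hypothetical):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.metrics.Gauge
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    case class Event(ingestTs: Long, payload: String) // hypothetical event type

    class LatencyReportingSink extends RichSinkFunction[Event] {
      @volatile private var lastLatencyMs: Long = 0L

      override def open(parameters: Configuration): Unit = {
        // register a gauge that exposes the most recently observed latency
        getRuntimeContext.getMetricGroup.gauge[Long, Gauge[Long]](
          "pipeline_latency_ms",
          new Gauge[Long] { override def getValue: Long = lastLatencyMs })
      }

      override def invoke(value: Event): Unit = {
        // latency = wall clock at the sink minus the timestamp taken at ingestion
        lastLatencyMs = System.currentTimeMillis() - value.ingestTs
        // ... write value to the external system here ...
      }
    }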

Re: user defined metrics showed in Flink UI but not datadog

2020-12-08 Thread Fanbin Bu
'll file > an issue to fix that. > > On 12/8/2020 4:42 AM, Fanbin Bu wrote: > > Hi, > > I followed [1] to define my own metric as: > > val dropwizardHistogram = new com.codahale.metrics.Histogram(new > SlidingWindowReservoir(500)) > histogram = getRuntimeContext >

user defined metrics showed in Flink UI but not datadog

2020-12-07 Thread Fanbin Bu
Hi, I followed [1] to define my own metric as: val dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)) histogram = getRuntimeContext .getMetricGroup .histogram("feature_latency", new DropwizardHistogramWrapper(dropwizardHistogram)) and it is
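
For reference, a self-contained sketch of that registration inside a rich function's open() (the surrounding sink class is hypothetical; the histogram setup is the one quoted above):

    import com.codahale.metrics.SlidingWindowReservoir
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
    import org.apache.flink.metrics.Histogram
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    class FeatureSink extends RichSinkFunction[Long] { // hypothetical sink
      @transient private var histogram: Histogram = _

      override def open(parameters: Configuration): Unit = {
        val dropwizardHistogram =
          new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
        histogram = getRuntimeContext.getMetricGroup
          .histogram("feature_latency", new DropwizardHistogramWrapper(dropwizardHistogram))
      }

      override def invoke(latencyMs: Long): Unit = histogram.update(latencyMs)
    }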

Re: How to set keystore.jks location on EMR when reading Kafka topics via SSL

2020-11-18 Thread Fanbin Bu
I have to put the keystore file on the nodes. On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu wrote: > Hi, > > This is a repost with modified subject per Sri Tummala's suggestion. > > I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I > tried to put keystore.jks l

How to set keystore.jks location on EMR when reading Kafka topics via SSL

2020-11-18 Thread Fanbin Bu
Hi, This is a repost with a modified subject per Sri Tummala's suggestion. I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put the keystore.jks location under /usr/lib/flink/... like: export
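
A hedged sketch of the DDL shape being attempted (broker address, paths, and passwords are placeholders). Per the follow-up above, the keystore must exist as a local file on every node; an s3:// URI is not readable by the Kafka client:

    tableEnv.executeSql(
      """
        |CREATE TABLE events (
        |  user_id STRING,
        |  action  STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'test',
        |  'properties.bootstrap.servers' = 'broker:9093',
        |  'properties.security.protocol' = 'SSL',
        |  'properties.ssl.keystore.location' = '/usr/lib/flink/.../kafka.keystore.jks',
        |  'properties.ssl.keystore.password' = '...',
        |  'format' = 'json'
        |)
        |""".stripMargin)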

Re: keystore location on EMR

2020-11-17 Thread Fanbin Bu
(SslEngineBuilder.java:285) ... 23 more On Tue, Nov 17, 2020 at 10:01 PM Fanbin Bu wrote: > let me try to put it on s3 and change code like: > 'properties.ssl.keystore.location'='s3://my-bucket/keystore.jks > > Thanks, > Fanbin > > On Tue, Nov 17, 2020 at 6:43 PM sri hari kali charan

keystore location on EMR

2020-11-17 Thread Fanbin Bu
Hi, I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put the keystore.jks location under /usr/lib/flink/... like: export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks Notice that this is on the EMR master node.

Re: Flink 1.11.2 could not create kafka table source on EMR.

2020-11-17 Thread Fanbin Bu
gt; > Please verify that: > 1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf > your-program.jar | grep KafkaDynamicTableFactory") > 2. kafka-connector version matches the version of Flink distribution on > EMR. > > Regards, > Roman > > > On

Flink 1.11.2 could not create kafka table source on EMR.

2020-11-16 Thread Fanbin Bu
Hi, I could not launch my Flink 1.11.2 application on EMR; it fails with the exception Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. I attached the

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
k/flink-docs-master/dev/table/sql/create.html > > Best, > Jark > > On Wed, 11 Nov 2020 at 11:11, Fanbin Bu wrote: > >> Jark, >> >> Thanks for the quick response. >> I tried to_timestamp(ts, ...), but got the following error: >> >> Exception in thread

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
mp? > > the framework will validate the return type of the computed column > expression. >Currently, it must be a type of TIMESTAMP(3). > > Q2: is it possible to reuse ts without introducing a new column? > > Currently, it is not supported. This requires to support "TIMESTAMP WI

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
. ``` could you please provide a concrete example for this? Thanks Fanbin [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu wrote: > i also tried: > ts TIMESTAMP WITH LOCAL TIME ZONE > > b

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
i also tried: ts TIMESTAMP WITH LOCAL TIME ZONE but it failed with Rowtime attribute 'ts' must be of type TIMESTAMP but is of type 'TIMESTAMP(6) WITH LOCAL TIME ZONE'. On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu wrote: > Hi, > > I have source json data like: > {"ts": "

timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Hi, I have source json data like: {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action": "click"} ... my sql is: create table t ( user_id string, action string, ts timestamp, watermark for ts as ts - interval '5' second ) with ( 'connector' = 'kafka', 'topic' = 'test',
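
Based on the replies above in this thread (declare the raw field as STRING and derive a TIMESTAMP(3) rowtime via a computed column), a hedged sketch; the TO_TIMESTAMP format pattern for the "2020-11-09T20:26:10.368123Z" shape is an assumption and may need adjusting:

    tableEnv.executeSql(
      """
        |CREATE TABLE t (
        |  user_id STRING,
        |  action  STRING,
        |  ts STRING,
        |  -- computed column; the result type must be TIMESTAMP(3) to serve as rowtime
        |  rowtime AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z'''),
        |  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'test',
        |  'format' = 'json'
        |)
        |""".stripMargin)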

datadog failed to send report

2020-06-23 Thread Fanbin Bu
Hi, Does anyone have any idea about the following error msg? (it flooded my task manager log) I do have datadog metrics present, so this probably only happens for some metrics. 2020-06-24 03:27:15,362 WARN org.apache.flink.metrics.datadog.DatadogHttpClient - Failed sending request to

Re: two phase aggregation

2020-06-22 Thread Fanbin Bu
Jark, thanks for the reply. Do you know whether it's on the roadmap or what's the plan? On Mon, Jun 22, 2020 at 9:36 PM Jark Wu wrote: > Hi Fanbin, > > Currently, over window aggregation doesn't support two-phase optimization. > > Best, > Jark > > On Tue, 23 Jun 2020 at

two phase aggregation

2020-06-22 Thread Fanbin Bu
Hi, Does over window aggregation support two-phase mode? https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy SELECT user_id , event_time , listagg(event_type, '*') over w as names FROM table WINDOW w AS ( PARTITION BY user_id
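
For completeness, a minimal sketch of how the strategy from the linked config page is set (per the reply above, over window aggregation does not support two-phase optimization; this flag applies to group aggregations on the blink planner):

    // set the documented option table.optimizer.agg-phase-strategy in code
    tableEnv.getConfig.getConfiguration
      .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")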

Re: flink 1.9 conflict jackson version

2020-04-07 Thread Fanbin Bu
r owner jar? >> >> ouywl >> ou...@139.com

watermark not advancing when reading kinesis historical data

2020-04-05 Thread Fanbin Bu
Hi, I've been debugging this issue for several days now and still can't get it to work. I need to read the kinesis historical data (7 days) using Flink SQL. Here is my setup: Flink version: 1.9.1, kinesis shard number: 32, Flink parallelism: 32, sql: select * from mytable (I purposely make this

some subtask taking too long

2020-03-30 Thread Fanbin Bu
Hi, I'm running Flink 1.9 on EMR using Flink SQL with the blink planner, reading from and writing to JDBC input/output. My sql is just a listagg over a window for the last 7 days. However, I notice that there are one or two subtasks that take too long to finish. In this thread

savepoint - checkpoint - directory

2020-03-25 Thread Fanbin Bu
Hi, For a savepoint, the dir looks like s3://bucket/savepoint-jobid/* To resume, I do: flink run -s s3://bucket/savepoint-jobid/ perfect! For a checkpoint, the dir looks like s3://bucket/jobid/chk-100 and s3://bucket/jobid/shared. <-- what is this for? To resume, which one should I do: flink run -s

Re: datadog metrics

2020-03-16 Thread Fanbin Bu
et > on this as well. > > - Steve > > On Wed, Mar 11, 2020 at 5:13 AM Chesnay Schepler > wrote: > >> Please open a JIRA; we may have to split the datadog report into several >> chunks. >> >> On 09/03/2020 07:47, Fanbin Bu wrote: >> >> quote from the

Re: datadog metrics

2020-03-09 Thread Fanbin Bu
long as the prefix is unique you can safely ignore this warning." I do see from the log that my sql operator name is too long and it says it's truncated. But I still failed to report to datadog. Thanks Fanbin On Sun, Mar 8, 2020 at 11:36 PM Fanbin Bu wrote: > Hi, > > Has anybody see

datadog metrics

2020-03-09 Thread Fanbin Bu
Hi, Has anybody seen this error before, and what is the suggested way to solve it? 2020-03-07 02:54:34,100 WARN org.apache.flink.metrics.datadog.DatadogHttpClient - Failed to send request to Datadog (response was Response{protocol=http/1.1, code=413, message=Request Entity Too Large,

Re: JDBC source running continuously

2020-02-21 Thread Fanbin Bu
fetch rows which are greater than the last max ID or max created > time. > For (3), this is a changelog support, which will be supported natively in > 1.11 in Flink SQL. > > Best, > Jark > > > On Fri, 21 Feb 2020 at 02:35, Fanbin Bu wrote: > >> >> https://st

Re: JDBC source running continuously

2020-02-20 Thread Fanbin Bu
https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler wrote: > Can you show us where you found the suggestion to use iterate()? > > On 20/02/2020 02:08, Fanbin Bu wrote: > >

JDBC source running continuously

2020-02-19 Thread Fanbin Bu
Hi, My app creates the source from a JDBC InputFormat, runs some sql, and prints the output. But the source terminates itself after the query is done. Is there any way to keep the source running? sample code: val env = StreamExecutionEnvironment.getExecutionEnvironment val settings =
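
A sketch of the setup being described, assuming the legacy JDBCInputFormat builder (driver, URL, and query are placeholders). The input format is bounded by design: it emits the query result once and the source finishes, which matches the observed behavior:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val rowTypeInfo = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
      Array("name", "cnt"))

    val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("org.postgresql.Driver") // placeholder driver
      .setDBUrl("jdbc:postgresql://host/db")  // placeholder URL
      .setQuery("SELECT name, cnt FROM my_table")
      .setRowTypeInfo(rowTypeInfo)
      .finish()

    implicit val rowTypeInformation: TypeInformation[Row] = rowTypeInfo
    // bounded source: finishes once the result set is exhausted
    val source = env.createInput(inputFormat)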

Fwd: StreamTableEnvironment can not run in batch mode

2020-02-19 Thread Fanbin Bu
Hi, Currently, "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment." Are there any plans on the batch/streaming unification roadmap to use StreamTableEnvironment for both streamingMode and batchMode? Thanks, Fanbin

StreamTableEnvironment can not run in batch mode

2020-02-19 Thread Fanbin Bu
Hi, Currently, "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment." Are there any plans on the batch/streaming unification roadmap to use StreamTableEnvironment for both streamingMode and batchMode? Thanks, Fanbin

1.9 timestamp type default

2020-02-13 Thread Fanbin Bu
Hi, According to https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/types.html#timestamp , the default Java bridging type for TIMESTAMP is java.time.LocalDateTime. Is there a setting that can change it to use java.sql.Timestamp instead? Thanks, Fanbin
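
As far as I know there is no global default switch; bridging is declared per type. A minimal sketch of what that looks like:

    import org.apache.flink.table.api.DataTypes

    // request java.sql.Timestamp instead of the default java.time.LocalDateTime
    val tsType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[java.sql.Timestamp])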

Re: Aggregation for last n seconds for each event

2020-02-11 Thread Fanbin Bu
Can you do RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW? On Tue, Feb 11, 2020 at 12:15 PM oleg wrote: > Hi Community, > > I do streaming in event time and I want to preserve ordering and late > events. I have a use case where I need to fire an aggregation function > for events of
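
A sketch of that suggestion as a full event-time OVER aggregation (table and column names are hypothetical; it requires a rowtime attribute with watermarks):

    val result = tableEnv.sqlQuery(
      """
        |SELECT
        |  user_id,
        |  SUM(amount) OVER (
        |    PARTITION BY user_id
        |    ORDER BY event_time
        |    RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW
        |  ) AS amount_last_second
        |FROM events
        |""".stripMargin)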

merge implementation in count distinct

2020-02-10 Thread Fanbin Bu
Hi, For the following implementation of merge, https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java#L224 what if acc has the same keys as mergeAcc? the
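
The usual contract, sketched below (a simplified stand-in, not the linked code verbatim): when both accumulators contain the same key, the per-key counts are added, so each distinct key survives the merge exactly once.

    import scala.collection.mutable

    case class CountDistinctAcc(counts: mutable.Map[String, Long])

    def merge(acc: CountDistinctAcc, mergeAcc: CountDistinctAcc): Unit = {
      mergeAcc.counts.foreach { case (key, cnt) =>
        // shared keys: counts are summed; the key is still counted once as distinct
        acc.counts.update(key, acc.counts.getOrElse(key, 0L) + cnt)
      }
    }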

Re: batch job OOM

2020-02-05 Thread Fanbin Bu
> Best, > > Arvid > > On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu wrote: > >> I can build flink 1.10 and install it on to EMR >> (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my >> project build.gradle, ie. flink-scala_2.11, flink-json, fli

Re: batch job OOM

2020-01-27 Thread Fanbin Bu
u can describe >> complete SQL and some data informations. >> >> Best, >> Jingsong Lee >> >> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu wrote: >> >>> Jingsong, >>> >>> Do you have any suggestions to debug the above mentioned >&g

debug flink in intelliJ on EMR

2020-01-23 Thread Fanbin Bu
Hi, I'm following https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters to debug a Flink program running on EMR. How do I specify the host in the `edit configurations` if the terminal on the EMR master is hadoop@ip-10-200-46-186 ? Thanks, Fanbin

Re: batch job OOM

2020-01-23 Thread Fanbin Bu
Jingsong, Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error? Thanks, Fanbin On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu wrote: > I got the following error when running another job. any suggestions? > > Caused by: java.lang.IndexOutOfBounds

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu wrote: > Jingsong, > > I set the config value to be too large. After

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
nfig code? > > Best, > Jingsong Lee > > On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu wrote: > >> Jingsong, >> >> Great, now i got a different error: >> >> java.lang.NullPointerException: Initial Segment may not be null >> at >> org.apache.flink

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) is there any other config i should add? thanks, Fanbin On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu wrote

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
song Lee > > On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu wrote: > >> Jingsong, >> >> Thank you for the response. >> Since I'm using flink on EMR and the latest version is 1.9 now. the >> second option is ruled out. but will keep that in mind for future upgrade

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I saw the doc in https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html . Do I have to set that in the code or can I do it through flink-conf.yaml? On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu wrote: > Jingsong, > > Thank you for the response. > Since I'm using
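
A sketch of setting such an option in code; the key name below is my reading of the linked config page for that version and should be double-checked against your Flink release:

    // blink planner batch memory knob (assumed key; verify for your version)
    val conf = tableEnv.getConfig.getConfiguration
    conf.setString("table.exec.resource.hash-agg.memory", "256 mb")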

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
84.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html > [2] https://issues.apache.org/jira/browse/FLINK-15732 > > Best, > Jingsong Lee > > On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu wrote: > >> >> tried to increase memory: >> flink r

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
, interval '30' second, interval '1' minute) On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu wrote: > Hi, > > I have a batch job using blink planner. and got the following error. I was > able to successfully run the same job with flink 1.8 on yarn. > > I set conf as: > taskmana

batch job OOM

2020-01-22 Thread Fanbin Bu
Hi, I have a batch job using the blink planner and got the following error. I was able to successfully run the same job with Flink 1.8 on YARN. I set conf as: taskmanager.heap.size: 5m and the Flink UI gives me Last Heartbeat: 20-01-22 14:56:25 ID: container_1579720108062_0018_01_20 Data

Re: savepoint failed for finished tasks

2020-01-17 Thread Fanbin Bu
> On Fri, 17 Jan 2020 at 16:26, Congxian Qiu wrote: > >> Hi >> >> Currently, Checkpoint/savepoint only works if all operators/tasks are >> still running., there is an issue[1] tracking this >> >> [1] https://issues.apache.org/jira/browse/FLINK-2491 >&

savepoint failed for finished tasks

2020-01-16 Thread Fanbin Bu
Hi, I couldn't make a savepoint for the following graph: [image: image.png] with stacktrace: Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not

Re: managedMemoryInMB failure

2020-01-08 Thread Fanbin Bu
anagedMemoryPerSlot (138m in > your case) * numberOfSlots'. > > It's not clear to me why the exactly same code works on emr. Were you > running the same version of flink? > > Thank you~ > > Xintong Song > > > > On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu wrote: >

managedMemoryInMB failure

2020-01-07 Thread Fanbin Bu
Hi, With Flink 1.9 running in docker mode, I have a batch job and got the following error message. However, it works totally fine with the same code on EMR. I checked the log and here is the only difference: managedMemoryInMB=138 (the working one has a 0 value). Did anybody see this before?

Re: Flink SQL + savepoint

2019-12-30 Thread Fanbin Bu
y discuss the > solutions > in the following versions. > > Best, > Kurt > > On Sat, Nov 2, 2019 at 7:24 AM Fanbin Bu wrote: > >> Kurt, >> >> What do you recommend for Flink SQL to use savepoints? >> >> >> >> On Thu, Oct 31, 2019 at 12:0

flink 1.9 conflict jackson version

2019-12-16 Thread Fanbin Bu
Hi, After I upgraded to Flink 1.9, I got the following error message on EMR; it works locally in IntelliJ. I'm explicitly declaring the dependency as implementation 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1' and I have implementation group: 'com.amazonaws', name:

sink type error in scala

2019-12-14 Thread Fanbin Bu
Hi, I have my sink defined as: class MyAwesomeSink() extends RichSinkFunction[(Boolean, Row)] { ... } But the compiler complains when I use it like: val sink = new MyAwesomeSink() tableEnv.toRetractStream(queryResult, classOf[Row]).addSink(sink) found : MyAwesomeSink required:
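
A hedged guess at the fix, assuming a Scala StreamTableEnvironment: the classOf[Row] overload belongs to the Java API and produces Java Tuple2 elements, while the Scala variant yields DataStream[(Boolean, Row)], which matches RichSinkFunction[(Boolean, Row)]:

    import org.apache.flink.api.scala._
    import org.apache.flink.types.Row

    val sink = new MyAwesomeSink()
    // Scala variant: elements are (Boolean, Row) scala tuples
    tableEnv.toRetractStream[Row](queryResult).addSink(sink)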

Re: Flink SQL + savepoint

2019-11-01 Thread Fanbin Bu
> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44 > > > > Best > > Yun Tang > > > > > > *From: *Fanbin Bu > *Date: *Thursday, October 31, 2019 at 1:17 PM > *

Flink SQL + savepoint

2019-10-30 Thread Fanbin Bu
Hi, It is highly recommended that we assign uids to operators for the sake of savepoints. How do we do this for Flink SQL? According to https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api, it is not possible. Does that mean I can't use savepoint to

Re: JDBCInputFormat does not support json type

2019-10-24 Thread Fanbin Bu
olumnIndex); What would be the best way to handle this on the Flink side? On Thu, Oct 24, 2019 at 12:36 AM Fanbin Bu wrote: > Hi there, > > Flink Version: 1.8.1 > JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver > > Here is the code snippet: > > val rowTypeInf

JDBCInputFormat does not support json type

2019-10-24 Thread Fanbin Bu
Hi there, Flink Version: 1.8.1 JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver Here is the code snippet: val rowTypeInfo = new RowTypeInfo( Array[TypeInformation[_]]( new RowTypeInfo( Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO,
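
One hedged workaround (an assumption on my side, not this thread's resolution): cast the JSON/VARIANT column to text in the query pushed to Snowflake, so the driver returns a plain string and STRING_TYPE_INFO applies:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    val rowTypeInfo = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO),
      Array("event_json"))

    // TO_VARCHAR is Snowflake SQL; the column name is hypothetical
    val query = "SELECT TO_VARCHAR(event_json) AS event_json FROM events"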

count distinct not supported in batch?

2019-09-19 Thread Fanbin Bu
Hi, Just found that count distinct is supported in streaming but not in batch (version 1.8). Is there any plan to add this to batch? SELECT user_id , hop_end(created_at, interval '30' second, interval '30' second) as bucket_ts , count(distinct name) FROM $table GROUP BY user_id ,

flink sql syntax for accessing object

2019-08-23 Thread Fanbin Bu
Hi, I have a table whose schema is a Scala case class or a Map. How do I access the field? I tried the following and it doesn't work. case class MyObject(myField: String) case class Event(myObject: MyObject, myMap: Map[String, String]) table = tableEnv.fromDataStream[Event](myStream, 'myObject,
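
For reference, the field-access syntax I believe applies here, as a sketch (the registered table name 'events' and the map key are hypothetical):

    import org.apache.flink.table.api.scala._

    // Table API: get(...) for composite (case class) fields, at(...) for map keys
    val t1 = table.select('myObject.get("myField"), 'myMap.at("someKey"))

    // SQL: dot syntax for nested rows, bracket syntax for maps
    val t2 = tableEnv.sqlQuery("SELECT myObject.myField, myMap['someKey'] FROM events")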

kinesis table connector support

2019-08-23 Thread Fanbin Bu
Hi, Looks like the Flink table connectors do not include `kinesis` (only FileSystem, Kafka, ES); see https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors . I also found some examples for Kafka:

Re: Is it possible to decide the order of where conditions in Flink SQL

2019-07-26 Thread Fanbin Bu
How about moving the query-db filter to the outer select? On Fri, Jul 26, 2019 at 9:31 AM Tony Wei wrote: > Hi, > > If I have multiple where conditions in my SQL, is it possible to specify > its order, so that the query > can be executed more efficiently? > > For example, if I have the following SQL,
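
A sketch of that suggestion (names hypothetical; note the optimizer may still reorder predicates, so this is a hint rather than a guarantee):

    val result = tableEnv.sqlQuery(
      """
        |SELECT *
        |FROM (
        |  SELECT * FROM events WHERE event_type = 'click'  -- cheap filter first
        |)
        |WHERE expensive_db_lookup(user_id)  -- costly UDF runs on fewer rows
        |""".stripMargin)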

Re: GroupBy result delay

2019-07-24 Thread Fanbin Bu
ackoverflow, see more details > here[1]. > > Best, Hequn > > [1] > https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger > > On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu wrote: > >> If I use proctime, the groupBy happens without any d

Re: GroupBy result delay

2019-07-23 Thread Fanbin Bu
If I use proctime, the groupBy happens without any delay. On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu wrote: > not sure whether this is related: > > public SingleOutputStreamOperator assignTimestampsAndWatermarks( > AssignerWithPeriodicWatermarks timestampAndWater

Re: GroupBy result delay

2019-07-23 Thread Fanbin Bu
t to 32 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(32) and the command for launching the job is flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu wrote: > Thanks Fabian for the prompt reply. I just

Re: GroupBy result delay

2019-07-23 Thread Fanbin Bu
will > be computed with approx. 10 minute delay. > > Best, Fabian > > Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu < > fanbin...@coinbase.com>: > >> Hi, >> I have a Flink sql streaming job defined by: >> >> SELECT >> user_id >>

GroupBy result delay

2019-07-22 Thread Fanbin Bu
Hi, I have a Flink sql streaming job defined by: SELECT user_id , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts , count(name) as count FROM event WHERE name = 'signin' GROUP BY user_id , hop(created_at, interval '30' second, interval '1' minute) there is
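
A sketch of the timestamp/watermark assignment this thread revolves around, assuming an Event type with an epoch-millis createdAt field (names hypothetical). If watermarks lag, or some parallel instances see no data, the HOP windows fire late or not at all, which shows up as the delay described:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Event(userId: String, name: String, createdAt: Long) // hypothetical shape

    val events: DataStream[Event] = ??? // the kafka-backed stream from the job above

    val withTimestamps = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        override def extractTimestamp(e: Event): Long = e.createdAt // epoch millis assumed
      })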