Re: [Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Dongwon Kim
…eliminate most of these cases. > > Cheers, > Gyula > > On Tue, Nov 22, 2022 at 9:43 AM Dongwon Kim wrote: > >> Hi, >> >> While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and >> flink-1.14.3, we're occasionally facing the following error:

[Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Dongwon Kim
Hi, While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and flink-1.14.3, we're occasionally facing the following error: Status: > Cluster Info: > Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00 > Flink - Version: 1.14.3 > Error:

Re: [State Processor API] unable to load the state of a trigger attached to a session window

2022-05-12 Thread Dongwon Kim
…your case, and trigger >> count (trigger state) is stored in namespace TimeWindow{start=1,end=15}, >> but WindowReaderOperator only tries to find keys and namespaces related to >> window state. >> >> Dongwon Kim wrote on Tue, Apr 19, 2022 at 15:29: >> >>> Hi,

Re: [State Processor API] unable to load the state of a trigger attached to a session window

2022-05-10 Thread Dongwon Kim
Is the library, State Processor API, not widely used and no longer maintained? On Mon, Apr 25, 2022 at 3:16 PM Dongwon Kim wrote: > Can anyone help me with this? > > Thanks in advance, > > On Tue, Apr 19, 2022 at 4:28 PM Dongwon Kim wrote: > >> Hi,

Re: [State Processor API] unable to load the state of a trigger attached to a session window

2022-04-25 Thread Dongwon Kim
Can anyone help me with this? Thanks in advance, On Tue, Apr 19, 2022 at 4:28 PM Dongwon Kim wrote: > Hi, > > I'm using Flink-1.14.4 and failed to load in WindowReaderFunction the > state of a stateful trigger attached to a session window. > I found that the following data become available…

[State Processor API] unable to load the state of a trigger attached to a session window

2022-04-19 Thread Dongwon Kim
Hi, I'm using Flink-1.14.4 and failed to load in WindowReaderFunction the state of a stateful trigger attached to a session window. I found that the following data become available in WindowReaderFunction: - the state defined in the ProcessWindowFunction - the registered timers of the stateful trigger…

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dongwon Kim
…ntary restarts of the job can cause havoc as this resets watermarking. > > I'll be off next week, unable to take part in the active discussion … > > Sincere greetings > > Thias > > From: Dan Hill

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dongwon Kim
…astream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>. > > On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim wrote: >> Hi Dan, >> >> I'm quite confused as you already use per-partition watermarking. >> >> What I m…

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dongwon Kim
> Thanks Dongwon! > > Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source > tasks < # kafka partitions. This should be called out in the docs or the > bug should be fixed. > > On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim > wrote: > >> Hi Dan,

Re: Weird Flink Kafka source watermark behavior

2022-03-17 Thread Dongwon Kim
Hi Dan, Do you use the per-partition watermarking explained in [1]? I've also experienced a similar problem when running backfill jobs, specifically when # source tasks < # kafka partitions. - When # source tasks = # kafka partitions, the backfill job works as expected. - When # source tasks < # kafka partitions, …
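For context, a minimal sketch of per-partition (per-split) watermarking on the newer KafkaSource with an idleness timeout — broker address, topic, and group id are placeholders, and withIdleness() is only a mitigation for the fewer-tasks-than-partitions situation discussed in this thread, not a fix for the underlying bug:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PerPartitionWatermarkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("gps-topic")               // placeholder
                    .setGroupId("backfill")               // placeholder
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // Watermarks are generated per Kafka split; withIdleness keeps a
            // subtask that owns only quiet partitions from stalling event time.
            DataStream<String> stream = env.fromSource(
                    source,
                    WatermarkStrategy
                            .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            .withIdleness(Duration.ofMinutes(1)),
                    "kafka-source");

            stream.print();
            env.execute("per-partition-watermarking");
        }
    }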

Re: Anyone trying to adopt Scotty on the recent Flink versions?

2021-12-29 Thread Dongwon Kim
…1.8 so I wouldn't really expect it to be > compatible with 1.14.x :( > > [1] https://github.com/TU-Berlin-DIMA/scotty-window-processor > > Best, > D. > > On Tue, Dec 28, 2021 at 4:15 AM Dongwon Kim wrote: > >> Hi community, >> >> We're recently trying to adopt Scotty…

Anyone trying to adopt Scotty on the recent Flink versions?

2021-12-27 Thread Dongwon Kim
Hi community, We're recently trying to adopt Scotty to overcome the poor performance caused by too many sliding windows. We're facing the following exception on the latest Flink-1.14.2: switched from RUNNING to FAILED with failure cause: java.lang.ArrayIndexOutOfBoundsException: -1 at

Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Dongwon Kim
When should I prepare for upgrading ZK to 3.5 or newer? We're operating a Hadoop cluster w/ ZK 3.4.6 for running only Flink jobs. Just hope that the rolling update is not that painful - any advice on this? Best, Dongwon On Tue, Dec 7, 2021 at 3:22 AM Chesnay Schepler wrote: > Current users of

Converting DataStream of Avro SpecificRecord to Table

2021-12-06 Thread Dongwon Kim
Hi community, I'm currently converting a DataStream of Avro SpecificRecord type into Table using the following method: public static Table toTable(StreamTableEnvironment tEnv, DataStream dataStream,
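The snippet above is cut off; a hedged sketch of how such a helper might look — the generic bound and body are assumptions, not the original code:

    import org.apache.avro.specific.SpecificRecord;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class Converters {
        // Converts a DataStream of an Avro SpecificRecord type into a Table.
        public static <T extends SpecificRecord> Table toTable(
                StreamTableEnvironment tEnv, DataStream<T> dataStream) {
            return tEnv.fromDataStream(dataStream);
        }
    }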

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Fabian, Can you maybe share more about the setup and how you use the AsyncFunction > with > the Kafka client? Oops, I didn't mention in the reply to David that the Kafka producer has nothing to do with the AsyncFunction! I interact with Redis and a Spring Boot app in the AsyncFunction, not…

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
…:53 PM Jun Qin wrote: > Hi Dongwon > > Did you override AsyncFunction#timeout()? If so, did you call > resultFuture.complete()/completeExceptionally() in your override? Not > calling them can result in checkpoint timeouts. > > Thanks > Jun > > On Nov 9, 2021,

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-08 Thread Dongwon Kim
> Arvid can you please validate the above? (the checkpoints not being > blocked by the work queue part) > > [1] > https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109

how to expose the current in-flight async i/o requests as metrics?

2021-11-07 Thread Dongwon Kim
Hi community, While using Flink's async i/o for interacting with an external system, I got the following exception: 2021-11-06 10:38:35,270 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job
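A minimal sketch (not the thread's actual code) of one way to expose in-flight async requests: track them in an AtomicInteger registered as a gauge; the external call is a stub standing in for the Redis/HTTP interaction described later in the thread:

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class CountingAsyncFunction extends RichAsyncFunction<String, String> {
        private transient AtomicInteger inFlight;

        @Override
        public void open(Configuration parameters) {
            inFlight = new AtomicInteger();
            // Expose the current number of outstanding requests as a gauge.
            getRuntimeContext().getMetricGroup()
                    .gauge("inFlightRequests", (Gauge<Integer>) () -> inFlight.get());
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            inFlight.incrementAndGet();
            CompletableFuture
                    .supplyAsync(() -> callExternalService(input)) // stub below
                    .whenComplete((result, err) -> {
                        inFlight.decrementAndGet();
                        if (err != null) {
                            resultFuture.completeExceptionally(err);
                        } else {
                            resultFuture.complete(Collections.singleton(result));
                        }
                    });
        }

        private String callExternalService(String in) {
            return in; // placeholder for the real external call
        }
    }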

Re: How to deserialize Avro enum type in Flink SQL?

2021-10-13 Thread Dongwon Kim
…data is invalid. > > Looking at the code of AvroToRowDataConverters, it sounds like STRING > should work with Avro enums. Can you provide a minimal reproducer (without > confluent schema registry) with a valid input? > > On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim wrote: >

Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-12 Thread Dongwon Kim
Hi community, Can I get advice on this question? Another user just sent me an email asking whether I found a solution or a workaround for this question, but I'm still stuck there. Any suggestions? Thanks in advance, Dongwon -- Forwarded message - From: Dongwon Kim Date: Mon

How to deserialize Avro enum type in Flink SQL?

2021-08-09 Thread Dongwon Kim
Hi community, I have a Kafka topic where the schema of its values is defined by the "MyRecord" record in the following Avro IDL and registered to the Confluent Schema Registry. > @namespace("my.type.avro") > protocol MyProtocol { > enum MyEnumType { > TypeVal1, TypeVal2 > } > record MyRecord {…
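A hedged DDL sketch of the approach suggested in the reply above — read the enum field as STRING. The field name and connector options are placeholders, since the original message is truncated (option keys match the 1.13/1.14-era Kafka connector):

    CREATE TABLE my_record_table (
      `enumField` STRING   -- the MyEnumType field, read as STRING (assumed name)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',                                     -- placeholder
      'properties.bootstrap.servers' = 'broker:9092',           -- placeholder
      'format' = 'avro-confluent',
      'avro-confluent.schema-registry.url' = 'http://registry:8081'  -- placeholder
    );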

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
[1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html > > Regards, > Roman > > On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim wrote: > >> Hi 张静, >> >> Q1: By default, a savepoint restore will try to match all state…

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
…), > then remove the definition of the POJO type, then you can restore from > savepoint2. > Correct me please if I'm wrong. Thanks. > > Best, > Beyond1920 > > Dongwon Kim wrote on Mon, Feb 8, 2021 at 9:43 PM: > > > > Hi, > > > > I have an original job (say v1) and I want…

Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi, I have an original job (say v1) and I want to start a new job (say v2) from a savepoint of v1. An operator of v1 used to have per-key states of a POJO type, but I want to remove the states together with the definition of the POJO type. When I start v2 from a savepoint of v1, I specified
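For reference, the flag in question is passed at resume time; a sketch with placeholder paths (-n is shorthand for --allowNonRestoredState):

    flink run -s hdfs:///savepoints/savepoint-v1-xxxx \
        -n \
        myjob-v2.jar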

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-12 Thread Dongwon Kim
…ain/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm > > On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim > wrote: > >> Hi Arvid, >> >> Thanks for the very detailed explanation and tips! >> >> inferring the type information of Java…

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-12 Thread Dongwon Kim
…che.org/projects/flink/flink-docs-stable/dev/execution_configuration.html > [2] > https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html > [3] https://avro.apache.org/docs/current/gettingstartedjava.html > > On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim…

Re: "flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-06 Thread Dongwon Kim
…FlinkYarnSessionCli. Since it is > widely used, it could not be deprecated or removed very soon. "--executor" > has exactly the same effect as "--target". The only difference is the > naming. > > [1] https://issues.apache.org/jira/browse/FLINK-20866 > > Best…

Re: "flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-06 Thread Dongwon Kim
…specify the "high-availability.cluster-id" so that leader > retrieval could get the correct JobManager address. > > flink list --target yarn-per-job -Dyarn.application.id=$application_id > -Dhigh-availability.cluster-id=$application_id…

"flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-05 Thread Dongwon Kim
Hi, I'm using Flink-1.12.0 and running on Hadoop YARN. After setting HA-related properties in flink-conf.yaml, high-availability: zookeeper high-availability.zookeeper.path.root: /recovery high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181 high-availability.storageDir:

Fwd: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-04 Thread Dongwon Kim
Any advice would be appreciated :) Thanks, Dongwon -- Forwarded message - From: Dongwon Kim Date: Mon, Dec 14, 2020 at 11:27 PM Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"…

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Dongwon Kim
…embedded client > to submit the job. So the user jar will be added to the distributed cache. > When deploying a task to a TaskManager, > it will be downloaded again and run in the user classloader even though we > already have it in the system classpath. > > I think it might be the reason why the…

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Dongwon Kim
…as well. In your case it seems that the classes are loaded from > different classloaders. > But if Kafka is really only available in the user jar, then this error > still should not occur. > > On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim wrote: >

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-15 Thread Dongwon Kim
…? Best, Dongwon On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim wrote: > Hi, > > I have an artifact which works perfectly fine with Per-Job Cluster Mode > with the following bash script: > > #!/bin/env bash > > export FLINK_CONF_DIR=./conf > > export HADOOP_CLASSPATH=`hadoop classpath`…

failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-15 Thread Dongwon Kim
Hi, I have an artifact which works perfectly fine with Per-Job Cluster Mode with the following bash script: #!/bin/env bash export FLINK_CONF_DIR=./conf export HADOOP_CLASSPATH=`hadoop classpath` $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf I tried Application Mode [1]
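For comparison, the Application Mode submission the post goes on to try would look roughly like this — same jar and environment as the per-job script, with 'run-application -t yarn-application' in place of 'run -t yarn-per-job':

    #!/bin/env bash
    export FLINK_CONF_DIR=./conf
    export HADOOP_CLASSPATH=`hadoop classpath`
    $FLINK_HOME/bin/flink run-application -t yarn-application myjar.jar myconf.conf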

How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2020-12-14 Thread Dongwon Kim
Hi, The following program compiles and runs w/o exceptions: > public class Test { > > public static class A { > private int n; > > public A() { } > public int getN() { return n; } > public void setN(int n) { this.n = n; } > } > > public static class B { > private
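One common way out of that error, sketched under the assumption that the offending field is a java.util.List<Integer>: keep generic types disabled but hand Flink explicit type information via returns(), instead of letting it fall back to Kryo.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplicitListTypeInfo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableGenericTypes();

            // Without returns(...), the List output would be a GenericType
            // and fail once generic types are disabled.
            DataStream<List<Integer>> lists = env
                    .fromElements(1, 2, 3)
                    .map(i -> Arrays.asList(i, i * i))
                    .returns(Types.LIST(Types.INT));

            lists.print();
            env.execute("explicit-list-type-info");
        }
    }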

Re: What happens when all input partitions become idle

2020-12-09 Thread Dongwon Kim
…master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java#L108 > > Dongwon Kim wrote on Thu, Dec 10, 2020 at 12:21 AM: > >> Hi, >> >> Let's consider two operators: A (parallelism=2) and B (parallelism=1). >> B has two input…

What happens when all input partitions become idle

2020-12-09 Thread Dongwon Kim
Hi, Let's consider two operators: A (parallelism=2) and B (parallelism=1). B has two input partitions, B_A1 and B_A2, which are connected to A1 and A2 respectively. At some point, - B_A1's watermark : 12 - B_A2's watermark : 10 - B's event-time clock : 10 = min(12, 10) - B has registered a timer

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Dongwon Kim
…n is a > bug that should be fixed in a future version. But the mismatch is valid: > > ARRAY is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. > Can you try this as the result type of your aggregate function. > > Regards, > Timo > > On 26.11.20 18:13, Dongwon…
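A hedged sketch of that suggestion — declare the aggregate's result as an object array via Types.OBJECT_ARRAY(Types.INT) rather than a List type; the accumulate logic is a guess at the thread's truncated UDAF:

    import java.util.LinkedList;
    import java.util.List;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.AggregateFunction;

    public class Agg extends AggregateFunction<Integer[], List<Integer>> {
        @Override
        public List<Integer> createAccumulator() {
            return new LinkedList<>();
        }

        public void accumulate(List<Integer> acc, Integer value) {
            acc.add(value);
        }

        @Override
        public Integer[] getValue(List<Integer> acc) {
            return acc.toArray(new Integer[0]);
        }

        @Override
        public TypeInformation<Integer[]> getResultType() {
            return Types.OBJECT_ARRAY(Types.INT);   // ARRAY<INT> on the SQL side
        }

        @Override
        public TypeInformation<List<Integer>> getAccumulatorType() {
            return Types.LIST(Types.INT);
        }
    }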

Re: Caching

2020-11-27 Thread Dongwon Kim
…Dongwon. It was extremely helpful. I didn't quite understand how > async io can be used here. It would be great if you can share some info on > it. > > Also how are you propagating any changes to values? > > Regards, > Navneeth > > On Thu, Nov 26, 2020 at 6:26 AM Dongwon…

queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-26 Thread Dongwon Kim
Hello, I'm using Flink-1.11.2. Let's assume that I want to store on a table the result of the following UDAF: > public class Agg extends AggregateFunction, List> > { > @Override > public List createAccumulator() { > return new LinkedList<>(); > } > @Override > public

Re: Caching

2020-11-26 Thread Dongwon Kim
Oops, I forgot to mention that when doing bulk inserts into Redis, you'd better open a pipeline with the 'transaction' property set to False [1]. Otherwise, API calls from your Flink job will time out. [1] https://github.com/andymccurdy/redis-py#pipelines On Thu, Nov 26, 2020 at 11:09 PM Dongwon…
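Since [1] is redis-py, a minimal Python sketch of the non-transactional pipeline being described — host, port, and data are placeholders:

    import redis

    r = redis.Redis(host="localhost", port=6379)

    # transaction=False avoids wrapping the whole batch in MULTI/EXEC,
    # which is what makes huge bulk inserts prone to timeouts.
    pipe = r.pipeline(transaction=False)
    for key, value in [("k1", "v1"), ("k2", "v2")]:
        pipe.set(key, value)
    pipe.execute()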

Re: Caching

2020-11-26 Thread Dongwon Kim
Hi Navneeth, I reported a similar issue to yours before [1] but I took the broadcasting approach at first. As you already anticipated, broadcasting is going to use more memory than your current approach based on a static object on each TM. And the broadcasted data will be treated as operator…

Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-18 Thread Dongwon Kim
…or but you might have some success with > that one. > > Best, > Aljoscha > > On 18.11.20 07:03, Dongwon Kim wrote: > > Hi, > > > > Recently I've been working on a real-time data stream processing pipeline > > with DataStream API while preparing for a new service…

execution.runtime-mode=BATCH when reading from Hive

2020-11-17 Thread Dongwon Kim
Hi, Recently I've been working on a real-time data stream processing pipeline with DataStream API while preparing for a new service to launch. Now it's time to develop a back-fill job to produce the same result by reading data stored on Hive which we use for long-term storage. Meanwhile, I
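The switch being discussed landed as a single setting in Flink 1.12 (FLIP-140); a sketch of toggling it for the backfill run, with the topology itself elided:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BackfillJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Same DataStream program as the live pipeline; only the mode changes.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            // ... build the same topology here, reading from Hive instead of Kafka
            env.execute("hive-backfill");
        }
    }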

Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Dongwon Kim
…wrote: > > Hi, > > > > Not sure if this question would be more suitable for the Apache Beam > > mailing lists, but I'm pulling in Aljoscha (CC'ed) who would know more > > about Beam and whether or not this is expected behaviour. > > > > Cheers, > > G…

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-16 Thread Dongwon Kim
…hive.navi.gps is recognized as a table name as a whole. I currently declare such a table by specifying all fields without the LIKE clause. Am I missing something? FYI, I'm working with Flink-1.11.2. Thank you~ Best, Dongwon On Fri, Nov 13, 2020 at 5:19 PM Danny Chan wrote: > Hi Dongwon ~ > > T…

CREATE TABLE LIKE clause from different catalog or database

2020-11-10 Thread Dongwon Kim
Hi, Is it disallowed to refer to a table from different databases or catalogs when someone creates a table? According to [1], there's no way to refer to tables belonging to different databases or catalogs. [1]
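For reference, the fully qualified form the question is after would read as below (catalog/database/table names taken from the follow-up above, connector options as placeholders). Whether the parser accepts per-segment quoting depends on the Flink version; the follow-up reports the 1.11 series parsing it as a single name, so this is a sketch of intent rather than a confirmed fix:

    CREATE TABLE `my_catalog`.`my_db`.`gps_copy`
    WITH (
      'connector' = 'kafka'   -- placeholder options
    )
    LIKE `hive`.`navi`.`gps` (EXCLUDING OPTIONS);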

Re: Better way to share large data across task managers

2020-09-23 Thread Dongwon Kim
…api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html > >> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim wrote: >> Hi, >> I'm using Flink broadcast state similar to what Fabian explained in [1]. One >> difference might be the size of the broadcasted data…

Better way to share large data across task managers

2020-09-20 Thread Dongwon Kim
Hi, I'm using Flink broadcast state similar to what Fabian explained in [1]. One difference might be the size of the broadcasted data; the size is around 150MB. I've launched 32 TMs by setting - taskmanager.numberOfTaskSlots : 6 - parallelism of the non-broadcast side : 192 Here's some
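For context, a minimal sketch of the broadcast-state pattern referenced in [1] — the stream names and String payloads are placeholders standing in for the ~150MB dataset and the main input:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastJoin {
        static final MapStateDescriptor<String, String> DESCRIPTOR =
                new MapStateDescriptor<>("broadcastData", Types.STRING, Types.STRING);

        public static DataStream<String> enrich(DataStream<String> mainStream,
                                                DataStream<String> dataToShare) {
            BroadcastStream<String> broadcast = dataToShare.broadcast(DESCRIPTOR);

            return mainStream
                    .connect(broadcast)
                    .process(new BroadcastProcessFunction<String, String, String>() {
                        @Override
                        public void processElement(String value, ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            // Look up the broadcast copy kept on this subtask.
                            String hit = ctx.getBroadcastState(DESCRIPTOR).get(value);
                            out.collect(hit != null ? hit : value);
                        }

                        @Override
                        public void processBroadcastElement(String value, Context ctx,
                                                            Collector<String> out) throws Exception {
                            ctx.getBroadcastState(DESCRIPTOR).put(value, value);
                        }
                    });
        }
    }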

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dongwon Kim
…Timo Walther wrote: > Hi Dongwon, > > another possibility is to use the DataStream API before. There you can > extract the metadata and use DataStream.assignTimestampsAndWatermarks > before converting the stream to a table. > > Regards, > Timo > > On 11.08.20 09:41…

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dongwon Kim
…release-1.11/dev/table/sourceSinks.html#encoding--decoding-formats > On 11/08/2020 09:20, Dongwon Kim wrote: > Hi, > I'm working on a Kafka topic where timestamps are not in the message > body but in the message's metadata. > I want to declare a table from the…

[SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dongwon Kim
Hi, I'm working on a Kafka topic where timestamps are not in the message body but in the message's metadata. I want to declare a table from the topic with DDL, but "rowtime_column_name" in the definition below seems to accept only existing columns. > WATERMARK FOR rowtime_column_name…
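For the record, later Flink versions (1.12+, FLIP-107) added metadata columns that address exactly this; a hedged sketch with placeholder names and options — not available on the 1.11 release this thread ran on:

    CREATE TABLE events (
      `payload` STRING,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',   -- Kafka record timestamp
      WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',                           -- placeholder
      'properties.bootstrap.servers' = 'broker:9092', -- placeholder
      'format' = 'json'
    );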

getting in an infinite loop while creating the dependency-reduced pom

2020-08-03 Thread Dongwon Kim
Hi, I create a new Maven project (I'm using Maven 3.6.3) w/ the below command > curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash -s 1.11.1 > > and add the following dependency to <dependencies> > > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> >

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
…org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161) > ... 5 more Best, Dongwon On Wed, Jul 22, 2020 at 12:09 AM Jark Wu wrote: > Hi Dongwon, > > I think this is a bug in the File…

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
…, both are correct; can you share your stack trace detail? > Which version did you use and what SQL context throws the error? > > Best, > Danny Chan > On Jul 21, 2020 at 4:55 PM +0800, Dongwon Kim wrote: > > Hi, > > I want to create subdirectories named after values of a nested column…

How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Hi, I want to create subdirectories named after values of a nested column, location.transId. This is my first attempt: > CREATE TABLE output > PARTITIONED BY (`location.transId`) > WITH ( > 'connector' = 'filesystem', > 'path' = 'east-out', > 'format' = 'json' > ) LIKE navi (EXCLUDING
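One workaround sketch while the nested reference is rejected: declare the partition key as its own top-level physical column in the sink and fill it from the nested field at insert time (column list abbreviated; table names from the thread):

    CREATE TABLE output (
      -- ... the other columns from navi ...
      transId STRING
    ) PARTITIONED BY (transId)
    WITH (
      'connector' = 'filesystem',
      'path' = 'east-out',
      'format' = 'json'
    );

    INSERT INTO output
    SELECT /* ... other columns ..., */ location.transId
    FROM navi;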

Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Dongwon Kim
…Could you post the > entire query (or select clause)? > > Best, > Leonard Xu > > On Jul 20, 2020 at 21:49, Dongwon Kim wrote: > > When I execute the following query in .sqlQuery(), > >> SELECT ... >> FROM ... >> WHERE location.goalName LIKE '%양현마을%'

Encoding problem in WHERE LIKE

2020-07-20 Thread Dongwon Kim
Hi, When I execute the following query in .sqlQuery(), > SELECT ... > FROM ... > WHERE location.goalName LIKE '%양현마을%' > I got the following error message > Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error > at line 1, column 96. Encountered: "\uc591" (50577), after :

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
…https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API > > On Jul 20, 2020 at 14:30, Dongwon Kim wrote: > > I tried below (…

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
…her.java:185) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >

Re: [Table API] how to configure a nested timestamp field

2020-07-06 Thread Dongwon Kim
…apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html > > On Jul 4, 2020 at 21:21, Dongwon Kim wrote: > > Hi, > I use Flink 1.10.1 and I want to use the Table API to read JSON messages. The > message looks like below. > >> { >> "type"…

[Table API] how to configure a nested timestamp field

2020-07-04 Thread Dongwon Kim
Hi, I use Flink 1.10.1 and I want to use Table API to read JSON messages. The message looks like below. > { >"type":"Update", >"location":{ > "id":"123e4567-e89b-12d3-a456-42665234", > "lastUpdateTime":1593866161436 >} > } I wrote the

Re: Cannot start native K8s

2020-05-08 Thread Dongwon Kim
…May 9, 2020 at 12:29 PM Yang Wang wrote: > Hi Dongwon Kim, > > Thanks a lot for your information. I will dig into this issue. > > I think the "UnknownHostException" is caused by incorrectly setting the > Kubernetes ApiServer address. Maybe you are using …

Re: Cannot start native K8s

2020-05-08 Thread Dongwon Kim
…or.java:84) at org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214) Thanks a lot for your interest :-) Best, Dongwon On Sat, May 9, 2020 at 11:15 AM Yang Wang wrote: > Hi Dongwon Kim, > > Are you running Flink on a…

Cannot start native K8s

2020-05-05 Thread Dongwon Kim
Hi, I'm using Flink-1.10 and tested everything [1] successfully. While trying [2], I got the following message. Can anyone help please? [root@DAC-E04-W06 bin]# ./kubernetes-session.sh > 2020-05-06 08:10:49,411 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading >

Re: Support of custom JDBC dialects in flink-jdbc

2020-03-27 Thread Dongwon Kim
…833 to track this > effort. Hope this can be done before the 1.11 release. > > Best, > Jark > > On Fri, 27 Mar 2020 at 22:22, Dongwon Kim wrote: > >> Hi, I tried flink-jdbc [1] to read data from Druid because Druid >> implements Calcite Avatica [2], but the connection string,…

Support of custom JDBC dialects in flink-jdbc

2020-03-27 Thread Dongwon Kim
Hi, I tried flink-jdbc [1] to read data from Druid because Druid implements Calcite Avatica [2], but the connection string, jdbc:avatica:remote:url= http://BROKER:8082/druid/v2/sql/avatica/, is not supported by any of JDBCDialects [3]. I implement custom JDBCDialect [4], custom

Re: User program failures cause JobManager to be shutdown

2019-12-08 Thread Dongwon Kim
…On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote: > >> Hi Dongwon, >> >> This should work but it could also interfere with Flink itself exiting in >> case of a fatal error. >> >> Regards, >> Roman

Re: User program failures cause JobManager to be shutdown

2019-12-05 Thread Dongwon Kim
…System.exit() calls from terminating JVMs and found [1]. Can it be a possible solution to the problem? [1] https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm Best, - Dongwon On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim wrote: > Hi Robert and Roman…
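The approach in [1], sketched: install a SecurityManager that vetoes exit calls from user code. This matches the 2019 context of the thread; SecurityManager is deprecated in recent JDKs:

    public class ExitGuard {
        public static void install() {
            System.setSecurityManager(new SecurityManager() {
                @Override
                public void checkExit(int status) {
                    // Veto System.exit() from user code.
                    throw new SecurityException("System.exit(" + status + ") blocked");
                }

                @Override
                public void checkPermission(java.security.Permission perm) {
                    // Permit everything else.
                }
            });
        }
    }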

Re: User program failures cause JobManager to be shutdown

2019-12-05 Thread Dongwon Kim
…without any other jobs >> submitted. >> >> Regards, >> Roman >> >> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim wrote: >> >>> Hi Roman, >>> >>> We're using the latest version 1.9.1 and those two lines are all I've…

Re: User program failures cause JobManager to be shutdown

2019-12-05 Thread Dongwon Kim
Hi Roman, We're using the latest version 1.9.1 and those two lines are all I've seen after executing the job on the web ui. Best, Dongwon On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan wrote: > Hi Dongwon, > > Could you please provide Flink version you are running and the job manager > logs?

User program failures cause JobManager to be shutdown

2019-12-04 Thread Dongwon Kim
Hi, I tried to run a program by uploading a jar on the Flink UI. When I intentionally enter a wrong parameter to my program, the JobManager dies. Below are all the log messages I can get from the JobManager; it dies as soon as it emits the second line: 2019-12-05 04:47:58,623 WARN >

Re: [ANNOUNCE] Weekly Community Update 2019/48

2019-12-03 Thread Dongwon Kim
…covers the T map team's journey adopting and operating a real-time driving score service [2], which is a mission-critical application written in the Flink DataStream API. 3. "Do Flink On Web with FLOW" by Dongwon Kim (Data Labs, SK telecom): Dongwon introduces a web service to build data stream processing…

Re: Wrong result of MATCH_RECOGNIZE clause

2019-09-05 Thread Dongwon Kim
…Sep 5, 2019 at 9:15 PM Dongwon Kim wrote: > Hi, > I'm using Flink 1.9 and testing MATCH_RECOGNIZE by following [1]. > While testing the query in [2] myself, I've got a different result > from [2]. > The query result from [2] is as follows: > > symbol start_ts…

Wrong result of MATCH_RECOGNIZE clause

2019-09-05 Thread Dongwon Kim
Hi, I'm using Flink 1.9 and testing MATCH_RECOGNIZE by following [1]. While testing the query in [2] myself, I've got a different result from [2]. The query result from [2] is as follows: symbol start_tstamp end_tstamp avgPrice …

How to shorten MATCH_RECOGNIZE's DEFINE clause

2019-08-20 Thread Dongwon Kim
Hi, Flink's relational APIs with MATCH_RECOGNIZE look very attractive and promising, but they're currently not easy to use at all. While looking into [1], especially the following DEFINE clause, > DEFINE > PRICE_DOWN AS > (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price)

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

2019-08-04 Thread Dongwon Kim
Hi Jungtaek, I've faced a similar problem in the past; we needed to calculate an aggregate upon receiving an end message from each user. While you're trying to solve the problem by defining a custom window assigner, I took a different approach by implementing a custom trigger. You can…
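A minimal sketch of that custom-trigger approach — Event and isEndMessage() are placeholders for the thread's actual types — firing and purging the session window as soon as the explicit end message arrives:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class EndMessageTrigger extends Trigger<EndMessageTrigger.Event, TimeWindow> {
        public interface Event {            // placeholder for the real event type
            boolean isEndMessage();
        }

        @Override
        public TriggerResult onElement(Event element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) {
            return element.isEndMessage()
                    ? TriggerResult.FIRE_AND_PURGE   // end message closes the session
                    : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) { }
    }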

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Dongwon Kim
…to a > function call. > > Best, Fabian > > On Sat, Jul 20, 2019 at 03:58, Dongwon Kim <eastcirc...@gmail.com> wrote: > >> Hi Rong, >> >> I have to dig deeper into the code to reproduce this error. This seems to >>> be a bug to me and…

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-19 Thread Dongwon Kim
…com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html > [4] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html > > On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim wrote: > >> Hi Rong…

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-18 Thread Dongwon Kim
…the return type of this operator. It is > used in cases where Flink cannot determine it automatically [1]. > > Thanks, > Rong > > -- > [1] > https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/S…

[Table API] ClassCastException when converting a table to DataStream

2019-07-17 Thread Dongwon Kim
Hello, Consider the following snippet: > Table sourceTable = getKafkaSource0(tEnv); > DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class) > .map(a -> a) > .returns(sourceTable.getSchema().toRowType()); > stream.print(); where sourceTable.printSchema()…

Re: How to split tuple2 returned by UDAF into two columns in a result table

2019-03-20 Thread Dongwon Kim
Another, yet related question: is there something like an aggregate table function? In the above scenario, I have to apply an aggregate function and then apply a table function solely to flatten tuples, which seems quite inefficient. On Wed, Mar 20, 2019 at 1:09 PM Dongwon Kim wrote: > Hi Kurt…

Re: How to split tuple2 returned by UDAF into two columns in a result table

2019-03-19 Thread Dongwon Kim
like "myscalar(col1, col2, col3) as > (col4, col5)". Am I missing something? > > If you want to split Tuple2 into two different columns, you can use UDTF. > > Best, > Kurt > > > On Wed, Mar 20, 2019 at 9:59 AM Dongwon Kim wrote: > >> Hi, >> &g

How to split tuple2 returned by UDAF into two columns in a result table

2019-03-19 Thread Dongwon Kim
Hi, I want to split Tuple2 returned by AggregateFunction.getValue into two different columns in a resultant table. Let's consider the following example where myudaf returns Tuple2: Table table2 = table1 .window(Slide.over("3.rows").every("1.rows").on("time").as("w")) .groupBy("w,
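A hedged sketch of the UDTF route suggested in the reply above: emit the Tuple2 from a TableFunction, whose fields then surface as two columns (names and element types are assumptions):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;

    public class SplitTuple2 extends TableFunction<Tuple2<Long, Long>> {
        public void eval(Tuple2<Long, Long> t) {
            collect(t);   // Tuple2 fields become columns f0, f1
        }
    }

    // SQL usage, roughly:
    //   SELECT col4, col5
    //   FROM t, LATERAL TABLE(split(aggResult)) AS T(col4, col5)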

RocksDB "shared" directory on HDFS getting bigger

2018-12-12 Thread Dongwon Kim
Hi, I'm using the RocksDB state backend w/ incremental checkpointing. The checkpoints' "shared" directory on HDFS is getting bigger as time goes on. Does it mean that I forgot to clean up state somewhere? - Dongwon

Re: Service discovery for flink-metrics-prometheus

2018-08-01 Thread Dongwon Kim
Hi all, I also suffer from the lack of service discovery for flink-metrics-prometheus while using YARN for deployment, Prometheus for instrumentation, and Flink for stream processing. I just upload a Python script for the purpose here: https://github.com/eastcirclek/flink-service-discovery

Service discovery for Prometheus on YARN

2018-08-01 Thread Dongwon Kim
Hi community, Prior to my presentation [1], I'd like to share a Python script [2] to discover Flink clusters on YARN and let Prometheus know via its file-based service discovery mechanism [3]. Prometheus needs to pull metrics from Prometheus exporters running inside TaskManagers. The problem
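For reference, the Prometheus side of [3] is just a file_sd_configs stanza plus JSON target files that a discovery script keeps up to date — paths, hosts, and labels below are placeholders (9249 is the Prometheus reporter's default port):

    # prometheus.yml (fragment)
    scrape_configs:
      - job_name: 'flink'
        file_sd_configs:
          - files:
              - '/etc/prometheus/targets/flink-*.json'

    # /etc/prometheus/targets/flink-app.json, written by the discovery script:
    # [
    #   { "targets": ["worker01:9249", "worker02:9249"],
    #     "labels": { "application": "my-flink-job" } }
    # ]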

Modify EventTimeTrigger only for event-time session windows

2018-07-23 Thread Dongwon Kim
Hi all, I want to be sure about when EventTimeTrigger.onEventTime() method is called with event-time session windows. It returns TriggerResult.FIRE only when the timestamp of the registered timer equals to the max timestamp of the current window: @Override public TriggerResult onEventTime(long

Re: # of active session windows of a streaming job

2018-06-21 Thread Dongwon Kim
…d, > write) the counter of its associated Trigger to checkpoint and restore it? > > Best, Fabian > > 2018-06-20 16:59 GMT+02:00 Dongwon Kim: > >> Hi Fabian and Chesnay, >> >> As Chesnay pointed out, it seems that I need to write the current counter >> (which is…

Re: # of active session windows of a streaming job

2018-06-20 Thread Dongwon Kim
…to increment the counter in > Trigger.onElement() and decrement it in Trigger.clear(). > > I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in > CC) would know that. > Not sure what would be the best approach if you need a fault-tolerant > solution. > > Best,
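An approximate sketch of that idea — count a window once on its first element and decrement in clear(). As the rest of the thread notes, merges make the count drift and the counter is not checkpointed, so treat this as indicative only:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class CountingTrigger extends Trigger<Object, TimeWindow> {
        private final ValueStateDescriptor<Boolean> countedDesc =
                new ValueStateDescriptor<>("counted", Types.BOOLEAN);
        private transient Counter activeWindows;

        private Counter counter(TriggerContext ctx) {
            if (activeWindows == null) {
                activeWindows = ctx.getMetricGroup().counter("activeSessionWindows");
            }
            return activeWindows;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) throws Exception {
            ValueState<Boolean> counted = ctx.getPartitionedState(countedDesc);
            if (counted.value() == null) {     // first element of this window
                counted.update(true);
                counter(ctx).inc();
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            // Merged windows are where the count drifts, as discussed above.
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countedDesc).clear();
            counter(ctx).dec();
        }
    }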

Re: # of active session windows of a streaming job

2018-06-19 Thread Dongwon Kim
…though the > Trigger.onMerge() method is called, it does not know how many windows were > merged (which is done by the WindowAssigner) and only sees the merged > window. There might be a way to maintain state in a Trigger that allows one to > infer how many windows were merged. > > Best,

Re: # of active session windows of a streaming job

2018-06-16 Thread Dongwon Kim
…the current trigger invocation (as shown in Trigger#TriggerContext). Now I've come to the conclusion that it might not be possible using the DataStream API. Otherwise, do I need to think in a totally different way to achieve the goal? Best, - Dongwon > On Feb 20, 2018 at 6:53 PM, Fabian Hueske wrote:

Wrong endpoints to cancel a job

2018-04-25 Thread Dongwon Kim
Hi, 1.5.0 needs to update its web page for the REST APIs. I'm testing the YARN dispatcher and had difficulty canceling jobs today. I've been sending DELETE requests to the dispatcher according to https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#cancel-job
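For the record, the call that works against the 1.5 dispatcher is a PATCH rather than a DELETE — host and job id below are placeholders:

    curl -X PATCH "http://dispatcher-host:8081/jobs/<job-id>?mode=cancel"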

Re: Run programs w/ params including comma via REST api

2018-04-25 Thread Dongwon Kim
…and > this is a rabbit hole I'd rather avoid) > > Ultimately I would love to change this call to send the parameters as JSON > instead (then you wouldn't have to deal with escaping characters...), but we > can't do that until the API versioning is in place (no ETA). >

Run programs w/ params including comma via REST api

2018-04-20 Thread Dongwon Kim
Hi, I'm trying to run a program by sending POST requests. I've already spawned dispatcher in YARN and uploaded a jar file to the dispatcher. I want to execute my application using the following arguments (--topic and --kafkaParams): --topic gps-topic --kafkaParams

Re: rest.port is reset to 0 by YarnEntrypointUtils

2018-04-18 Thread Dongwon Kim
…https://issues.apache.org/jira/browse/FLINK-5758 > > On Tue, Apr 17, 2018 at 9:56 AM, Dongwon Kim <eastcirc...@gmail.com> wrote: > Hi, > > I'm trying to launch a dispatcher on top of YARN by executing…

rest.port is reset to 0 by YarnEntrypointUtils

2018-04-17 Thread Dongwon Kim
Hi, I'm trying to launch a dispatcher on top of YARN by executing "yarn-session.sh" on the command line. To access the REST endpoint from outside the cluster, I need to assign a port from an allowed range. YarnEntrypointUtils, however, sets rest.port to 0 for random binding. Is there any reason…

Re: Restore from a savepoint is very slow

2018-04-04 Thread Dongwon Kim
…nothing to do with GPUs. Best, - Dongwon > On Apr 2, 2018 at 3:33 PM, Dongwon Kim <eastcirc...@gmail.com> wrote: > > Attached is a log file from a taskmanager. > Please take a look at the log file considering the below events: > - Around 01:10:47 : the job is submitted to the job manager…

Re: Restore from a savepoint is very slow

2018-04-02 Thread Dongwon Kim
…appreciated! T.T Best, - Dongwon > On Apr 2, 2018 at 2:30 PM, Dongwon Kim <eastcirc...@gmail.com> wrote: > > Hi, > > While restoring from the latest checkpoint starts immediately after the job > is restarted, restoring from a savepoint takes more…

Restore from a savepoint is very slow

2018-04-01 Thread Dongwon Kim
Hi, While restoring from the latest checkpoint starts immediately after the job is restarted, restoring from a savepoint takes more than five minutes until the job makes progress. During the blackout, I cannot observe any resource usage over the cluster. After that period of time, I observe
