Re: Tumbling window with timestamp out-of-range events
Please ignore this message. The issue was that a different timestamp extractor was used when the Kafka source was set up, which caused the out-of-range timestamps.

On Tue, Jun 9, 2020 at 2:58 PM Yu Yang wrote:
> [...]
Tumbling window with timestamp out-of-range events
Hi,

We implemented a Flink application that uses a tumbling window with event time as the time characteristic. In the window's process function, we have the implementation below, which checks whether each event's timestamp falls within the tumbling window's time range. We expected all events to be in range. However, the application reports events with out-of-range timestamps. Any insights into how this can happen?

    @Override
    public void process(EventStreamPartitionKey key,
                        Context context,
                        Iterable<Event> elements,
                        Collector out) {
        for (Event event : elements) {
            // the window end is exclusive, so valid timestamps lie in [start, end)
            if (event.getTimestamp() >= context.window().getEnd()
                    || event.getTimestamp() < context.window().getStart())
                System.out.println("NOT in RANGE: " + context.window().getStart()
                        + ", " + event.getTimestamp() + ", " + context.window().getEnd());
            ...
        }
        out.collect(res);
    }

Thanks!

Regards,
-Yu
sanity checking in ProcessWindowFunction.process shows that event timestamps are out of tumbling window time range
Hi all,

We are writing an application that sets TimeCharacteristic.EventTime as the time characteristic. When implementing the ProcessWindowFunction for a tumbling window, we added the code below to check whether each event's timestamp is within the window's time range. To our surprise, the process function reports events that are not within the tumbling window's time range. Any insights into how this can happen? We are using Flink 1.9.1.

Below are the timestamp assigner, a snippet of the stream DAG, and the process function implementation.

Timestamp assigner:

    FlinkKafkaConsumerBase<Event> source = consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.getTimestamp();
            }
        });

The stream DAG of our application:

    env.addSource(source)
        .filter(new EventFilter(events))
        .keyBy(new KeySelector<Event, EventStreamPartitionKey>() {
            @Override
            public EventStreamPartitionKey getKey(Event value) throws Exception {
                return new EventStreamPartitionKey(value.getHost());
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .process(new EventValidator())
        .addSink(kafkaProducer);

The implementation of EventValidator.process, which checks whether the event timestamp is within the tumbling window's time range:

    @Override
    public void process(EventStreamPartitionKey key,
                        Context context,
                        Iterable<Event> elements,
                        Collector out) {
        for (Event event : elements) {
            if (event.getTimestamp() >= context.window().getEnd()
                    || event.getTimestamp() < context.window().getStart())
                System.out.println("NOT in RANGE: " + context.window().getStart()
                        + ", " + event.getTimestamp() + ", " + context.window().getEnd());
            ...
        }
        out.collect(res);
    }

Thanks!

Regards,
-Yu
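A minimal, Flink-free sketch of how this check can fire: a tumbling event-time window is chosen from the timestamp that the assigner attached to the record, not from Event.getTimestamp() directly. The arithmetic below is modeled on the window-start computation Flink uses for tumbling windows (start derived from timestamp, offset, and size; end exclusive), valid for non-negative timestamps. The timestamps and the 90-second gap are hypothetical values chosen to show what happens when the extractor in use disagrees with the event field, which is the cause identified in the follow-up on this thread.

```java
// Sketch (assumption): models how a tumbling event-time window is picked from a
// record's assigned timestamp; it does not call any Flink API.
public class TumblingWindowCheck {

    // Window start for `timestamp`, modeled on Flink's tumbling-window arithmetic
    // (valid for non-negative timestamps).
    static long windowStart(long timestamp, long offsetMs, long windowSizeMs) {
        return timestamp - (timestamp - offsetMs + windowSizeMs) % windowSizeMs;
    }

    // Same check as EventValidator.process: start inclusive, end exclusive.
    static boolean inRange(long ts, long start, long end) {
        return ts >= start && ts < end;
    }

    public static void main(String[] args) {
        long size = 60_000L;                      // Time.seconds(60)
        long assignedTs = 1_591_740_030_000L;     // hypothetical: timestamp set by the extractor actually in use
        long eventFieldTs = assignedTs - 90_000L; // hypothetical: value returned by Event.getTimestamp()

        long start = windowStart(assignedTs, 0L, size);
        long end = start + size;

        System.out.println(inRange(assignedTs, start, end));   // true
        System.out.println(inRange(eventFieldTs, start, end)); // false -> "NOT in RANGE"
    }
}
```

If every extractor on the path returns the same field that the validator reads, the two timestamps coincide and the check cannot fail.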
Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?
Thanks for the suggestion, Yun!

On Sun, May 31, 2020 at 11:15 PM Yun Gao wrote:
> Hi Yu,
>
> I think when the serializer returns null, the following operator should still receive a null record. One possible approach is for the following operator to count the number of null records it receives and publish that count as a metric to a monitoring system such as Prometheus, where alert conditions can be configured.
>
> If null causes problems, a special marker object instance such as NULL_TBASE can be created instead, and the operator can count the number of NULL_TBASE records it receives.
>
> Best,
> Yun
>
> ------ Original Mail ------
> Sender: Yu Yang
> Send Date: Mon Jun 1 06:37:35 2020
> Recipients: user
> Subject: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?
>
> > [...]
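Yun's suggestion can be sketched in plain Java as below. This is an illustration under assumptions, not a Kryo, Thrift, or Flink API: a decoder that never throws returns a hypothetical CORRUPTED sentinel (the role NULL_TBASE would play), and a downstream counter drops sentinels while keeping counts that an alert rule could use. In a real job the counts would be exposed through the operator's metric group instead of plain fields, and the 10% threshold is an arbitrary example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch (assumption): plain-Java illustration of the "sentinel + counter" idea.
public class SentinelCounting {

    // Hypothetical marker object playing the role of NULL_TBASE.
    static final Object CORRUPTED = new Object();

    // Deserialization step: never throws; returns the sentinel on failure.
    static Object safeDecode(byte[] bytes, Function<byte[], Object> decoder) {
        try {
            return decoder.apply(bytes);
        } catch (RuntimeException e) {
            return CORRUPTED;
        }
    }

    // Downstream step: drops sentinels but counts them so they can be alerted on.
    static final class CorruptionCounter {
        long total = 0;
        long corrupted = 0;

        boolean keep(Object record) {
            total++;
            if (record == CORRUPTED) {
                corrupted++;
                return false;
            }
            return true;
        }

        // Hypothetical alert rule: fire when the corrupted fraction exceeds maxRatio.
        boolean shouldAlert(double maxRatio) {
            return total > 0 && (double) corrupted / total > maxRatio;
        }
    }

    public static void main(String[] args) {
        // Toy decoder standing in for Thrift deserialization: parses a UTF-8 integer.
        Function<byte[], Object> decoder =
                b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8));
        CorruptionCounter counter = new CorruptionCounter();
        List<Object> kept = new ArrayList<>();
        for (String s : new String[] {"1", "x", "3", "??"}) {
            Object record = safeDecode(s.getBytes(StandardCharsets.UTF_8), decoder);
            if (counter.keep(record)) {
                kept.add(record);
            }
        }
        System.out.println(kept + " corrupted=" + counter.corrupted); // [1, 3] corrupted=2
        System.out.println(counter.shouldAlert(0.10));                 // true
    }
}
```

The design choice is that drops stay silent for the pipeline but not for the operator: the ratio, not individual log lines, is what an alert would watch.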
best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?
Hi all,

To deal with corrupted messages that occasionally leak into the data source, we implemented the custom DefaultKryoSerializer class below, which catches exceptions. The custom serializer's read(...) method returns null when it encounters an exception while reading. With this implementation, the serializer may silently drop records. One concern is that it may drop too many records before we notice and take action. What is the best practice for handling this?

The serializer processes one record at a time. Will reading a corrupted record make the serializer fail to process the next valid record?

    public class CustomTBaseSerializer extends TBaseSerializer {
        private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class);

        @Override
        public void write(Kryo kryo, Output output, TBase tBase) {
            try {
                super.write(kryo, output, tBase);
            } catch (Throwable t) {
                LOG.error("Failed to write due to unexpected Throwable", t);
            }
        }

        @Override
        public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
            try {
                return super.read(kryo, input, tBaseClass);
            } catch (Throwable t) {
                LOG.error("Failed to read from input due to unexpected Throwable", t);
                // a corrupted record is returned as null instead of failing the job
                return null;
            }
        }
    }

Thank you!

Regards,
-Yu
checkpoint _metadata file size differs by >20x among different checkpoints
Hi all,

We have a Flink job that checkpoints every 10 minutes. We noticed that the size of the _metadata file varies a lot across this job's checkpoints: in some checkpoints the _metadata file was >900 MB, while in other checkpoints of the same job it was <4 MB. Any insights into what may cause the difference?

Thank you!

Regards,
-Yu
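To track how the _metadata size evolves from one checkpoint to the next, a small diagnostic such as the one below may help. It is a sketch under assumptions: the job's checkpoint directory is reachable through the local file system (for HDFS or S3 the corresponding file system client would be needed instead), and it follows the usual <checkpoint-dir>/<job-id>/chk-<n>/_metadata layout. The path passed on the command line is hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Sketch (assumption): lists the _metadata size of every chk-<n> directory
// directly under one job's checkpoint directory, ordered by checkpoint number.
public class MetadataSizes {

    static Map<Long, Long> metadataSizes(Path jobCheckpointDir) throws IOException {
        Map<Long, Long> sizes = new TreeMap<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(jobCheckpointDir, "chk-*")) {
            for (Path dir : dirs) {
                Path metadata = dir.resolve("_metadata");
                if (!Files.isRegularFile(metadata)) {
                    continue; // in-progress or incomplete checkpoint
                }
                String name = dir.getFileName().toString();
                try {
                    long id = Long.parseLong(name.substring("chk-".length()));
                    sizes.put(id, Files.size(metadata));
                } catch (NumberFormatException e) {
                    // skip directories that only look like chk-<n>
                }
            }
        }
        return sizes;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: hypothetical local path such as /mnt/checkpoints/<job-id>
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        metadataSizes(dir).forEach((id, bytes) ->
                System.out.printf("chk-%d: %.1f MB%n", id, bytes / (1024.0 * 1024.0)));
    }
}
```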
Re: best practices on getting flink job logs from Hadoop history server?
Hi Yun Tang & Zhu Zhu,

Thanks for the replies! With the current approach, we still need to search the job manager log / YARN client log to find the job ID / vertex ID --> YARN container ID mapping. I am wondering how we can propagate this kind of information into the Flink execution graph so that it can be stored in the Flink history server's archived execution graph. Any suggestions?

-Yu

On Fri, Aug 30, 2019 at 2:21 AM Yun Tang wrote:
> Hi Yu,
>
> If you have the client job log, you can find your application ID in the following message:
>
>   The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
>   yarn application -kill {appId}
>   Please also note that the temporary files of the YARN session in the home directory will not be removed.
>
> Best,
> Yun Tang
>
> --
> From: Zhu Zhu
> Sent: Friday, August 30, 2019 16:24
> To: Yu Yang
> Cc: user
> Subject: Re: best practices on getting flink job logs from Hadoop history server?
>
> Hi Yu,
>
> Regarding #2:
> Currently we search for the task deployment log in the JM log, which contains information about the container and machine the task is deployed to.
>
> Regarding #3:
> You can find the application logs, aggregated per machine, on DFS; the path depends on your YARN configuration.
> Each log may still include multiple TM logs, but it can be much smaller than the log generated by "yarn logs ...".
>
> Thanks,
> Zhu Zhu
>
> On Fri, Aug 30, 2019 at 3:58 PM, Yu Yang wrote:
> > [...]
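Yun Tang's suggestion can be automated with a small log scan. This sketch assumes that the saved client log contains the kill hint with the real application ID in place of {appId}, and that YARN application IDs follow the usual application_<clusterTimestamp>_<sequence> shape. The log file path is hypothetical. The extracted ID can then be passed to yarn logs -applicationId.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch (assumption): scans a saved Flink YARN client log for the
// "yarn application -kill <appId>" hint and extracts the application ID.
public class YarnAppIdFromClientLog {

    // Assumed ID shape: application_<clusterTimestamp>_<sequenceNumber>.
    private static final Pattern KILL_HINT =
            Pattern.compile("yarn application -kill (application_\\d+_\\d+)");

    static Optional<String> findAppId(List<String> logLines) {
        for (String line : logLines) {
            Matcher m = KILL_HINT.matcher(line);
            if (m.find()) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the client log (hypothetical location).
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        Optional<String> appId = findAppId(lines);
        System.out.println(appId.isPresent()
                ? "yarn logs -applicationId " + appId.get()
                : "no YARN application ID found");
    }
}
```

This only covers limitation #1 (job to application); the subtask-to-container mapping still has to come from the JM deployment log, as Zhu Zhu describes.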
best practices on getting flink job logs from Hadoop history server?
Hi,

We run Flink jobs through YARN on Hadoop clusters. One challenge we are facing is simplifying access to Flink job logs.

The Flink job logs are accessible using "yarn logs -applicationId $application_id". That approach has a few limitations:

1. It is not straightforward to find the YARN application ID based on the Flink job ID.
2. It is difficult to find the corresponding container ID for the Flink subtasks.
3. For jobs that have many tasks, using "yarn logs ..." is inefficient, as it mixes the logs from all task managers.

Any suggestions on the best practice for getting the logs of completed Flink jobs that run on YARN?

Regards,
-Yu
metrics for checking whether a broker throttles requests based on its quota limits?
Hi,

We recently enabled Kafka quota management for our Kafka clusters. We are looking for Kafka metrics that can be used to alert on whether a Kafka broker is throttling requests because of quotas. Kafka exposes a few throttle-related metrics, but none of them tells us accurately whether the broker is throttling requests. Could anyone share insights on this?

kafka.network.produce.throttletimems.requestmetrics.95thPercentile
kafka.network.produce.throttletimems.requestmetrics.Count

Thanks!

Regards,
-Yu
Re: can flink sql handle udf-generated timestamp field
Thank you, Fabian! We will try the approach you suggested.

On Thu, Jun 6, 2019 at 1:03 AM Fabian Hueske wrote:
> Hi Yu,
>
> When you register a DataStream as a Table, you can create a new attribute that contains the event timestamp of the DataStream records.
> For that, you need to assign timestamps and generate watermarks before registering the stream:
>
>   FlinkKafkaConsumer<CustomerOrders> kafkaConsumer =
>       new FlinkKafkaConsumer<>("customer_orders", deserializationSchema, m10n05Properties);
>
>   // create DataStream from Kafka consumer
>   DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);
>   // assign timestamps with a custom timestamp assigner & WM generator
>   DataStream<CustomerOrders> ordersWithTS =
>       orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());
>
>   // register DataStream as Table with ts as timestamp which is automatically extracted
>   // (see [1] for how to map POJO fields and [2] for timestamps)
>   tableEnv.registerDataStream("custom_orders", ordersWithTS, "userName, ..., ts.rowtime");
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>
> On Thu, Jun 6, 2019 at 08:48, Yu Yang wrote:
> > [...]
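Fabian's snippet leaves YourTimestampAssigner open. One common choice in Flink 1.8 is a bounded-out-of-orderness assigner (such as BoundedOutOfOrdernessTimestampExtractor, used in the tumbling-window thread above). The plain-Java model below uses no Flink API. It shows what such an assigner computes for this stream: the event timestamp is the raw timestampMs long, so no conversion to java.sql.Timestamp is needed, and the watermark trails the largest timestamp seen so far by a fixed bound. The Order class and the 10-second bound are hypothetical.

```java
import java.util.Arrays;

// Plain-Java model (assumption) of a bounded-out-of-orderness timestamp assigner.
// It mirrors the idea behind Flink's BoundedOutOfOrdernessTimestampExtractor
// but does not use any Flink API.
public class BoundedOutOfOrdernessModel {

    // Hypothetical stand-in for the Thrift CustomerOrders message.
    static final class Order {
        final long timestampMs;

        Order(long timestampMs) {
            this.timestampMs = timestampMs;
        }
    }

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark does not underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // The event timestamp is the raw epoch-millis long.
    long extractTimestamp(Order order) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, order.timestampMs);
        return order.timestampMs;
    }

    // Watermark: events older than this are no longer expected.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel assigner = new BoundedOutOfOrdernessModel(10_000L);
        for (Order o : Arrays.asList(new Order(100_000L), new Order(95_000L), new Order(120_000L))) {
            assigner.extractTimestamp(o);
        }
        System.out.println(assigner.currentWatermark()); // 110000
    }
}
```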
Re: can flink sql handle udf-generated timestamp field
Hi Jingsong,

Thanks for the reply! The following is our code snippet for creating the log stream. Our messages are in Thrift format, and we use a customized serializer for serializing/deserializing messages (see https://github.com/apache/flink/pull/8067 for the implementation). Given that, how should we define a time attribute column? We'd like to leverage the customized serializer to figure out column names as much as possible.

    ThriftDeserializationSchema deserializationSchema =
        new ThriftDeserializationSchema(CustomerOrders.class, ThriftCodeGenerator.SCROOGE);

    FlinkKafkaConsumer<CustomerOrders> kafkaConsumer =
        new FlinkKafkaConsumer<>("customer_orders", deserializationSchema, m10n05Properties);

    // registerDataStream expects a DataStream, so the consumer is added as a source first
    tableEnv.registerDataStream("orders", env.addSource(kafkaConsumer));

Regards,
-Yu

On Wed, Jun 5, 2019 at 11:15 PM JingsongLee wrote:
> Hi @Yu Yang:
>
> Time-based operations such as windows in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer logical time attributes for indicating time and accessing corresponding timestamps in table programs. [1]
> This means a window can only be defined over a time attribute column.
> You need to define a rowtime attribute in your source, for example (UserActionTime is a long field; you don't need to convert it to Timestamp):
>
>   Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
>
> See more information in the document below:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>
> Best,
> JingsongLee
>
> --
> From: Yu Yang
> Send Time: Wednesday, June 5, 2019 14:57
> To: user
> Subject: can flink sql handle udf-generated timestamp field
>
> > [...]
Re: can flink sql handle udf-generated timestamp field
+flink-user

On Wed, Jun 5, 2019 at 9:58 AM Yu Yang wrote:
> Thanks for the reply! In flink-table-planner, TimeIndicatorTypeInfo is an internal class that cannot be referenced from the application; I got a "cannot find symbol" error when I tried to use it. I have also tried using "SqlTimeTypeInfo.getInfoFor(Timestamp.class)" as the return type information for my UDF. With that, I got the same "Window can only be defined over a time attribute column" error as before.
>
> On Wed, Jun 5, 2019 at 4:41 AM Lee tinker wrote:
> > Hi Yu Yang:
> > When you want to use a time field in a window, its type must be one that Flink recognizes as a time attribute. Your UDF returns Types.SQL_TIMESTAMP. Depending on your time type (proctime or rowtime), it should return TimeIndicatorTypeInfo.PROCTIME_INDICATOR or TimeIndicatorTypeInfo.ROWTIME_INDICATOR instead of Types.SQL_TIMESTAMP. You can try again with that.
> >
> > On Wed, Jun 5, 2019 at 2:57 PM, Yu Yang wrote:
> > > [...]
can flink sql handle udf-generated timestamp field
Hi,

I am trying to use Flink SQL to aggregate over a hopping window. In the data stream, we store the timestamp as a long, so I wrote a UDF 'FROM_UNIXTIME' to convert the long to the Timestamp type:

    public static class TimestampModifier extends ScalarFunction {
        public Timestamp eval(long t) {
            return new Timestamp(t);
        }

        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.SQL_TIMESTAMP;
        }
    }

With the above UDF, I wrote the following query and ran into "ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column". Any suggestions on how to resolve this issue? I am using Flink 1.8 for this experiment.

My SQL query:

    select keyid, sum(value)
    from (
      select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
      from orders)
    group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid

Flink exception:

    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
    Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
        at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
        at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
        at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards,
-Yu
read a finite number of messages from Kafka using Kafka connector without extending it?
Hi,

We are considering using Flink SQL for ad hoc analytics on real-time Kafka data, and we want to limit queries to data from the past 5-10 minutes. One possible approach is to extend the current Kafka connector so that it only reads messages within a given time period, producing a finite DataStream. I am wondering whether there is an alternative to this approach. Any suggestions would be much appreciated.

Regards,
-Yu