Re: Tumbling window with timestamp out-of-range events

2020-06-09 Thread Yu Yang
Please ignore this message. The issue was that a different timestamp
extractor had been used when the Kafka source was set up, which caused the
out-of-range timestamps.
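
For anyone hitting the same symptom, a minimal sketch of what the fix amounts
to, assuming an Event type with a getTimestamp() accessor and a Kafka consumer
variable named "consumer" (illustrative, not our exact production code):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

// The assigner attached to the Kafka source must read the same field that the
// window function later checks. If a different extractor is installed on the
// source, windows are assigned by that other timestamp, and getTimestamp() can
// legitimately fall outside [window.getStart(), window.getEnd()).
FlinkKafkaConsumerBase<Event> source =
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
          @Override
          public long extractTimestamp(Event element) {
            // must match the field used in the ProcessWindowFunction range check
            return element.getTimestamp();
          }
        });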

On Tue, Jun 9, 2020 at 2:58 PM Yu Yang  wrote:

> Hi,
>
>
> We implemented a Flink application that uses a TumblingWindow with event
> time as the time characteristic. In the TumblingWindow's process function,
> we have the implementation below, which checks whether each event's
> timestamp is in the tumbling window's timestamp range. We expected all
> events to be in the range. However, the application reports events with
> out-of-range timestamps. Any insights on how this happens?
>
>
> @Override
> public void process(EventStreamPartitionKey key,
>     Context context, Iterable<Event> elements, Collector out) {
>   for (Event event : elements) {
>     if (event.getTimestamp() >= context.window().getEnd()
>         || event.getTimestamp() < context.window().getStart()) {
>       System.out.println("NOT in RANGE: " + context.window().getStart()
>           + ", " + event.getTimestamp() + ", " + context.window().getEnd());
>     }
>     ...
>   }
>   out.collect(res);
> }
>
>
> Thanks!
>
>
> Regards,
>
> -Yu
>


Tumbling window with timestamp out-of-range events

2020-06-09 Thread Yu Yang
Hi,


We implemented a Flink application that uses a TumblingWindow with event
time as the time characteristic. In the TumblingWindow's process function,
we have the implementation below, which checks whether each event's
timestamp is in the tumbling window's timestamp range. We expected all
events to be in the range. However, the application reports events with
out-of-range timestamps. Any insights on how this happens?


@Override
public void process(EventStreamPartitionKey key,
    Context context, Iterable<Event> elements, Collector out) {
  for (Event event : elements) {
    if (event.getTimestamp() >= context.window().getEnd()
        || event.getTimestamp() < context.window().getStart()) {
      System.out.println("NOT in RANGE: " + context.window().getStart()
          + ", " + event.getTimestamp() + ", " + context.window().getEnd());
    }
    ...
  }
  out.collect(res);
}


Thanks!


Regards,

-Yu


sanity checking in ProcessWindowFunction.process shows that event timestamps are out of tumbling window time range

2020-06-09 Thread Yu Yang
Hi all,

We are writing an application that sets TimeCharacteristic.EventTime as the
time characteristic. In our ProcessWindowFunction for a TumblingWindow, we
added the code below to check whether the timestamp of each event is in the
tumbling window's time range. To our surprise, we found that the process
function reports events that are not in the tumbling window's time range.
Any insights on how this happens? We are using Flink 1.9.1.

Below are the timestamp assigner, the stream DAG snippet, and the process
function implementation:

Timestamp assigner:

FlinkKafkaConsumerBase<Event> source =
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
          @Override
          public long extractTimestamp(Event element) {
            return element.getTimestamp();
          }
        });


The stream DAG of our application:

env.addSource(source)
    .filter(new EventFilter(events))
    .keyBy(new KeySelector<Event, EventStreamPartitionKey>() {
      @Override
      public EventStreamPartitionKey getKey(Event value) throws Exception {
        return new EventStreamPartitionKey(value.getHost());
      }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .process(new EventValidator())
    .addSink(kafkaProducer);

The implementation of the process window function EventValidator.process,
which checks whether the event timestamp is in the tumbling window time range:

@Override
public void process(EventStreamPartitionKey key,
    Context context, Iterable<Event> elements, Collector out) {
  for (Event event : elements) {
    if (event.getTimestamp() >= context.window().getEnd()
        || event.getTimestamp() < context.window().getStart()) {
      System.out.println("NOT in RANGE: " + context.window().getStart()
          + ", " + event.getTimestamp() + ", " + context.window().getEnd());
    }
    ...
  }
  out.collect(res);
}
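
For context, Flink's TimeWindow covers the range [getStart(), getEnd()), and
maxTimestamp() returns getEnd() - 1, so the check above does match the window
bounds. An equivalent formulation, just as a sketch:

// Equivalent membership check using TimeWindow's inclusive maxTimestamp().
// Sketch only; it assumes context.window() is an
// org.apache.flink.streaming.api.windowing.windows.TimeWindow, as it is for
// TumblingEventTimeWindows.
TimeWindow w = context.window();
boolean inRange =
    event.getTimestamp() >= w.getStart()
        && event.getTimestamp() <= w.maxTimestamp();  // maxTimestamp() == getEnd() - 1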



Thanks!


Regards,

-Yu


Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-01 Thread Yu Yang
Thanks for the suggestion, Yun!
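
For what it's worth, a minimal sketch of the counting/metric idea Yun
describes below; the class and metric names are only illustrative, not taken
from our actual code:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;
import org.apache.thrift.TBase;

// Counts the nulls emitted by the lenient serializer and exposes the count as
// a Flink metric, which a configured metric reporter can ship to an external
// monitoring system (e.g. Prometheus) for alerting.
public class NullRecordCountingFilter extends RichFlatMapFunction<TBase, TBase> {
  private transient Counter corruptedRecords;

  @Override
  public void open(Configuration parameters) {
    corruptedRecords =
        getRuntimeContext().getMetricGroup().counter("corruptedRecords");
  }

  @Override
  public void flatMap(TBase value, Collector<TBase> out) {
    if (value == null) {
      corruptedRecords.inc();   // corrupted / dropped record
    } else {
      out.collect(value);       // forward valid records downstream
    }
  }
}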

On Sun, May 31, 2020 at 11:15 PM Yun Gao  wrote:

> Hi Yu,
>
> I think when the serializer returns *null*, the following operator should
> still receive a null record. A possible approach is for the following
> operator to count the number of null records received and publish the value
> as a metric to a monitoring system such as Prometheus, which should be able
> to configure alert conditions.
>
> If *null* causes problems, a special indicator instance such as NULL_TBASE
> can be created, and the operator can count the number of NULL_TBASE records
> received.
>
> Best,
>  Yun
>
>
> ------Original Mail --
> *Sender:*Yu Yang 
> *Send Date:*Mon Jun 1 06:37:35 2020
> *Recipients:*user 
> *Subject:*best practice for handling corrupted records / exceptions in
> custom DefaultKryoSerializer?
>
>> Hi all,
>>
>> To deal with corrupted messages that can leak into the data source once
>> in a while, we implemented a custom DefaultKryoSerializer subclass, shown
>> below, that catches exceptions. The custom serializer returns null in its
>> read(...) method when it encounters an exception while reading. With this
>> implementation, the serializer may silently drop records. One concern is
>> that it may drop too many records before we notice and take action. What
>> is the best practice for handling this?
>>
>> The serializer processes one record at a time. Will reading a corrupted
>> record make the serializer fail to process the next valid record?
>>
>> public class CustomTBaseSerializer extends TBaseSerializer {
>>   private static final Logger LOG =
>>       LoggerFactory.getLogger(CustomTBaseSerializer.class);
>>
>>   @Override
>>   public void write(Kryo kryo, Output output, TBase tBase) {
>>     try {
>>       super.write(kryo, output, tBase);
>>     } catch (Throwable t) {
>>       LOG.error("Failed to write due to unexpected Throwable", t);
>>     }
>>   }
>>
>>   @Override
>>   public TBase read(Kryo kryo, Input input, Class tBaseClass) {
>>     try {
>>       return super.read(kryo, input, tBaseClass);
>>     } catch (Throwable t) {
>>       LOG.error("Failed to read from input due to unexpected Throwable", t);
>>       // Returning null silently drops the corrupted record.
>>       return null;
>>     }
>>   }
>> }
>>
>> Thank you!
>>
>> Regards,
>> -Yu
>>
>


best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-05-31 Thread Yu Yang
Hi all,

To deal with corrupted messages that can leak into the data source once in
a while, we implemented a custom DefaultKryoSerializer subclass, shown below,
that catches exceptions. The custom serializer returns null in its read(...)
method when it encounters an exception while reading. With this
implementation, the serializer may silently drop records. One concern is that
it may drop too many records before we notice and take action. What is the
best practice for handling this?

The serializer processes one record at a time. Will reading a corrupted
record make the serializer fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
  private static final Logger LOG =
      LoggerFactory.getLogger(CustomTBaseSerializer.class);

  @Override
  public void write(Kryo kryo, Output output, TBase tBase) {
    try {
      super.write(kryo, output, tBase);
    } catch (Throwable t) {
      LOG.error("Failed to write due to unexpected Throwable", t);
    }
  }

  @Override
  public TBase read(Kryo kryo, Input input, Class tBaseClass) {
    try {
      return super.read(kryo, input, tBaseClass);
    } catch (Throwable t) {
      LOG.error("Failed to read from input due to unexpected Throwable", t);
      // Returning null silently drops the corrupted record.
      return null;
    }
  }
}

Thank you!

Regards,
-Yu


checkpoint _metadata file has >20x different in size among different check-points

2020-03-04 Thread Yu Yang
Hi all,

We have a Flink job that checkpoints every 10 minutes. We noticed that the
_metadata file size can vary a lot across checkpoints of this job: in some
checkpoints the _metadata file was >900 MB, while in other checkpoints of
the same job it is <4 MB. Any insights on what may cause the difference?

Thank you!

Regards,
-Yu


Re: best practices on getting flink job logs from Hadoop history server?

2019-09-05 Thread Yu Yang
Hi Yun Tang & Zhu Zhu,

Thanks for the reply! With your current approach, we would still need to
search the job manager log / YARN client log to find the job id / vertex id
--> YARN container id mapping. I am wondering how we can propagate this kind
of information into the Flink execution graph so that it can be stored in
the Flink history server's archived execution graph. Any suggestions on
that?

-Yu

On Fri, Aug 30, 2019 at 2:21 AM Yun Tang  wrote:

> Hi  Yu
>
> If you have the client job log, you can find your application id from the
> description below:
>
> The Flink YARN client has been started in detached mode. In order to stop
> Flink on YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill {appId}
> Please also note that the temporary files of the YARN session in the home
> directory will not be removed.
>
> Best
> Yun Tang
>
> --
> *From:* Zhu Zhu 
> *Sent:* Friday, August 30, 2019 16:24
> *To:* Yu Yang 
> *Cc:* user 
> *Subject:* Re: best practices on getting flink job logs from Hadoop
> history server?
>
> Hi Yu,
>
> Regarding #2,
> Currently we search the task deployment log entries in the JM log, which
> contain info about the container and machine each task is deployed to.
>
> Regarding #3,
> You can find the application logs, aggregated per machine, on DFS; the path
> depends on your YARN config.
> Each such log may still include multiple TM logs, but it can be much
> smaller than the log generated by "yarn logs ...".
>
> Thanks,
> Zhu Zhu
>
> Yu Yang wrote on Fri, Aug 30, 2019 at 3:58 PM:
>
> Hi,
>
> We run flink jobs through yarn on hadoop clusters. One challenge that we
> are facing is simplifying flink job log access.
>
> The flink job logs can be accessed using "yarn logs $application_id".
> That approach has a few limitations:
>
>1. It is not straightforward to find yarn application id based on
>flink job id.
>2. It is difficult to find the corresponding container id for the
>flink sub tasks.
>3. For jobs that have many tasks, it is inefficient to use "yarn logs
>..."  as it mixes logs from all task managers.
>
> Any suggestions on the best practice for getting logs for completed flink
> jobs that run on yarn?
>
> Regards,
> -Yu
>
>
>


best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Yu Yang
Hi,

We run flink jobs through yarn on hadoop clusters. One challenge that we
are facing is simplifying flink job log access.

The flink job logs can be accessed using "yarn logs $application_id".
That approach has a few limitations:

   1. It is not straightforward to find yarn application id based on flink
   job id.
   2. It is difficult to find the corresponding container id for the flink
   sub tasks.
   3. For jobs that have many tasks, it is inefficient to use "yarn logs
   ..."  as it mixes logs from all task managers.

Any suggestions on the best practice for getting logs for completed flink
jobs that run on yarn?

Regards,
-Yu


metrics for checking whether a broker throttles requests based on its quota limits?

2019-06-20 Thread Yu Yang
Hi,

Recently we enabled Kafka quota management for our Kafka clusters. We are
looking for Kafka metrics that can be used for alerting on whether a Kafka
broker throttles requests based on its quota limits.

There are a few throttle-related metrics in Kafka, but none of them tells
us accurately whether the broker is throttling requests. Could anyone share
insights on this?

kafka.network.produce.throttletimems.requestmetrics.95thPercentile

kafka.network.produce.throttletimems.requestmetrics.Count

Thanks!

Regards,
-Yu


Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Yu Yang
Thank you Fabian!  We will try the approach that you suggest.
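
For the archive, here is a minimal sketch of what the "YourTimestampAssigner"
placeholder in Fabian's snippet below could look like for our case. It assumes
CustomerOrders exposes its long event time via a getTimestampMs() accessor;
that accessor name is an assumption, not taken from the actual schema.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Extracts event-time timestamps (epoch millis) and generates watermarks that
// tolerate up to 60 seconds of out-of-orderness.
public class CustomerOrdersTimestampAssigner
    extends BoundedOutOfOrdernessTimestampExtractor<CustomerOrders> {

  public CustomerOrdersTimestampAssigner() {
    super(Time.seconds(60));
  }

  @Override
  public long extractTimestamp(CustomerOrders order) {
    return order.getTimestampMs();  // assumed accessor for the long timestamp field
  }
}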

On Thu, Jun 6, 2019 at 1:03 AM Fabian Hueske  wrote:

> Hi Yu,
>
> When you register a DataStream as a Table, you can create a new attribute
> that contains the event timestamp of the DataStream records.
> For that, you would need to assign timestamps and generate watermarks
> before registering the stream:
>
> FlinkKafkaConsumer kafkaConsumer =
>     new FlinkKafkaConsumer(“customer_orders”, deserializationSchema,
>         m10n05Properties);
>
> // create DataStream from Kafka consumer
> DataStream orders = env.addSource(kafkaConsumer);
>
> // assign timestamps with a custom timestamp assigner & WM generator
> DataStream ordersWithTS =
>     orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());
>
> // register the DataStream as a Table with ts as a timestamp attribute that
> // is automatically extracted (see [1] for how to map POJO fields and [2]
> // for timestamps)
> tableEnv.registerDataStream("custom_orders", ordersWithTS,
>     "userName, ..., ts.rowtime");
>
> Hope this helps,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>
> On Thu, Jun 6, 2019 at 08:48, Yu Yang wrote:
>
>> Hi Jingsong,
>>
>> Thanks for the reply! The following is our code snippet for creating the
>> log stream. Our messages are in Thrift format. We use a customized
>> serializer for serializing/deserializing messages (see
>> https://github.com/apache/flink/pull/8067 for the implementation).
>> Given that, how shall we define a time attribute column? We'd like to
>> leverage the customized serializer to figure out the column names as much
>> as possible.
>>
>> ThriftDeserializationSchema deserializationSchema =
>> new ThriftDeserializationSchema(CustomerOrders.class,
>> ThriftCodeGenerator.SCROOGE);
>>
>> FlinkKafkaConsumer kafkaConsumer =
>> new FlinkKafkaConsumer(“customer_orders”, deserializationSchema,
>> m10n05Properties);
>>
>> tableEnv.registerDataStream(“orders”, kafkaConsumer);
>>
>> Regards,
>> -Yu
>>
>> On Wed, Jun 5, 2019 at 11:15 PM JingsongLee 
>> wrote:
>>
>>> Hi @Yu Yang:
>>>
>>> Time-based operations such as windows in both the Table API and SQL
>>> require information about the notion of time and its origin. Therefore,
>>> tables can offer logical time attributes for indicating time and
>>> accessing corresponding timestamps in table programs. [1]
>>>
>>> This means a window can only be defined over a time attribute column.
>>> You need to define a rowtime attribute in your source, as below
>>> (UserActionTime is a long field; you don't need to convert it to
>>> Timestamp):
>>>
>>> Table table = tEnv.fromDataStream(stream, "Username, Data, 
>>> UserActionTime.rowtime");
>>>
>>> See more information in below document:
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>>>
>>> Best, JingsongLee
>>>
>>> --
>>> From:Yu Yang 
>>> Send Time: Wed, Jun 5, 2019, 14:57
>>> To:user 
>>> Subject:can flink sql handle udf-generated timestamp field
>>>
>>> Hi,
>>>
>>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>>> the data stream, we store the timestamp in long type. So I wrote a UDF
>>> 'FROM_UNIXTIME' to convert long to Timestamp type.
>>>
>>>   public static class TimestampModifier extends ScalarFunction {
>>> public Timestamp eval(long t) {
>>>   return new Timestamp(t);
>>> }
>>> public TypeInformation getResultType(Class[] signature) {
>>>   return Types.SQL_TIMESTAMP;
>>> }
>>>   }
>>>
>>> With the above UDF, I wrote the following query, and ran into
>>>  "ProgramInvocationException: The main method caused an error: Window can
>>> only be defined over a time attribute column".
>>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>>> this experiment.
>>>
>>> my sql query:
>>>
>>> select  keyid, sum(value)
>>> from (
>>>select 

Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Yu Yang
Hi Jingsong,

Thanks for the reply! The following is our code snippet for creating the
log stream. Our messages are in Thrift format. We use a customized
serializer for serializing/deserializing messages (see
https://github.com/apache/flink/pull/8067 for the implementation). Given
that, how shall we define a time attribute column? We'd like to leverage the
customized serializer to figure out the column names as much as possible.

ThriftDeserializationSchema deserializationSchema =
new ThriftDeserializationSchema(CustomerOrders.class,
ThriftCodeGenerator.SCROOGE);

FlinkKafkaConsumer kafkaConsumer =
new FlinkKafkaConsumer(“customer_orders”, deserializationSchema,
m10n05Properties);

tableEnv.registerDataStream(“orders”, kafkaConsumer);

Regards,
-Yu

On Wed, Jun 5, 2019 at 11:15 PM JingsongLee  wrote:

> Hi @Yu Yang:
> Time-based operations such as windows in both the Table API and SQL
> require information about the notion of time and its origin. Therefore,
> tables can offer logical time attributes for indicating time and accessing
> corresponding timestamps in table programs. [1]
>
> This means a window can only be defined over a time attribute column.
> You need to define a rowtime attribute in your source, as below
> (UserActionTime is a long field; you don't need to convert it to
> Timestamp):
>
> Table table = tEnv.fromDataStream(stream, "Username, Data, 
> UserActionTime.rowtime");
>
> See more information in below document:
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>
> Best, JingsongLee
>
> --
> From:Yu Yang 
> Send Time: Wed, Jun 5, 2019, 14:57
> To:user 
> Subject:can flink sql handle udf-generated timestamp field
>
> Hi,
>
> I am trying to use Flink SQL to do aggregation on a hopping window. In the
> data stream, we store the timestamp in long type. So I wrote a UDF
> 'FROM_UNIXTIME' to convert long to Timestamp type.
>
>   public static class TimestampModifier extends ScalarFunction {
> public Timestamp eval(long t) {
>   return new Timestamp(t);
> }
> public TypeInformation getResultType(Class[] signature) {
>   return Types.SQL_TIMESTAMP;
> }
>   }
>
> With the above UDF, I wrote the following query, and ran into
>  "ProgramInvocationException: The main method caused an error: Window can
> only be defined over a time attribute column".
> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
> this experiment.
>
> my sql query:
>
> select  keyid, sum(value)
> from (
>select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>from orders)
>  group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid
>
> flink exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Window can only be defined over a time attribute
> column.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.table.api.ValidationException: Window can only
> be defined over a time attribute column.
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
> at
> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>
> Regards,
> -Yu
>
>
>


Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Yu Yang
+flink-user

On Wed, Jun 5, 2019 at 9:58 AM Yu Yang  wrote:

> Thanks for the reply!  In flink-table-planner, TimeIndicatorTypeInfo is an
> internal class that cannot be referenced from the application; I got a
> "cannot find symbol" error when I tried to use it. I have also tried using
> "SqlTimeTypeInfo.getInfoFor(Timestamp.class)" as the return type for my UDF
> type info. With that, I got the same "Window can only be defined over a
> time attribute column" error as before.
>
> On Wed, Jun 5, 2019 at 4:41 AM Lee tinker 
> wrote:
>
>> Hi Yu Yang:
>> When you want to use time in a window, the time type must be what Flink
>> expects. We can see you return Types.SQL_TIMESTAMP in your UDF. The type
>> should be TimeIndicatorTypeInfo.PROCTIME_INDICATOR or
>> TimeIndicatorTypeInfo.ROWTIME_INDICATOR instead of Types.SQL_TIMESTAMP,
>> depending on your time type (proctime or rowtime). You can try again with
>> that.
>>
>> Yu Yang wrote on Wed, Jun 5, 2019 at 2:57 PM:
>>
>>> Hi,
>>>
>>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>>> the data stream, we store the timestamp in long type. So I wrote a UDF
>>> 'FROM_UNIXTIME' to convert long to Timestamp type.
>>>
>>>   public static class TimestampModifier extends ScalarFunction {
>>> public Timestamp eval(long t) {
>>>   return new Timestamp(t);
>>> }
>>> public TypeInformation getResultType(Class[] signature) {
>>>   return Types.SQL_TIMESTAMP;
>>> }
>>>   }
>>>
>>> With the above UDF, I wrote the following query, and ran into
>>>  "ProgramInvocationException: The main method caused an error: Window can
>>> only be defined over a time attribute column".
>>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>>> this experiment.
>>>
>>> my sql query:
>>>
>>> select  keyid, sum(value)
>>> from (
>>>select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>>>from orders)
>>>  group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid
>>>
>>> flink exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Window can only be defined over a time attribute
>>> column.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: org.apache.flink.table.api.ValidationException: Window can
>>> only be defined over a time attribute column.
>>> at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>>> at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>>> at
>>> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>>> at
>>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>>> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>>>
>>> Regards,
>>> -Yu
>>>
>>


can flink sql handle udf-generated timestamp field

2019-06-05 Thread Yu Yang
Hi,

I am trying to use Flink SQL to do aggregation on a hopping window. In the
data stream, we store the timestamp as a long. So I wrote a UDF
'FROM_UNIXTIME' to convert the long to the Timestamp type.

public static class TimestampModifier extends ScalarFunction {
  public Timestamp eval(long t) {
    return new Timestamp(t);
  }

  public TypeInformation<?> getResultType(Class<?>[] signature) {
    return Types.SQL_TIMESTAMP;
  }
}

With the above UDF, I wrote the following query, and ran into
 "ProgramInvocationException: The main method caused an error: Window can
only be defined over a time attribute column".
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this
experiment.

my sql query:

select  keyid, sum(value)
from (
   select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
   from orders)
 group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid

flink exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Window can only be defined over a time attribute column.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only
be defined over a time attribute column.
at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
at
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards,
-Yu


read a finite number of messages from Kafka using Kafka connector without extending it?

2019-02-15 Thread Yu Yang
Hi,

We are considering using Flink SQL for ad hoc data analytics on real-time
Kafka data, and want to limit queries to processing data from the past 5-10
minutes. To achieve that, one possible approach is to extend the current
Kafka connector so that it only reads messages in a given period of time and
generates a finite DataStream; a sketch of that idea follows below. I am
wondering if there is an alternative to this approach. Any suggestions will
be very much appreciated.
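
For reference, a minimal sketch of the bounded-read idea, without modifying
the connector itself: wrap the existing DeserializationSchema and report
end-of-stream once a record's event time passes a cutoff (the Kafka consumer
stops reading a partition when isEndOfStream(...) returns true). The class
and method names below are illustrative, not an existing Flink API beyond
DeserializationSchema; depending on the connector version, the start of the
range could be set with the consumer's setStartFromTimestamp(...).

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Wraps an existing DeserializationSchema and bounds the stream by event time.
public abstract class TimeBoundedDeserializationSchema<T>
    implements DeserializationSchema<T> {

  private final DeserializationSchema<T> inner;
  private final long endTimestampMs;  // cutoff, e.g. the submission time of the ad hoc query

  protected TimeBoundedDeserializationSchema(DeserializationSchema<T> inner,
      long endTimestampMs) {
    this.inner = inner;
    this.endTimestampMs = endTimestampMs;
  }

  /** Extract the event time (epoch millis) from a deserialized record. */
  protected abstract long extractTimestamp(T record);

  @Override
  public T deserialize(byte[] message) throws IOException {
    return inner.deserialize(message);
  }

  @Override
  public boolean isEndOfStream(T nextElement) {
    // Stop consuming once records are newer than the requested time range.
    return nextElement != null && extractTimestamp(nextElement) > endTimestampMs;
  }

  @Override
  public TypeInformation<T> getProducedType() {
    return inner.getProducedType();
  }
}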

Regards,
-Yu