Re: Does Queryable State only support K/V queries not SQL?

2018-03-03 Thread Hequn Cheng
Hi kant,

To avoid writing to external databases, some relevant issues are on the
roadmap, see FLINK-6968[1]. When this issue is finished, you can store
your updating tables in queryable state which is available to query(key
point queries). Hope it helps you.

Best, Hequn

[1] https://issues.apache.org/jira/browse/FLINK-6968

On Sun, Mar 4, 2018 at 1:41 AM, kant kodali  wrote:

> Hi Fabian,
>
> Does it make sense to have it on the roadmap? that way external
> applications can do ad-hoc queries for the most recent stream data and
> avoid writing to any external database?
>
> Thanks!
>
>
> On Mon, Feb 26, 2018 at 12:14 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> Queryable state only supports key point queries, i.e., you can query a
>> keyed state for the value of a key.
>> Support for SQL is not on the roadmap.
>>
>> Best, Fabian
>>
>> 2018-02-25 14:26 GMT+01:00 kant kodali :
>>
>>> Hi All,
>>>
>>> 1) Does Queryable State support SQL? By which I mean I can do issue a
>>> full-fledged sql query like say ("select * from table where foo='hello'
>>> group by name")
>>>
>>> 2) Does Queryable state support offset and limit? Because if I have a
>>> million rows I don't want to get all at once.
>>>
>>> Sorry if these are naive questions I am new to flink but I have used
>>> other streaming processors before.
>>>
>>> Thanks!
>>>
>>
>>
>


Re: Does Queryable State only support K/V queries not SQL?

2018-03-03 Thread Renjie Liu
Hi, kant:
Full support for sql is not easy tuo support. A simple KV storage is not
enough to build a RDBMS.

On Sun, Mar 4, 2018 at 1:42 AM kant kodali  wrote:

> Hi Fabian,
>
> Does it make sense to have it on the roadmap? that way external
> applications can do ad-hoc queries for the most recent stream data and
> avoid writing to any external database?
>
> Thanks!
>
>
> On Mon, Feb 26, 2018 at 12:14 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> Queryable state only supports key point queries, i.e., you can query a
>> keyed state for the value of a key.
>> Support for SQL is not on the roadmap.
>>
>> Best, Fabian
>>
>> 2018-02-25 14:26 GMT+01:00 kant kodali :
>>
>>> Hi All,
>>>
>>> 1) Does Queryable State support SQL? By which I mean I can do issue a
>>> full-fledged sql query like say ("select * from table where foo='hello'
>>> group by name")
>>>
>>> 2) Does Queryable state support offset and limit? Because if I have a
>>> million rows I don't want to get all at once.
>>>
>>> Sorry if these are naive questions I am new to flink but I have used
>>> other streaming processors before.
>>>
>>> Thanks!
>>>
>>
>>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Does Queryable State only support K/V queries not SQL?

2018-03-03 Thread kant kodali
Hi Fabian,

Does it make sense to have it on the roadmap? that way external
applications can do ad-hoc queries for the most recent stream data and
avoid writing to any external database?

Thanks!


On Mon, Feb 26, 2018 at 12:14 AM, Fabian Hueske  wrote:

> Hi,
>
> Queryable state only supports key point queries, i.e., you can query a
> keyed state for the value of a key.
> Support for SQL is not on the roadmap.
>
> Best, Fabian
>
> 2018-02-25 14:26 GMT+01:00 kant kodali :
>
>> Hi All,
>>
>> 1) Does Queryable State support SQL? By which I mean I can do issue a
>> full-fledged sql query like say ("select * from table where foo='hello'
>> group by name")
>>
>> 2) Does Queryable state support offset and limit? Because if I have a
>> million rows I don't want to get all at once.
>>
>> Sorry if these are naive questions I am new to flink but I have used
>> other streaming processors before.
>>
>> Thanks!
>>
>
>


Re: Flink join operator after sorting seems to group fields (Scala)

2018-03-03 Thread Xingcan Cui
Hi Felipe,

the `sortPartition()` method just LOCALLY sorts each partition of a dataset. To 
achieve a global sorting, use this method after a `partitionByRange()` (e.g., 
`result.partitionByRange(0).sortPartition(0, Order.ASCENDING)`).

Hope that helps,
Xingcan

> On 3 Mar 2018, at 9:33 PM, Felipe Gutierrez  
> wrote:
> 
> Hi all,
> 
> I have implemented a simple Scala object using Flink to play with joins 
> operator. After that, I put the join operator show my results I decided to 
> sort the output by the first field (.sortPartition(0, Order.ASCENDING)). It 
> seems that the output is ordered by group. The output shows two groups of 
> "Fyodor Dostoyevsky". Why is this happening? How do I sort the complete 
> DataSet?
> 
> Kind Regards,
> Felipe
> 
> import org.apache.flink.api.common.operators.Order
> import org.apache.flink.api.scala.{ExecutionEnvironment, _}
> 
> object JoinBooksAndAuthors {
>   val AUTHOR_ID_FIELD: Int = 0
>   val AUTHOR_NAME_FIELD: Int = 1
> 
>   val BOOK_AUTHORID_FIELD: Int = 0
>   val BOOK_YEAR_FIELD: Int = 1
>   val BOOK_NAME_FIELD: Int = 2
> 
>   def main(args: Array[String]) {
> 
> val env = ExecutionEnvironment.getExecutionEnvironment
> 
> val authors = env.readCsvFile[(Int, String)](
>   "downloads/authors.tsv",
>   fieldDelimiter = "\t",
>   lineDelimiter = "\n",
>   includedFields = Array(0, 1)
> )
> 
> val books = env.readCsvFile[(Int, Short, String)](
>   "downloads/books.tsv",
>   fieldDelimiter = "\t",
>   lineDelimiter = "\n",
>   includedFields = Array(0, 1, 2)
> )
> 
> authors
>   .join(books)
>   .where(AUTHOR_ID_FIELD)
>   .equalTo(BOOK_AUTHORID_FIELD)
>   .map(tuple => (tuple._1._2, tuple._2._3))
>   .sortPartition(0, Order.ASCENDING)
>   .print()
>   }
> }
> output
> 
> (Charles Bukowski,Women)
> (Charles Bukowski,The Most Beautiful Woman in Town)
> (Charles Bukowski,Hot Water Music)
> (Charles Bukowski,Barfly)
> (Charles Bukowski,Notes of a Dirty Old Man)
> (Charles Bukowski,Ham on Rye)
> (Fyodor Dostoyevsky,The Brothers Karamazov)
> (Fyodor Dostoyevsky,The Double: A Petersburg Poem)
> (Fyodor Dostoyevsky,Poor Folk)
> (George Orwell,Coming Up for Air)
> (George Orwell,Burmese Days)
> (George Orwell,A Clergyman's Daughter)
> (George Orwell,Down and Out in Paris and London)
> (Albert Camus,The Plague)
> (Fyodor Dostoyevsky,The Eternal Husband)
> (Fyodor Dostoyevsky,The Gambler)
> (Fyodor Dostoyevsky,The House of the Dead)
> (Fyodor Dostoyevsky,Crime and Punishment)
> (Fyodor Dostoyevsky,Netochka Nezvanova)
> .
> 
> 
> 
> 
> 
> -- 
> --
> -- Felipe Oliveira Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com 
> 


Flink join operator after sorting seems to group fields (Scala)

2018-03-03 Thread Felipe Gutierrez
Hi all,

I have implemented a simple Scala object using Flink to play with joins
operator. After that, I put the join operator show my results I decided to
sort the output by the first field (.sortPartition(0, Order.ASCENDING)). It
seems that the output is ordered by group. The output shows two groups of
"Fyodor Dostoyevsky". Why is this happening? How do I sort the complete
DataSet?

Kind Regards,
Felipe

import org.apache.flink.api.common.operators.Orderimport
org.apache.flink.api.scala.{ExecutionEnvironment, _}
object JoinBooksAndAuthors {
  val AUTHOR_ID_FIELD: Int = 0
  val AUTHOR_NAME_FIELD: Int = 1

  val BOOK_AUTHORID_FIELD: Int = 0
  val BOOK_YEAR_FIELD: Int = 1
  val BOOK_NAME_FIELD: Int = 2

  def main(args: Array[String]) {

val env = ExecutionEnvironment.getExecutionEnvironment

val authors = env.readCsvFile[(Int, String)](
  "downloads/authors.tsv",
  fieldDelimiter = "\t",
  lineDelimiter = "\n",
  includedFields = Array(0, 1)
)

val books = env.readCsvFile[(Int, Short, String)](
  "downloads/books.tsv",
  fieldDelimiter = "\t",
  lineDelimiter = "\n",
  includedFields = Array(0, 1, 2)
)

authors
  .join(books)
  .where(AUTHOR_ID_FIELD)
  .equalTo(BOOK_AUTHORID_FIELD)
  .map(tuple => (tuple._1._2, tuple._2._3))
  .sortPartition(0, Order.ASCENDING)
  .print()
  }}

output

(Charles Bukowski,Women)(Charles Bukowski,The Most Beautiful Woman in
Town)(Charles Bukowski,Hot Water Music)(Charles
Bukowski,Barfly)(Charles Bukowski,Notes of a Dirty Old Man)(Charles
Bukowski,Ham on Rye)(Fyodor Dostoyevsky,The Brothers Karamazov)(Fyodor
Dostoyevsky,The Double: A Petersburg Poem)(Fyodor Dostoyevsky,Poor
Folk)(George Orwell,Coming Up for Air)(George Orwell,Burmese
Days)(George Orwell,A Clergyman's Daughter)(George Orwell,Down and Out
in Paris and London)(Albert Camus,The Plague)(Fyodor Dostoyevsky,The
Eternal Husband)(Fyodor Dostoyevsky,The Gambler)(Fyodor
Dostoyevsky,The House of the Dead)(Fyodor Dostoyevsky,Crime and
Punishment)(Fyodor Dostoyevsky,Netochka Nezvanova).






-- 

* Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: Watermark not worked as expected in TimeCharacteristic.EvenTime setting.

2018-03-03 Thread sundy
Thanks a lot, use env.setParallelism(1) before the source  define works (I set 
it before the env.execute, so it did not take effect).
 
> On 3 Mar 2018, at 16:02, sundy <543950...@qq.com> wrote:
> 
> Hi Hequn Cheng,
> 
> Finally I got the problem and find the way to define the correct WaterMark by 
> your advice, thank you very much.
> 
> The problem is that I set the watermark to the 
> 
> waterMark =  maxEventTime - lag 
> 
> And the timeWindow is 10Seconds,  But I generated the test records too 
> quickly so the 1 records are all in the window duration(my bad). 
> So flink are waiting for new more numbers to close the window.  
> 
> Another one question is why I set   'env.setParallelism(1)’ and run the code 
> in IDEA(mini Flink cluster) , but the getWatermark is called in 4 different 
> threads?
> Which time is  the getWaterMark function called? After the keyBy operation or 
> after the source operation?
> 
> 
> 
>> On 3 Mar 2018, at 15:28, Hequn Cheng > > wrote:
>> 
>> Hi sundy,
>> 
>> The default parallelism is 4. It seems that your configuration does not take 
>> effect. You can try 'env.setParallelism(1)' to set the job parallelism.
>> For watermark, you can refer to [1] and [2].
>> 
>> PS: Others can not see your reply because you only reply to me. Try reply to 
>> all so that others can help you too :-) 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>  
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>  
>> 
>> 
>> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com 
>> > wrote:
>> 
>> Hi Hequn Cheng,
>> 
>> Thanks for you advice, I think I found the problem. But I didn't know why.
>> 
>> Firstly, let me introduce my operations,  I run the code in IDEA with 
>> MinFlinkCluster,  and then I set the env parallelism to 1, and the kafka 
>> topic ‘stream_test' has  one partition. Then send 1 records with current 
>> timestamp  the kafka , format is  ‘4281_783_1520059217832’ , the last field 
>> is the evenTime.
>> 
>> I add this debug code to print the thread-id.
>> 
>> 
>> 
>> 
>> The result is it will print in 4 threads int each period duration(3 
>> seconds).Such as
>> 
>> 50:water mark:0
>> 52:water mark:0
>> 51:water mark:1520056427871
>> 49:water mark:0
>> 
>> So this results to the watermark 0. But why it happened in 1 parallelism? 
>> Maybe it was caused by the keyBy operation?  I am new to Flink,  I hope to 
>> know how to set the watermark in the right way.
>> 
>> 
>>   
>>  
>> 
>>> On 3 Mar 2018, at 13:52, Hequn Cheng >> > wrote:
>>> 
>>> Hi sundy,
>>> 
>>> 1. Some partition of your input kafka don't have data. Since window 
>>> watermark is the min value of all it's inputs, if there are no data from 
>>> one of it's inputs, window will never be triggered. You can set parallelism 
>>> of your job to 1 to avoid this problem(PS: Maybe this bug is fixed now, but 
>>> worth a try).
>>> 2. Only one record in the input. In this case, window can not be triggered 
>>> either. You might think of it like the time has be stopped. To trigger the 
>>> widow, you should read more data with watermark bigger than the window end.
>>> 
>>> Hope it helps you.
>>> Best, Hequn
>>> 
>>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com 
>>> >:
>>> Hi, thanks for your reply. 
>>> 
>>> I have searched it in stackoverflow, and there is someone who has the some 
>>> problem.
>>> 
>>> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
>>>  
>>> 
>>> 
>>> 
>>> From your advice, I tried the code. 
>>> 
>>>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>>> 
>>> And it calls the getCurrentWaterMark function each 3 seconds,  but still no 
>>> result come out.
>>> From the outputs   ('water mark1520049229163'), I could see that the add 
>>> method is called. But the no result from the sink function.
>>> 
>>> 
>>> 
>>> 
 On 3 Mar 2018, at 12:47, Xingcan Cui > wrote:
 
 Hi,
 
 for periodically generated watermarks, you should use 
 `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
 
 Hope that helps.
 
 Best,
 Xingcan
 
> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com 
> > wrote:
> 
> 
> 
> 

Re: Watermark not worked as expected in TimeCharacteristic.EvenTime setting.

2018-03-03 Thread Hequn Cheng
Hi sundy,

It is strange that your configuration does not take effect. Do you set
parallelism somewhere else? Maybe, you can refer to the kafka test case[1].
In this test case, line 229 set parallelism to 1 and works fine.
Hope it helps you.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java

On Sat, Mar 3, 2018 at 4:02 PM, sundy <543950...@qq.com> wrote:

> Hi Hequn Cheng,
>
> Finally I got the problem and find the way to define the correct WaterMark
> by your advice, thank you very much.
>
> The problem is that I set the watermark to the
>
> waterMark =  maxEventTime - lag
>
> And the timeWindow is 10Seconds,  But I generated the test records too
> quickly so the 1 records are all in the window duration(my bad).
> So flink are waiting for new more numbers to close the window.
>
> Another one question is why I set   'env.setParallelism(1)’ and run the
> code in IDEA(mini Flink cluster) , but the getWatermark is called in 4
> different threads?
> Which time is  the getWaterMark function called? After the keyBy operation
> or after the source operation?
>
>
>
> On 3 Mar 2018, at 15:28, Hequn Cheng  wrote:
>
> Hi sundy,
>
> The default parallelism is 4. It seems that your configuration does not
> take effect. You can try 'env.setParallelism(1)' to set the job
> parallelism.
> For watermark, you can refer to [1] and [2].
>
> PS: Others can not see your reply because you only reply to me. Try reply
> to all so that others can help you too :-)
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/
> event_time.html#watermarks-in-parallel-streams
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/
> event_timestamps_watermarks.html#timestamps-per-kafka-partition
>
> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com> wrote:
>
>>
>> Hi Hequn Cheng,
>>
>> Thanks for you advice, I think I found the problem. But I didn't know why.
>>
>> Firstly, let me introduce my operations,  I run the code in IDEA with
>> MinFlinkCluster,  and then I set the env parallelism to 1, and the kafka
>> topic ‘stream_test' has  one partition. Then send 1 records with
>> current timestamp  the kafka , format is  ‘4281_783_1520059217832’ , the
>> last field is the evenTime.
>>
>> I add this debug code to print the thread-id.
>>
>> 
>>
>>
>> The result is it will print in 4 threads int each period duration(3
>> seconds).Such as
>>
>> 50:water mark:0
>> 52:water mark:0
>> 51:water mark:1520056427871
>> 49:water mark:0
>>
>> So this results to the watermark 0. But why it happened in 1 parallelism?
>> Maybe it was caused by the keyBy operation?  I am new to Flink,  I hope
>> to know how to set the watermark in the right way.
>>
>>
>>
>>
>>
>> On 3 Mar 2018, at 13:52, Hequn Cheng  wrote:
>>
>> Hi sundy,
>>
>> 1. Some partition of your input kafka don't have data. Since window
>> watermark is the min value of all it's inputs, if there are no data from
>> one of it's inputs, window will never be triggered. You can set
>> parallelism of your job to 1 to avoid this problem(PS: Maybe this bug is
>> fixed now, but worth a try).
>> 2. Only one record in the input. In this case, window can not be
>> triggered either. You might think of it like the time has be stopped. To
>> trigger the widow, you should read more data with watermark bigger than the
>> window end.
>>
>> Hope it helps you.
>> Best, Hequn
>>
>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:
>>
>>> Hi, thanks for your reply.
>>>
>>> I have searched it in stackoverflow, and there is someone who has the
>>> some problem.
>>>
>>> https://stackoverflow.com/questions/40993753/flink-streaming
>>> -program-runs-correctly-with-processing-time-but-will-not-produc
>>>
>>>
>>> From your advice, I tried the code.
>>>
>>>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>>>
>>> And it calls the getCurrentWaterMark function each 3 seconds,  but still
>>> no result come out.
>>> From the outputs   ('water mark1520049229163'), I could see that the add
>>> method is called. But the no result from the sink function.
>>>
>>>
>>>
>>>
>>> On 3 Mar 2018, at 12:47, Xingcan Cui  wrote:
>>>
>>> Hi,
>>>
>>> for periodically generated watermarks, you should use
>>> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
>>>
>>> Hope that helps.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>>>
>>>
>>>
>>> Hi, I got a problem in Flink  and need your help.
>>>
>>> I tried to use TimeCharacteristic.EvenTime, but the sink function never
>>> be executed.
>>>
>>> public class StreamingJob {
>>>   public static void main(String[] args) throws Exception {
>>> // set up the streaming execution environment
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> ObjectMapper 

Re: Watermark not worked as expected in TimeCharacteristic.EvenTime setting.

2018-03-03 Thread sundy
Hi Hequn Cheng,

Finally I got the problem and find the way to define the correct WaterMark by 
your advice, thank you very much.

The problem is that I set the watermark to the 

waterMark =  maxEventTime - lag 

And the timeWindow is 10Seconds,  But I generated the test records too quickly 
so the 1 records are all in the window duration(my bad). 
So flink are waiting for new more numbers to close the window.  

Another one question is why I set   'env.setParallelism(1)’ and run the code in 
IDEA(mini Flink cluster) , but the getWatermark is called in 4 different 
threads?
Which time is  the getWaterMark function called? After the keyBy operation or 
after the source operation?



> On 3 Mar 2018, at 15:28, Hequn Cheng  wrote:
> 
> Hi sundy,
> 
> The default parallelism is 4. It seems that your configuration does not take 
> effect. You can try 'env.setParallelism(1)' to set the job parallelism.
> For watermark, you can refer to [1] and [2].
> 
> PS: Others can not see your reply because you only reply to me. Try reply to 
> all so that others can help you too :-) 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>  
> 
> 
> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com 
> > wrote:
> 
> Hi Hequn Cheng,
> 
> Thanks for you advice, I think I found the problem. But I didn't know why.
> 
> Firstly, let me introduce my operations,  I run the code in IDEA with 
> MinFlinkCluster,  and then I set the env parallelism to 1, and the kafka 
> topic ‘stream_test' has  one partition. Then send 1 records with current 
> timestamp  the kafka , format is  ‘4281_783_1520059217832’ , the last field 
> is the evenTime.
> 
> I add this debug code to print the thread-id.
> 
> 
> 
> 
> The result is it will print in 4 threads int each period duration(3 
> seconds).Such as
> 
> 50:water mark:0
> 52:water mark:0
> 51:water mark:1520056427871
> 49:water mark:0
> 
> So this results to the watermark 0. But why it happened in 1 parallelism? 
> Maybe it was caused by the keyBy operation?  I am new to Flink,  I hope to 
> know how to set the watermark in the right way.
> 
> 
>   
>  
> 
>> On 3 Mar 2018, at 13:52, Hequn Cheng > > wrote:
>> 
>> Hi sundy,
>> 
>> 1. Some partition of your input kafka don't have data. Since window 
>> watermark is the min value of all it's inputs, if there are no data from one 
>> of it's inputs, window will never be triggered. You can set parallelism of 
>> your job to 1 to avoid this problem(PS: Maybe this bug is fixed now, but 
>> worth a try).
>> 2. Only one record in the input. In this case, window can not be triggered 
>> either. You might think of it like the time has be stopped. To trigger the 
>> widow, you should read more data with watermark bigger than the window end.
>> 
>> Hope it helps you.
>> Best, Hequn
>> 
>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com 
>> >:
>> Hi, thanks for your reply. 
>> 
>> I have searched it in stackoverflow, and there is someone who has the some 
>> problem.
>> 
>> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
>>  
>> 
>> 
>> 
>> From your advice, I tried the code. 
>> 
>>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>> 
>> And it calls the getCurrentWaterMark function each 3 seconds,  but still no 
>> result come out.
>> From the outputs   ('water mark1520049229163'), I could see that the add 
>> method is called. But the no result from the sink function.
>> 
>> 
>> 
>> 
>>> On 3 Mar 2018, at 12:47, Xingcan Cui >> > wrote:
>>> 
>>> Hi,
>>> 
>>> for periodically generated watermarks, you should use 
>>> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
>>> 
>>> Hope that helps.
>>> 
>>> Best,
>>> Xingcan
>>> 
 On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com 
 > wrote:
 
 
 
 Hi, I got a problem in Flink  and need your help.
 
 I tried to use TimeCharacteristic.EvenTime, but the sink function never be 
 executed.  
 
 public class StreamingJob {
   public static void main(String[] args) throws Exception {
 // set up the streaming execution environment
 final StreamExecutionEnvironment env =