Re: Any way to improve list state get performance

2022-11-22 Thread Xingcan Cui
Hi Tao,

I think you just need an extra `isEmpty` variable and maintain it properly
(e.g., when restoring the job, check whether the list state is empty or not).

Also, I remember that RocksDB list state is not as performant as
map state when the state is large. Sometimes you can use a map state
with some extra value states to simulate it.
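
To make that concrete, here is a minimal sketch of the flag-based idea. I used a
KeyedCoProcessFunction with plain String elements purely for illustration; the state
names, types, and join logic are placeholders rather than your actual job:

```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingJoinFunction
        extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ValueState<Boolean> hasBuffered; // cheap emptiness flag
    private transient ListState<String> buffered;      // RocksDB-backed list state

    @Override
    public void open(Configuration parameters) {
        hasBuffered = getRuntimeContext().getState(
                new ValueStateDescriptor<>("hasBuffered", Types.BOOLEAN));
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", Types.STRING));
    }

    @Override
    public void processElement1(String a, Context ctx, Collector<String> out) throws Exception {
        // Consult the flag first and only call get() when something was actually buffered.
        if (Boolean.TRUE.equals(hasBuffered.value())) {
            for (String b : buffered.get()) {
                out.collect(a + "/" + b);
            }
            buffered.clear();
            hasBuffered.clear();
        }
    }

    @Override
    public void processElement2(String b, Context ctx, Collector<String> out) throws Exception {
        buffered.add(b);
        hasBuffered.update(true); // maintain the flag on every insert
    }
}
```

How much this saves depends on the state backend, but it at least avoids touching the
list state on the A-stream path when nothing has been buffered for the key.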

Best,
Xingcan

On Mon, Nov 21, 2022 at 9:20 PM tao xiao  wrote:

> any suggestion is highly appreciated
>
> On Tue, Nov 15, 2022 at 8:50 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I have a Flink job that joins two streams, let's say A and B streams,
>> followed by a key process function. In the key process function the job
>> inserts elements from B stream to a list state if element from A stream
>> hasn't arrived yet. I am wondering if there is any way to skip the liststate.get() call
>> that checks whether there are elements in the list state when an element from the A stream
>> arrives, to reduce calls to the underlying state (RocksDB)
>>
>> Here is the code snippet
>>
>> keyfunction {
>> 
>>   process(in, ctx, collector) {
>>     if (in is A stream) {
>>       // any way to check if the list state is empty so that we don't need to call get()?
>>       for (b : liststate.get()) {
>>         ...
>>       }
>>     }
>> 
>>     if (in is B stream) {
>>       liststate.add(in)
>>     }
>>   }
>> }
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


Re: Timestamp type mismatch between Flink, Iceberg, and Avro

2021-05-21 Thread Xingcan Cui
Hi Timo,

Thanks for the reply! The document is really helpful. I can solve my
current problem with some workarounds. Will keep an eye on this topic!

Best,
Xingcan

On Fri, May 21, 2021, 12:08 Timo Walther  wrote:

> Hi Xingcan,
>
> we had a couple of discussions around the timestamp topic in Flink and
> have a clear picture nowadays. Some background:
>
>
> https://docs.google.com/document/d/1gNRww9mZJcHvUDCXklzjFEQGpefsuR_akCDfWsdE35Q/edit#
>
> So whenever an instant or epoch time is required, TIMESTAMP_LTZ is the
> way to go. However, since you can also represent a TIMESTAMP as a long
> value (this is also done internally), we can also support TIMESTAMP in
> connectors.
>
> So I would assume that the issue is on the connector side, which is not
> properly integrated into the SQL type system. It might be a bug.
>
> Regards,
> Timo
>
>
>
> On 21.05.21 17:23, Xingcan Cui wrote:
> > Hi all,
> >
> > Recently, I tried to use Flink to write some Avro data to Iceberg.
> > However, the timestamp representations for these systems really confused
> > me. Here are some facts:
> >
> >   * Avro uses `java.time.Instant` for logical type `timestamp_ms`;
> >   * Flink takes `java.time.Instant` as table type
> > `TIMESTAMP_WITH_LOCAL_TIME_ZONE`;
> >   * Iceberg takes Avro `timestamp_ms` as timestamp without timezone.
> >
> > When I used Flink DataType TIMESTAMP for timestamp_ms of Avro, I got the
> > following error "*class java.time.Instant cannot be cast to class
> > java.time.LocalDateTime*".
> >
> > If I change the Flink DataType to TIMESTAMP_WITH_LOCAL_TIME_ZONE,
> > Iceberg complains "t*imestamptz cannot be promoted to timestamp".*
> >
> > Does anyone have any thoughts on this?
> >
> > Thanks,
> > Xingcan
>
>


Timestamp type mismatch between Flink, Iceberg, and Avro

2021-05-21 Thread Xingcan Cui
Hi all,

Recently, I tried to use Flink to write some Avro data to Iceberg. However,
the timestamp representations for these systems really confused me. Here
are some facts:

   - Avro uses `java.time.Instant` for logical type `timestamp_ms`;
   - Flink takes `java.time.Instant` as table type
   `TIMESTAMP_WITH_LOCAL_TIME_ZONE`;
   - Iceberg takes Avro `timestamp_ms` as timestamp without timezone.

When I used Flink DataType TIMESTAMP for timestamp_ms of Avro, I got the
following error "*class java.time.Instant cannot be cast to class
java.time.LocalDateTime*".

If I change the Flink DataType to TIMESTAMP_WITH_LOCAL_TIME_ZONE, Iceberg
complains "t*imestamptz cannot be promoted to timestamp".*

Does anyone have any thoughts on this?

Thanks,
Xingcan


Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Xingcan Cui
Hi Juha and Chesnay,

I do appreciate your prompt responses! I'll also continue to investigate
this issue.

Best,
Xingcan

On Wed, Jan 27, 2021, 04:32 Chesnay Schepler  wrote:

> (setting this field is currently not possible from a Flink user
> perspective; it is something I will investigate)
>
>
> On 1/27/2021 10:30 AM, Chesnay Schepler wrote:
>
> Yes, I could see how the memory issue can occur.
>
> However, it should be limited to buffering 64 requests; this is the
> default limit that okhttp imposes on concurrent calls.
> Maybe lowering this value already does the trick.
>
> On 1/27/2021 5:52 AM, Xingcan Cui wrote:
>
> Hi all,
>
> Recently, I tried to use the Datadog reporter to collect some user-defined
> metrics. Sometimes when reaching traffic peaks (which are also peaks for
> metrics), the HTTP client will throw the following exception:
>
> ```
> [OkHttp https://app.datadoghq.com/...] WARN
>  org.apache.flink.metrics.datadog.DatadogHttpClient  - Failed sending
> request to Datadog
> java.net.SocketTimeoutException: timeout
> at
> okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
> at
> okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
> at
> okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
> at
> okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
> at
> okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
> at
> okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
> at
> okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
> at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> ```
>
> I guess this may be caused by the rate limit of the Datadog server since
> too many HTTP requests look like a kind of "attack". The real problem is
> that after throwing the above exceptions, the JVM heap size of the
> taskmanager starts to increase and finally causes OOM. I'm curious if this
> may be caused by metrics accumulation, i.e., for some reason, the client
> can't reconnect to the Datadog server and send the metrics so that the
> metrics data is buffered in memory and causes OOM.
>
> I'm running Flink 1.11.2 on EMR-6.2.0 with
> flink-metrics-datadog-1.11.2.jar.
>
> Thanks,
> Xingcan
>
>
>
>


Datadog reporter timeout & OOM issue

2021-01-26 Thread Xingcan Cui
Hi all,

Recently, I tried to use the Datadog reporter to collect some user-defined
metrics. Sometimes when reaching traffic peaks (which are also peaks for
metrics), the HTTP client will throw the following exception:

```
[OkHttp https://app.datadoghq.com/...] WARN
 org.apache.flink.metrics.datadog.DatadogHttpClient  - Failed sending
request to Datadog
java.net.SocketTimeoutException: timeout
at
okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
at
okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
at
okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
at
okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
at
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at
okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

I guess this may be caused by the rate limit of the Datadog server since
too many HTTP requests look like a kind of "attack". The real problem is
that after throwing the above exceptions, the JVM heap size of the
taskmanager starts to increase and finally causes OOM. I'm curious if this
may be caused by metrics accumulation, i.e., for some reason, the client
can't reconnect to the Datadog server and send the metrics so that the
metrics data is buffered in memory and causes OOM.

I'm running Flink 1.11.2 on EMR-6.2.0 with flink-metrics-datadog-1.11.2.jar.

Thanks,
Xingcan


Re: Flink SQL client support for running in Flink cluster

2019-09-07 Thread Xingcan Cui
Hi Dipanjan,

Sorry that I didn’t make it clear. The ’standalone cluster’ could be fully 
distributed. It refers to a static cluster deployed without any cluster 
managers (or other related tools), such as YARN and Kubernetes.

For more information, please check the documentation [1].

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/cluster_setup.html
 
<https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/cluster_setup.html>

> On Sep 7, 2019, at 3:30 AM, Dipanjan Mazumder  wrote:
> 
> Hi Xingcan,
> 
> Thanks a lot for this info; it confirms my suspicion. So if I want to reuse the SQL client
> to implement an API-based framework, it will not work with a Flink cluster running in
> cluster mode. Is there any plan in the near future to support that? If it is planned, is
> there an expected release for it? I need to prototype my platform and present it.
> 
> Or is there any other way I can achieve this dynamic configuration and query
> submission to Flink running as a full-fledged cluster?
> 
> Regards
> Dipanjan
> 
> On Saturday, September 7, 2019, 12:52:02 PM GMT+5:30, Xingcan Cui 
>  wrote:
> 
> 
> Hi Dipanjan,
> 
> Here the ’standalone’ is short for ’standalone cluster mode’, which means you 
> can setup a standalone Flink cluster and submit your SQL queries (via SQL 
> client) to the cluster.
> 
> As far as I know, the current SQL client can only connect to a Flink cluster 
> deployed in standalone mode.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
>> On Sep 7, 2019, at 1:57 AM, Dipanjan Mazumder > <mailto:java...@yahoo.com>> wrote:
>> 
>> Hi Guys,
>> 
>> I was going through the Flink sql client configuration YAML from the 
>> training example and came across a section in the configuration as below:
>> 
>>  
>> # Deployment properties allow for describing the cluster to which table
>> # programs are submitted to.
>> 
>> deployment:
>>   type: standalone # only the 'standalone' deployment is 
>> supported
>>   response-timeout: 5000   # general cluster communication timeout in ms
>>   gateway-address: ""  # (optional) address from cluster to gateway
>>   gateway-port: 0  # (optional) port from cluster to gateway
>> 
>> ##
>> 
>> The first property of the deployment section, "type", says that only
>> standalone deployment is supported. Does that mean the Flink SQL client is
>> non-functional when Flink is running in cluster mode? Also, I cannot find any
>> documentation for this configuration that mentions a different value of
>> "deployment.type", for example "cluster".
>> 
>> I want to implement an API on top of the SQL client which will support dynamic
>> submission of configurations, and thereby SQL queries on the fly for each new
>> configuration. If the SQL client is non-functional against a Flink cluster, the
>> effort will not be worth it, because I would first have to figure out how to make
>> it work on a Flink cluster and then implement the API on top of that.
>> 
>> Please kindly help me with this, as I am blocked on further changes. I have not
>> yet tried to verify this through the code myself, since that would take much more
>> time than your team probably needs to answer it.
>> 
>> Regards
>> Dipanjan
>> 
> 



Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Xingcan Cui
Congrats Rong!

Best,
Xingcan

> On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
> 
> Congratulations, Rong!
> 
> On Thu, Jul 11, 2019 at 8:26 AM Yu Li  > wrote:
> Congratulations Rong!
> 
> Best Regards,
> Yu
> 
> 
> On Thu, 11 Jul 2019 at 22:54, zhijiang  > wrote:
> Congratulations Rong!
> 
> Best,
> Zhijiang
> --
> From:Kurt Young mailto:ykt...@gmail.com>>
> Send Time: Jul 11, 2019 (Thu) 22:54
> To:Kostas Kloudas mailto:kklou...@gmail.com>>
> Cc:Jark Wu mailto:imj...@gmail.com>>; Fabian Hueske 
> mailto:fhue...@gmail.com>>; dev  >; user  >
> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
> 
> Congratulations Rong!
> 
> Best,
> Kurt
> 
> 
> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas  > wrote:
> Congratulations Rong!
> 
> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  > wrote:
> Congratulations Rong Rong! 
> Welcome on board!
> 
> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske  > wrote:
> Hi everyone,
> 
> I'm very happy to announce that Rong Rong accepted the offer of the Flink PMC 
> to become a committer of the Flink project.
> 
> Rong has been contributing to Flink for many years, mainly working on SQL and 
> Yarn security features. He's also frequently helping out on the user@f.a.o 
> mailing lists.
> 
> Congratulations Rong!
> 
> Best, Fabian 
> (on behalf of the Flink PMC)
> 



Re: Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-09 Thread Xingcan Cui
Yes, Mans. You can use both processing-time and event-time timers if you set 
the time characteristic to event-time. They'll be triggered by their own time 
semantics, separately. (actually there’s no watermark for processing time)
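
To illustrate, here is a small sketch (the names and the 10-second offsets are made up,
and it assumes timestamps have been assigned so that `ctx.timestamp()` is not null) that
registers both kinds of timers and tells them apart in `onTimer()`:

```
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BothTimersFunction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Wall-clock based timer.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10_000L);
        // Fires once the watermark passes this point.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // timeDomain() tells you which kind of timer fired.
        if (ctx.timeDomain() == TimeDomain.PROCESSING_TIME) {
            out.collect("processing-time timer fired at " + timestamp);
        } else {
            out.collect("event-time timer fired at " + timestamp);
        }
    }
}
```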

Cheers,
Xingcan

> On Jul 9, 2019, at 11:40 AM, M Singh  wrote:
> 
> Thanks Yun for your answers.
> 
> Does this mean that we can use processing and event timers (in processors or 
> triggers) regardless of the time characteristic ?  Also, is possible to use 
> both together and will they both fire at the appropriate watermarks for 
> processing and event times ?  
> 
> Mans
> 
> On Tuesday, July 9, 2019, 12:18:30 AM EDT, Yun Gao  
> wrote:
> 
> 
> Hi,
> For the three questions,
>   1. The processing time timer will be triggered. IMO you may think of the
> processing time timer as being in parallel with the event time timer. They are
> processed separately underneath. The processing time timer will be triggered
> according to wall-clock time.
>   2. I'm not very clear on what "changed later in the application" means. Do you
> mean calling `StreamExecutionEnvironment#setStreamTimeCharacteristics` multiple
> times? If so, then the last call will take effect for all the operators
> before or after the last call, since the setting only takes effect in
> `StreamExecutionEnvironment#execute`.
>   3. 'assignTimeStampAndWatermark' will change the timestamp of the record.
> IMO you may think of each record as containing a timestamp field, and the field is
> set at ingestion, but 'assignTimeStampAndWatermark' will change the value
> of this field, so the following operators relying on the timestamp will see
> the updated value.
> 
> Best,
> Yun
> 
> 
> 
> --
> From:M Singh 
> Send Time:2019 Jul. 9 (Tue.) 09:42
> To:User 
> Subject:Apache Flink - Relation between stream time characteristic and timer 
> triggers
> 
> Hi:
> 
> I have a few questions about the stream time characteristics:
> 
> 1. If the time characteristic is set to TimeCharacteristic.EventTime, but the 
> timers in a processor or trigger is set using registerProcessingTimeTimer (or 
> vice versa), then will that timer fire ?  
> 
> 2.  Once the time character is set on the stream environment, and changed 
> later in the application, which one is applied, the first one or the last one 
> ?
> 
> 3.  If the stream time characteristic is set to IngestionTime, then is there 
> any adverse effect of assigning the timestamp using  
> assignTimeStampAndWatermark to a stream later in the application ?
> 
> Thanks
> 



Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-08 Thread Xingcan Cui
Hi Aljoscha,

Thanks for your response.

With all this preliminary information collected, I’ll start a formal process.

Thank everybody for your attention.

Best,
Xingcan

> On Jul 8, 2019, at 10:17 AM, Aljoscha Krettek  wrote:
> 
> I think this would benefit from a FLIP, that neatly sums up the options, and 
> which then gives us also a point where we can vote and ratify a decision.
> 
> As a gut feeling, I most like Option 3). Initially I would have preferred 
> option 1) (because of a sense of API purity), but by now I think it’s good 
> that users have this simpler option.
> 
> Aljoscha 
> 
>> On 8. Jul 2019, at 06:39, Xingcan Cui > <mailto:xingc...@gmail.com>> wrote:
>> 
>> Hi all,
>> 
>> Thanks for your participation.
>> 
>> In this thread, we got one +1 for option 1 and option 3, respectively. In 
>> the original thread[1], we got two +1 for option 1, one +1 for option 2, and 
>> five +1 and one -1 for option 3.
>> 
>> To summarize,
>> 
>> Option 1 (port side output to flatMap and deprecate split/select): three +1
>> Option 2 (introduce a new split/select and deprecate existing one): one +1
>> Option 3 ("correct" the existing split/select): six +1 and one -1
>> 
>> It seems that most people involved are in favor of "correcting" the existing 
>> split/select. However, this will definitely break the API compatibility, in 
>> a subtle way.
>> 
>> IMO, the real behavior of consecutive split/select's has never been
>> thoroughly clarified. Even in the community, it is hard to say that we have
>> reached a consensus on its real semantics [2-4]. Though the initial design is
>> not ambiguous, there's no doubt that its concept has drifted.
>> 
>> As split/select is quite an ancient API, I cc'ed this to more members.
>> It would be great if you could share your opinions on this.
>> 
>> Thanks,
>> Xingcan
>> 
>> [1] 
>> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
>>  
>> <https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E>
>> [2] https://issues.apache.org/jira/browse/FLINK-1772 
>> <https://issues.apache.org/jira/browse/FLINK-1772>
>> [3] https://issues.apache.org/jira/browse/FLINK-5031 
>> <https://issues.apache.org/jira/browse/FLINK-5031>
>> [4] https://issues.apache.org/jira/browse/FLINK-11084 
>> <https://issues.apache.org/jira/browse/FLINK-11084>
>> 
>> 
>>> On Jul 5, 2019, at 12:04 AM, 杨力 >> <mailto:bill.le...@gmail.com>> wrote:
>>> 
>>> I prefer the 1) approach. I used to carry fields, which is needed only for 
>>> splitting, in the outputs of flatMap functions. Replacing it with 
>>> outputTags would simplify data structures.
>>> 
>>> Xingcan Cui <xingc...@gmail.com> wrote on Fri, Jul 5, 2019 at 2:20 AM:
>>> Hi folks,
>>> 
>>> Two weeks ago, I started a thread [1] discussing whether we should discard 
>>> the split/select methods (which have been marked as deprecation since v1.7) 
>>> in DataStream API. 
>>> 
>>> The fact is, these methods will cause "unexpected" results when using 
>>> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or 
>>> multi-times on the same target (e.g., ds.split(a).select(b), 
>>> ds.split(c).select(d)). The reason is that following the initial design, 
>>> the new split/select logic will always override the existing one on the 
>>> same target operator, rather than append to it. Some users may not be aware 
>>> of that, but if you do, a current solution would be to use the more 
>>> powerful side output feature [2].
>>> 
>>> FLINK-11084 <https://issues.apache.org/jira/browse/FLINK-11084 
>>> <https://issues.apache.org/jira/browse/FLINK-11084>> added some 
>>> restrictions to the existing split/select logic and suggest to replace it 
>>> with side output in the future. However, considering that the side output 
>>> is currently only available in the process function layer and the 
>>> split/select could have been widely used in many real-world applications, 
>>> we'd like to start a vote and listen to the community on how to deal with 
>>> them.
>>> 
>>> In the discussion thread [1], we proposed three solutions as follow

Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-07 Thread Xingcan Cui
Hi all,

Thanks for your participation.

In this thread, we got one +1 for option 1 and option 3, respectively. In the 
original thread[1], we got two +1 for option 1, one +1 for option 2, and five 
+1 and one -1 for option 3.

To summarize,

Option 1 (port side output to flatMap and deprecate split/select): three +1
Option 2 (introduce a new split/select and deprecate existing one): one +1
Option 3 ("correct" the existing split/select): six +1 and one -1

It seems that most people involved are in favor of "correcting" the existing 
split/select. However, this will definitely break the API compatibility, in a 
subtle way.

IMO, the real behavior of consecutive split/select's has never been thoroughly
clarified. Even in the community, it is hard to say that we have reached a consensus
on its real semantics [2-4]. Though the initial design is not ambiguous, there's
no doubt that its concept has drifted.

As split/select is quite an ancient API, I cc'ed this to more members. It
would be great if you could share your opinions on this.

Thanks,
Xingcan

[1] 
https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
[2] https://issues.apache.org/jira/browse/FLINK-1772
[3] https://issues.apache.org/jira/browse/FLINK-5031
[4] https://issues.apache.org/jira/browse/FLINK-11084


> On Jul 5, 2019, at 12:04 AM, 杨力  wrote:
> 
> I prefer the 1) approach. I used to carry fields, which is needed only for 
> splitting, in the outputs of flatMap functions. Replacing it with outputTags 
> would simplify data structures.
> 
> Xingcan Cui <xingc...@gmail.com> wrote on Fri, Jul 5, 2019 at 2:20 AM:
> Hi folks,
> 
> Two weeks ago, I started a thread [1] discussing whether we should discard 
> the split/select methods (which have been marked as deprecation since v1.7) 
> in DataStream API. 
> 
> The fact is, these methods will cause "unexpected" results when using 
> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or multi-times 
> on the same target (e.g., ds.split(a).select(b), ds.split(c).select(d)). The 
> reason is that following the initial design, the new split/select logic will 
> always override the existing one on the same target operator, rather than 
> append to it. Some users may not be aware of that, but if you do, a current 
> solution would be to use the more powerful side output feature [2].
> 
> FLINK-11084 <https://issues.apache.org/jira/browse/FLINK-11084> added some 
> restrictions to the existing split/select logic and suggest to replace it 
> with side output in the future. However, considering that the side output is 
> currently only available in the process function layer and the split/select 
> could have been widely used in many real-world applications, we'd like to 
> start a vote and listen to the community on how to deal with them.
> 
> In the discussion thread [1], we proposed three solutions as follows. All of 
> them are feasible but have different impacts on the public API.
> 
> 1) Port the side output feature to DataStream API's flatMap and replace 
> split/select with it.
> 
> 2) Introduce a dedicated function in DataStream API (with the "correct" 
> behavior but a different name) that can be used to replace the existing 
> split/select.
> 
> 3) Keep split/select but change the behavior/semantic to be "correct".
> 
> Note that this is just a vote for gathering information, so feel free to 
> participate and share your opinions.
> 
> The voting time will end on July 7th 17:00 EDT.
> 
> Thanks,
> Xingcan
> 
> [1] 
> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
>  
> <https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html>


[VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Xingcan Cui
Hi folks,

Two weeks ago, I started a thread [1] discussing whether we should discard
the split/select methods (which have been marked as deprecation since v1.7)
in DataStream API.

The fact is, these methods will cause "unexpected" results when using
consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or
multi-times on the same target (e.g., ds.split(a).select(b),
ds.split(c).select(d)). The reason is that following the initial design,
the new split/select logic will always override the existing one on the
same target operator, rather than append to it. Some users may not be aware
of that, but if you do, a current solution would be to use the more
powerful side output feature [2].
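
For readers who have not used side outputs yet, here is a minimal sketch of the
replacement pattern (the tags, types, and even/odd routing are purely illustrative):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {
    static final OutputTag<Integer> EVEN = new OutputTag<Integer>("even") {};
    static final OutputTag<Integer> ODD = new OutputTag<Integer>("odd") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> routed = env.fromElements(1, 2, 3, 4)
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        // Route each record to a named output instead of split/select.
                        ctx.output(value % 2 == 0 ? EVEN : ODD, value);
                    }
                });

        DataStream<Integer> evens = routed.getSideOutput(EVEN);
        DataStream<Integer> odds = routed.getSideOutput(ODD);
        evens.print();
        odds.print();
        env.execute("side output instead of split/select");
    }
}
```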

FLINK-11084  added some
restrictions to the existing split/select logic and suggest to replace it with
side output in the future. However, considering that the side output is
currently only available in the process function layer and the split/select
could have been widely used in many real-world applications, we'd like to start
a vote and listen to the community on how to deal with them.

In the discussion thread [1], we proposed three solutions as follows. All
of them are feasible but have different impacts on the public API.

1) Port the side output feature to DataStream API's flatMap and replace
split/select with it.

2) Introduce a dedicated function in DataStream API (with the "correct"
behavior but a different name) that can be used to replace the existing
split/select.

3) Keep split/select but change the behavior/semantic to be "correct".

Note that this is just a vote for gathering information, so feel free to
participate and share your opinions.

The voting time will end on *July 7th 17:00 EDT*.

Thanks,
Xingcan

[1]
https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html


Re: Types source code

2019-06-16 Thread Xingcan Cui
Hi, judging from the failure, my guess is that this is a Scala type-inference issue. A cast with `.asInstanceOf[Array[TypeInformation[_]]]` should fix it.

> On Jun 16, 2019, at 10:33 PM, liu_mingzhang  wrote:
> 
> Thanks for your reply. I did try running it, but it fails to compile, so the project cannot be built.
> Also, the issue link you posted won't open for me...
> 
> 
> On Jun 17, 2019, at 10:27, Zili Chen wrote:
> Have you tried running it directly? IDEA sometimes has trouble with Scala type inference and
> reports false type-mismatch errors on code that actually compiles and runs. If it runs, it is
> likely an IDEA problem, which you can report to the corresponding issue tracker [1].
> 
> 
> Best,
> tison.
> 
> 
> [1] https://youtrack.jetbrains.com/oauth?state=%2Fissues%2FIDEA
> 
> 
> 
> 
> liu_mingzhang wrote on Mon, Jun 17, 2019 at 10:22 AM:
> 
> 
> 
> I'd like to implement a UDTF with the following behavior.
> 
> It should turn the original table data:
> 
> id | field1 | field2
> 1  | A,B,C  | D,E,F
> 
> into:
> 
> id | num | field1 | field2
> 1  | 1   | A      | D
> 1  | 2   | B      | E
> 1  | 3   | C      | F
> 
> Below is the function I wrote, but it fails with an error.
> 
> However, org.apache.flink.table.api.Types.ROW does have such a constructor, and the example in
> its Javadoc is written the same way.
> 
> I don't understand why I'm getting this error. Any help would be greatly appreciated.
> 
> P.S. I'm not sure why the images I sent earlier didn't come through. If you still can't see
> them, please check the attachments. Thanks, everyone.



Re: Inconsistent documentation of Table Conditional functions

2019-05-08 Thread Xingcan Cui
Hi Flavio,

In the description, resultX is just an identifier for the result of the first
condition that is met.

Best,
Xingcan

> On May 8, 2019, at 12:02 PM, Flavio Pompermaier  wrote:
> 
> Hi to all,
> in the documentation of the Table Conditional functions [1] the example is 
> inconsistent with the related description (there's no resultX for example). 
> Or am I wrong?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#conditional-functions
>  
> 
> 



Re: flink tableapi inner join exception

2019-03-15 Thread Xingcan Cui
Hi,

As the message said, some columns share the same names. You could first rename 
the columns of one table with the `as` operation [1].
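
For example, a rough sketch against the 1.7-era Java Table API (the Scala API has the
same `as` operation; the field names here are illustrative, not taken from your program):

```
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class RenameBeforeJoin {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Tuple3<Integer, String, Integer>> ds1 =
                env.fromElements(Tuple3.of(1, "a", 10), Tuple3.of(2, "b", 20));
        DataSet<Tuple3<Integer, String, Integer>> ds2 =
                env.fromElements(Tuple3.of(1, "a", 10), Tuple3.of(20, "b", 20));

        Table left = tEnv.fromDataSet(ds1, "id, name, amount");
        // Rename the right-hand columns so that the join has no ambiguous names.
        Table right = tEnv.fromDataSet(ds2, "id, name, amount").as("id2, name2, amount2");

        Table joined = left.join(right).where("id = id2")
                           .select("id, name, amount, amount2");

        tEnv.toDataSet(joined, Row.class).print();
    }
}
```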

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#scan-projection-and-filter
 

> On Mar 15, 2019, at 9:03 AM, 刘 文  wrote:
> 
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> join relations with ambiguous names: id, name, value
>   at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
>   at 
> org.apache.flink.table.plan.logical.Join.validate(operators.scala:463)
>   at org.apache.flink.table.api.Table.join(table.scala:589)
>   at org.apache.flink.table.api.Table.join(table.scala:397)
>   at 
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin.Run$.main(Run.scala:26)
>   at 
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin.Run.main(Run.scala)
> 
> 
> 
> 
> 
> 
> package 
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin
> 
> import org.apache.flink.api.scala.{ExecutionEnvironment, _}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> 
> object Run {
> 
> 
>   def main(args: Array[String]): Unit = {
> 
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tableEnv = TableEnvironment.getTableEnvironment(env)
> 
> val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
> val dataSet2 = env.fromElements( (1,"a",10),(20,"b",20), (30,"c",30) )
> 
> 
> 
> // convert the DataSets to Tables
> val table = tableEnv.fromDataSet(dataSet,'id,'name,'value)
> val table2 = tableEnv.fromDataSet(dataSet2,'id,'name,'value)
> 
> 
> 
>table.join(table2).where(" id = id ").first(1000).print()
> 
> 
> 
> 
> 
> 
>   }
> 
> }
> 
> 
> 
> 
> 
> 
> Best,
> thinktothings
> 
> 
> 



Re: left join failing with FlinkLogicalJoinConverter NPE

2019-02-26 Thread Xingcan Cui
Hi Karl,

I think this is a bug and created FLINK-11769 
<https://issues.apache.org/jira/browse/FLINK-11769> to track it.

Best,
Xingcan

> On Feb 26, 2019, at 2:02 PM, Karl Jin  wrote:
> 
> I removed the multiset> field and the join worked fine. 
> The field was created from a Kafka source through a query that looks like 
> "select collect(data) as i_data from ... group by pk"
> 
> Do you think this is a bug or is this something I can get around using some 
> configuration?
> 
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui  <mailto:xingc...@gmail.com>> wrote:
> Yes. Please check that. If it's the nested type's problem, this might be a 
> bug.
> 
> On Mon, Feb 25, 2019, 21:50 Karl Jin  <mailto:karl@gmail.com>> wrote:
> Do you think something funky might be happening with Map/Multiset types? If 
> so how do I deal with it (I think I can verify by removing those columns and 
> retry?)?
> 
> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin  <mailto:karl@gmail.com>> wrote:
> Thanks for checking in quickly,
> 
> Below is what I got on printSchema on the two tables (left joining the second 
> one to the first one on uc_pk = i_uc_pk). rowtime in both are extracted from 
> the string field uc_update_ts
> 
> root
>  |-- uc_pk: String
>  |-- uc_update_ts: String
>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>  |-- uc_version: String
>  |-- uc_type: String
>  |-- data_parsed: Map
> 
> root
>  |-- i_uc_pk: String
>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>  |-- image_count: Long
>  |-- i_data: Multiset>
> 
> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui  <mailto:xingc...@gmail.com>> wrote:
> Hi Karl,
> 
> It seems that some field types of your inputs were not properly extracted. 
> Could you share the result of `printSchema()` for your input tables?
> 
> Best,
> Xingcan
> 
> > On Feb 25, 2019, at 4:35 PM, Karl Jin  > <mailto:karl@gmail.com>> wrote:
> > 
> > Hello,
> > 
> > First time posting, so please let me know if the formatting isn't correct, 
> > etc.
> > 
> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but 
> > getting the below exception. Looks like some sort of query optimization 
> > process but I'm not sure where to start investigating/debugging. I see 
> > things are marked as NONE in the object so that's a bit of a flag to me, 
> > although I don't know for sure. Any pointer would be much appreciated:
> > 
> > Exception in thread "main" java.lang.RuntimeException: Error while applying 
> > rule FlinkLogicalJoinConverter, args 
> > [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
> >  $6),joinType=left)]
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
> >   at 
> > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
> >   at 
> > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
> >   at 
> > org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
> >   at 
> > org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
> >   at 
> > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> >   at 
> > org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
> >   at 
> > org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
> >   at 
> > org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
> > ...
> > Caused by: java.lang.RuntimeException: Error occurred while applying rule 
> > FlinkLogicalJoinConverter
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
> >   at 
> > org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
> >   at 
> > org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> >   ... 11 more
> > Caused by: java.lang.NullPointerException
> >   at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
> >   at 
> > org.apache

Re: left join failing with FlinkLogicalJoinConverter NPE

2019-02-25 Thread Xingcan Cui
Yes. Please check that. If it's the nested type's problem, this might be a
bug.

On Mon, Feb 25, 2019, 21:50 Karl Jin  wrote:

> Do you think something funky might be happening with Map/Multiset types?
> If so how do I deal with it (I think I can verify by removing those columns
> and retry?)?
>
> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin  wrote:
>
>> Thanks for checking in quickly,
>>
>> Below is what I got on printSchema on the two tables (left joining the
>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>> extracted from the string field uc_update_ts
>>
>> root
>>  |-- uc_pk: String
>>  |-- uc_update_ts: String
>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>  |-- uc_version: String
>>  |-- uc_type: String
>>  |-- data_parsed: Map
>>
>> root
>>  |-- i_uc_pk: String
>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>  |-- image_count: Long
>>  |-- i_data: Multiset>
>>
>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui  wrote:
>>
>>> Hi Karl,
>>>
>>> It seems that some field types of your inputs were not properly
>>> extracted.
>>> Could you share the result of `printSchema()` for your input tables?
>>>
>>> Best,
>>> Xingcan
>>>
>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin  wrote:
>>> >
>>> > Hello,
>>> >
>>> > First time posting, so please let me know if the formatting isn't
>>> correct, etc.
>>> >
>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but
>>> getting the below exception. Looks like some sort of query optimization
>>> process but I'm not sure where to start investigating/debugging. I see
>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>> although I don't know for sure. Any pointer would be much appreciated:
>>> >
>>> > Exception in thread "main" java.lang.RuntimeException: Error while
>>> applying rule FlinkLogicalJoinConverter, args
>>> [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
>>> $6),joinType=left)]
>>> >   at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>>> >   at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>>> >   at
>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>>> >   at
>>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>>> >   at
>>> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>>> >   at
>>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>>> >   at
>>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>>> >   at
>>> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
>>> >   at
>>> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
>>> >   at
>>> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
>>> > ...
>>> > Caused by: java.lang.RuntimeException: Error occurred while applying
>>> rule FlinkLogicalJoinConverter
>>> >   at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
>>> >   at
>>> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>>> >   at
>>> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>>> >   at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>>> >   ... 11 more
>>> > Caused by: java.lang.NullPointerException
>>> >   at
>>> org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
>>> >   at
>>> org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>>> >   at
>>> org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
>>> >   at
>>> org.apache.flink.table.p

Re: left join failing with FlinkLogicalJoinConverter NPE

2019-02-25 Thread Xingcan Cui
Hi Karl,

It seems that some field types of your inputs were not properly extracted. 
Could you share the result of `printSchema()` for your input tables?

Best,
Xingcan

> On Feb 25, 2019, at 4:35 PM, Karl Jin  wrote:
> 
> Hello,
> 
> First time posting, so please let me know if the formatting isn't correct, 
> etc.
> 
> I'm trying to left join two Kafka sources, running 1.7.2 locally, but getting 
> the below exception. Looks like some sort of query optimization process but 
> I'm not sure where to start investigating/debugging. I see things are marked 
> as NONE in the object so that's a bit of a flag to me, although I don't know 
> for sure. Any pointer would be much appreciated:
> 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule FlinkLogicalJoinConverter, args 
> [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
>  $6),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>   at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>   at 
> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
>   at 
> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
>   at 
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
> ...
> Caused by: java.lang.RuntimeException: Error occurred while applying rule 
> FlinkLogicalJoinConverter
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>   at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>   ... 11 more
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
>   at 
> org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>   at 
> org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
>   at 
> org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>   at 
> org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
>   at 
> org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
>   at 
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>   at 
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>   at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
>   at 
> org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
>   at 
> org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
>   at 
> org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
>   at 
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
>   at 
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
> Source)
>   at 
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown 
> Source)
>   at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
>   at 
> org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
>   at 
> org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>   at 
> 

Re: Flink join stream where one stream is coming 5 minutes late

2018-11-26 Thread Xingcan Cui
Hi Abhijeet,

If you want to perform a window join in the DataStream API, the window
configurations on both sides must be exactly the same.

For your case, maybe you can try adding a 5-minute delay to the event times (and
watermarks) of the faster stream.

Hope that helps.

Best,
Xingcan


> On Nov 26, 2018, at 12:44 PM, Abhijeet Kumar  
> wrote:
> 
> I already mentioned above, one is coming late by 5 mins. Maybe my approach 
> isn't correct, and I asked to correct me if I'm wrong. 
> 
>> On 26-Nov-2018, at 5:56 PM, Taher Koitawala > > wrote:
>> 
>> May I ask why you want to have 2 differences between window time? What's the 
>> use case?
>> 
>> On Mon 26 Nov, 2018, 5:53 PM Abhijeet Kumar >  wrote:
>> Hello Team,
>> 
>> I've to join two stream where one stream is coming late. So, I planned doing 
>> it by creating two windows, for first window the size will be 5 minutes and 
>> for the other it will be 10 minutes. Then, I'll perform the join operation. 
>> Is my solution correct because I don't know whether one window will sync 
>> with other or not because one is getting finished earlier and another is 
>> late. If there is a better way to achieve the same, please let me know. 
>> 
>> Thanks,
>> 
>>  
>> Abhijeet Kumar
>> Software Development Engineer,
>> Sentienz Solutions Pvt Ltd
>> Cognitive Data Platform - Perceive the Data !
>> abhijeet.ku...@sentienz.com  
>> |www.sentienz.com  | Bengaluru
>> 
>> 
> 



Re: IntervalJoin is stucked in rocksdb'seek for too long time in flink-1.6.2

2018-11-21 Thread Xingcan Cui
Hi Jiangang,

The IntervalJoin is actually the DataStream-level implementation of the SQL
time-windowed join [1].

To ensure the completeness of the join results, we have to cache all the
records (from both sides) in the most recent time interval. That may lead to
state backend problems when huge streams flood in.

One benefit of SQL is that the optimizer will help to reduce the join inputs as
much as possible (e.g., via predicate pushdown), but that has to be done
manually in DataStream programs. Thus, I suggest you 1) try increasing the
parallelism (and the number of nodes if possible); 2) filter out some records or
reduce the number of fields in advance, as sketched below.
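
As a rough illustration of 2), assuming tuple-shaped records (your actual record classes,
fields, and predicates will of course differ):

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlimInputsBeforeJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical "wide" ad-log records: (id, payload, clicks).
        DataStream<Tuple3<Long, String, Integer>> adLogStream =
                env.fromElements(Tuple3.of(1L, "big-payload", 3), Tuple3.of(2L, "noise", 0));

        // Drop irrelevant records and fields before the interval join,
        // so far less data ends up in the join's RocksDB state.
        DataStream<Tuple2<Long, Integer>> slimAdLogs = adLogStream
                .filter(log -> log.f2 > 0)
                .map(log -> Tuple2.of(log.f0, log.f2))
                .returns(Types.TUPLE(Types.LONG, Types.INT));

        slimAdLogs.print();
        env.execute("slim inputs before interval join");
    }
}
```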

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On Nov 21, 2018, at 2:06 AM, liujiangang  wrote:
> 
> I am using IntervalJoin function to join two streams within 10 minutes. As
> below:
> 
> labelStream.intervalJoin(adLogStream)
>   .between(Time.milliseconds(0), Time.milliseconds(60))
>   .process(new processFunction())
>   .sink(kafkaProducer)
> labelStream and adLogStream are proto-buf class that are keyed by Long id.
> 
> Our two input-streams are huge. After running about 30minutes, the output to
> kafka go down slowly, like this:
> 
>  
> 
> When data output begins going down, I use jstack and pstack sevaral times to
> get these: 
> 
>  
> 
>  
> 
> It seems the program is stucked in rockdb's seek. And I find that some
> rockdb's srt file are accessed slowly by iteration.
> 
>  
> 
> I have tried several ways:
> 
> 1)Reduce the input amount to half. This works well.
> 2)Replace labelStream and adLogStream with simple Strings. This way, data
> amount will not change. This works well.
> 3)Use PredefinedOptions like SPINNING_DISK_OPTIMIZED and
> SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
> 4)Use new versions of rocksdbjni. This still fails.
> Can anyone give me some suggestions? Thank you very much.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink SQL string literal does not support double quotation?

2018-11-01 Thread Xingcan Cui
Hi Henry,

In most SQL conventions, single quotes are for Strings, while double quotes are 
for identifiers.

Best,
Xingcan

> On Oct 31, 2018, at 7:53 PM, 徐涛  wrote:
> 
> Hi Experts,
>   When I am running the following SQL in FLink 1.6.2, I got 
> org.apache.calcite.sql.parser.impl.ParseException
>   
>   select 
>BUYER_ID, 
>AMOUNT, 
>concat( 
>from_unixtime(unix_timestamp(CREATE_TIME, 'EEE MMM dd 
> HH:mm:ss zzz '),'MMdd'), 
>case when 1>= 10 and 1<= 21 
>then "02" else "01” 
>end 
>) as date12 
>from 
>vip_order
> 
>   but when I use single quotes, such as '02', it does not complain at all,
> which is a bit weird, because normally in SQL users can use double
> quotation marks or single quotation marks equally.
>   I want to know whether it is a Flink limitation or a bug? If it is a
> limitation, why?
>   Thank you.
> 
> 
> Best 
> Henry



Re: 2 Broadcast streams to a Single Keyed Stream....how to?

2018-09-18 Thread Xingcan Cui
Hi Vishal,

Actually, you can provide multiple MapStateDescriptors to the `broadcast()`
method and then use them separately.
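
A small self-contained sketch of what I mean (the String types, state names, and the
"A:"/"B:" prefix routing are all illustrative, assuming the two rule streams have been
unioned into one stream before broadcasting):

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class TwoRuleSetsExample {

    // Two descriptors, one per rule set; both are registered on the same broadcast stream.
    static final MapStateDescriptor<String, String> RULES_A =
            new MapStateDescriptor<>("rulesA", Types.STRING, Types.STRING);
    static final MapStateDescriptor<String, String> RULES_B =
            new MapStateDescriptor<>("rulesB", Types.STRING, Types.STRING);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyedStream<String, String> events = env.fromElements("a", "b")
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                });
        DataStream<String> rules = env.fromElements("A:rule1", "B:rule2");

        BroadcastStream<String> broadcastRules = rules.broadcast(RULES_A, RULES_B);

        events.connect(broadcastRules)
              .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
                  @Override
                  public void processElement(String value, ReadOnlyContext ctx,
                                             Collector<String> out) throws Exception {
                      String ruleA = ctx.getBroadcastState(RULES_A).get("latest");
                      String ruleB = ctx.getBroadcastState(RULES_B).get("latest");
                      out.collect(value + " / " + ruleA + " / " + ruleB);
                  }

                  @Override
                  public void processBroadcastElement(String rule, Context ctx,
                                                      Collector<String> out) throws Exception {
                      // Route each broadcast record into the state it belongs to.
                      if (rule.startsWith("A:")) {
                          ctx.getBroadcastState(RULES_A).put("latest", rule.substring(2));
                      } else {
                          ctx.getBroadcastState(RULES_B).put("latest", rule.substring(2));
                      }
                  }
              })
              .print();

        env.execute("two rule sets via one broadcast");
    }
}
```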

Best,
Xingcan

> On Sep 18, 2018, at 9:29 PM, Vishal Santoshi  
> wrote:
> 
> I could do that, but I was under the impression that 2 or more disparate
> broadcast states could be provided to a keyed stream, each referenced through a
> key in the MapState... That would be cleaner, in that the 2 different
> sets of rules to be applied are explicitly declared rather than carried
> inside the datums of a unioned stream. I will look at the second option...
> 
> On Tue, Sep 18, 2018, 9:15 AM Xingcan Cui  <mailto:xingc...@gmail.com>> wrote:
> Hi Vishal,
> 
> You could try 1) merging these two rule streams first with the `union` method
> if they have the same type, or 2) connecting them and encapsulating the records
> from both sides in a unified type (e.g., Scala's Either).
> 
> Best,
> Xingcan
> 
> > On Sep 18, 2018, at 8:59 PM, Vishal Santoshi  > <mailto:vishal.santo...@gmail.com>> wrote:
> > 
> > I have 2 broadcast streams that carry rules to be applied to a third keyed  
> > stream. The connect method of the keyed stream only takes a single 
> > broadcast stream. How do I connect the 2 broadcast stream to that single 
> > keyed stream.
> > 
> >   Do I have 2 connects and thus 2 instances of BroadcastConnextedStream, 
> > union them and then apply process through a single 
> > SingleOutpitStreamOperator ? The issue I see there are 2 keyBy calls and an 
> > additional shuffle before connect is called.
> > 
> > To be precise, is there a simple example of applying 2 dissimilar rules 
> > through 2 broadcast streams, thus 2 different MapStateDiscriptors, to a 
> > single keyed stream without any unnecessary overhead...
> > 
> > 
> >  
> 



Re: 2 Broadcast streams to a Single Keyed Stream....how to?

2018-09-18 Thread Xingcan Cui
Hi Vishal,

You could try 1) merging these two rule streams first with the `union` method
if they have the same type, or 2) connecting them and encapsulating the records
from both sides in a unified type (e.g., Scala's Either).
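
For 2), here is a rough Java sketch that uses Flink's own `Either` type to build the
unified stream (the String/Integer element types are just placeholders); once unified,
the single stream can be broadcast or connected as usual:

```
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Either;

public class UnifyRuleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical rule streams; String and Integer stand in for the two rule types.
        DataStream<String> rulesA = env.fromElements("ruleA1", "ruleA2");
        DataStream<Integer> rulesB = env.fromElements(1, 2);

        TypeInformation<Either<String, Integer>> unifiedType =
                TypeInformation.of(new TypeHint<Either<String, Integer>>() {});

        // Wrap each side in Either and union them into a single stream.
        DataStream<Either<String, Integer>> unified = rulesA
                .map(r -> Either.<String, Integer>Left(r)).returns(unifiedType)
                .union(rulesB.map(r -> Either.<String, Integer>Right(r)).returns(unifiedType));

        unified.print();
        env.execute("unify two rule streams");
    }
}
```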

Best,
Xingcan

> On Sep 18, 2018, at 8:59 PM, Vishal Santoshi  
> wrote:
> 
> I have 2 broadcast streams that carry rules to be applied to a third keyed  
> stream. The connect method of the keyed stream only takes a single broadcast 
> stream. How do I connect the 2 broadcast stream to that single keyed stream.
> 
>   Do I have 2 connects and thus 2 instances of BroadcastConnextedStream, 
> union them and then apply process through a single SingleOutpitStreamOperator 
> ? The issue I see there are 2 keyBy calls and an additional shuffle before 
> connect is called.
> 
> To be precise, is there a simple example of applying 2 dissimilar rules 
> through 2 broadcast streams, thus 2 different MapStateDiscriptors, to a 
> single keyed stream without any unnecessary overhead...
> 
> 
>  



Re: Potential bug in Flink SQL HOP and TUMBLE operators

2018-09-17 Thread Xingcan Cui
Hi John,

I suppose that was caused by the groupBy field “timestamp”. You were actually 
grouping on two time fields simultaneously, the processing time and the time 
from your producer. As @Rong suggested, try removing the additional groupBy 
field “timestamp” and check the result again.

Best,
Xingcan

> On Sep 18, 2018, at 6:50 AM, Rong Rong  wrote:
> 
> This is in fact a very strange behavior. 
> 
> To add to the discussion, when you mentioned: "raw Flink (windowed or not) 
> nor when using Flink CEP", how were the comparisons being done? 
> Also, were you able to get the results correct without the additional GROUP 
> BY term of "foo" or "userId"?
> 
> --
> Rong
> 
> On Mon, Sep 17, 2018 at 12:30 PM Fabian Hueske  > wrote:
> Hmm, that's interesting. 
> HOP and TUMBLE window aggregations are directly translated into their 
> corresponding DataStream counterparts (Sliding, Tumble).
> There should be no filtering of records.
> 
> I assume you tried a simple query like "SELECT * FROM MyEventTable" and 
> received all expected data?
> 
> Fabian
> 
> 2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com 
>   >:
> Yes, I am certain events are being ignored or dropped during the first five 
> seconds.  Further investigation on my part reveals that the "ignore" period 
> is exactly the first five seconds of the stream - regardless of the size of 
> the window.
> 
> Situation
> 
> I have a script which pushes an event into Kafka once every second structured 
> as:
> 
> {"userId": "use...@email.com ", "timestamp": 
> }
> 
> My stream uses this Kafka queue as its source.  JSON schema and table schema 
> are as follows:
> 
> final Json jsonFormat = new Json()
> .failOnMissingField(false)
> .jsonSchema("{"
> + "  type: 'object',"
> + "  properties: {"
> + "userId: { type: 'string' },"
> + "timestamp: { type: 'integer' }"
> + "  }"
> + "}");
> 
> final Schema tableSchema = new Schema()
> .field("userId", Types.STRING())
> .field("timestamp", TypeInformation.of(BigDecimal.class))
> .field("proctime", Types.SQL_TIMESTAMP())
> .proctime();
> 
> StreamTableEnvironment is configured to be in append mode, and table source 
> is named "MyEventTable".  The stream is using the following SQL query:
> 
> final String sql =
> " SELECT userId, `timestamp` "
> + " FROM MyEventTable "
> + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' 
> SECOND), userId, `timestamp` ";
> final Table resultTable = tableEnvironment.sqlQuery(sql);
> 
> Code which I'm using to verify that events are being dropped:
> 
> streamTableEnvironment.toAppendStream(sqlResultTable, Row.class)
> .map((MapFunction) row -> {
>   final String userId = row.getField(0).toString();
>   final BigDecimal timestamp = (BigDecimal) row.getField(1);
> 
>   return String.format(
>   "(%s, %s)",
>   userId, timestamp.toString()
>   );
> })
> .print();
> 
> 
> No events produced during the first five seconds following a cold start of 
> Flink are ever printed to the console.  Any and all events produced after the 
> first five seconds following a cold start of Flink are always printed to the 
> console.  All processes are running on the same system.
> 
> This issue does not occur when using raw Flink (windowed or not) nor when 
> using Flink CEP.  Again, have not tried Table API.
> 
> 



Re: Potential bug in Flink SQL HOP and TUMBLE operators

2018-09-17 Thread Xingcan Cui
Hi John,

I’ve not dug into this yet, but IMO, it shouldn’t be the case. I just wonder
how you determined that the data in the first five seconds were not processed by
the system?

Best,
Xingcan

> On Sep 17, 2018, at 11:21 PM, John Stone  wrote:
> 
> Hello,
> 
> I'm checking if this is intentional or a bug in Apache Flink SQL (Flink 
> 1.6.0).
> 
> I am using processing time with a RocksDB backend.  I have not checked if 
> this issue is also occurring in the Table API.  I have not checked if this 
> issue also exists for event time (although I suspect it does).
> 
> Consider the following two queries:
> 
> "SELECT foo, COUNT(bar)
> FROM MyTable
> WHERE  faz = 'xyz'
> GROUP BY HOP(myTime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), foo"
> 
> and
> 
> "SELECT foo, COUNT(bar)
> FROM MyTable
> WHERE  faz = 'xyz'
> GROUP BY TUMBLE(myTime, INTERVAL '5' SECOND), foo"
> 
> I have found in my testing for both of the above that events received in the 
> first 5 seconds are ignored.  In other words, the first window interval is 
> effectively a black hole, and only events which are received starting after 
> the first 5 seconds of the stream being "up" are processed.
> 
> Is this ignoring of events during the first interval a bug or intentional?
> 
> Many thanks,
> 
> John



Re: What's the advantage of using BroadcastState?

2018-08-27 Thread Xingcan Cui
Hi Radu,

I cannot fully understand your question, but I guess the answer is NO.

The broadcast state pattern just provides you with automatic data 
broadcasting and a bunch of map states to cache the "low-throughput" patterns. 
Also, to keep consistency, it forbids `processElement()` from modifying the 
states. But this API does not really broadcast the states themselves. You should 
keep the logic of `processBroadcastElement()` deterministic. Maybe the equation 
below could make the pattern clear.

 same broadcast elements + deterministic processBroadcastElement() = same broadcast states on all instances = consistency

Best,
Xingcan

> On Aug 27, 2018, at 10:23 PM, Radu Tudoran  wrote:
> 
> Hi Fabian,
>  
> Thanks for the blog post about broadcast state. I have a question with 
> respect to the update capabilities of the broadcast state:
>  
> Assume you do whatever processing logic in the main processElement function 
> .. and at a given context marker you 1) would change a local field marker, to 
> 2) signal that next time the broadcast function is triggered a special 
> pattern should be created and broadcasted.
>  
> My question is: is such a behavior allowed? Would the new special Pattern 
> that originates in an operator be shared across the other instances of the 
> KeyedProcessFunction?
>  
>  
> public static class PatternEvaluator
>  extends KeyedBroadcastProcessFunction Pattern>> {
> 
> public boolean test = false;
>  
>   @Override
>   public void processElement(
>  Action action, 
>  ReadOnlyContext ctx, 
>  Collector> out) throws Exception {
>  
>//…logic
>   
>if (..whatever context) {
>   test = true;
>}
>  
>}
> 
>  @Override
>  public void processBroadcastElement(
>  Pattern pattern, 
>  Context ctx, 
>  Collector> out) throws Exception {
>// store the new pattern by updating the broadcast state
>   
>  BroadcastState bcState =
>  ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, 
> Types.POJO(Pattern.class)));
>// storing in MapState with null as VOID default value
>bcState.put(null, pattern);
> 
>if (test) {
>bcState.put(null, new Pattern(test) );
>}
>  
>  }
> }
>  
>  
> Dr. Radu Tudoran
> Staff Research Engineer - Big Data Expert
> IT R Division
>  
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>  
> E-mail: radu.tudo...@huawei.com 
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>  
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 
> 
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com ] 
> Sent: Monday, August 20, 2018 9:40 AM
> To: Paul Lam mailto:paullin3...@gmail.com>>
> Cc: Rong Rong mailto:walter...@gmail.com>>; Hequn Cheng 
> mailto:chenghe...@gmail.com>>; user 
> mailto:user@flink.apache.org>>
> Subject: Re: What's the advantage of using BroadcastState?
>  
> Hi,
>  
> I've recently published a blog post about Broadcast State [1].
>  
> Cheers,
> Fabian
>  
> [1] 
> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>  
> 
>  
> 2018-08-20 3:58 GMT+02:00 Paul Lam  >:
> Hi Rong, Hequn
>  
> Your answers are very helpful! Thank you!
>  
> Best Regards,
> Paul Lam
> 
> 
> 在 2018年8月19日,23:30,Rong Rong  > 写道:
>  
> Hi Paul,
>  
> To add to Hequn's answer. Broadcast state can typically be used as "a 
> low-throughput stream containing a set of rules which we want to evaluate 
> against all elements coming from another stream" [1] 
> So to add to the difference list is: whether it is "broadcast" across all 
> keys if processing a keyed stream. This is typically when it is not possible 
> to derive same key field using KeySelector in CoStream.
> Another additional difference is performance: BroadcastStream is "stored 
> locally and is used to process all incoming elements on the other stream" 
> thus requires to carefully manage the size of the BroadcastStream.
>  
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
>  
> 

Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread Xingcan Cui
Hi Henry,

Idle state retention is just a trade-off between accuracy and storage 
consumption. It can meet part of the calculation requirements in a streaming 
environment, but not all. For instance, in your use case, if there 
exists a TTL for each article, their praise states can be safely removed after 
that period of time. Otherwise, inconsistencies are unavoidable.

We admit that there should be other state retention mechanisms which could be 
applied in different scenarios. However, for now, setting a larger retention 
time or simply omitting this config seem to be the only choices.
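
For reference, this is roughly how the retention time is configured on a query (a 
minimal Java sketch; the query is taken from your example and the 12h/24h values 
are arbitrary):

StreamQueryConfig qConfig = tableEnv.queryConfig();
// state idle for more than 12 hours may be cleaned up; it is kept for at most 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table result = tableEnv.sqlQuery(
    "SELECT article_id FROM praise GROUP BY article_id HAVING COUNT(1) >= 10");
tableEnv.toRetractStream(result, Row.class, qConfig).print();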

Best,
Xingcan

> On Aug 21, 2018, at 6:03 PM, 徐涛  wrote:
> 
> Hi Fabian,
>   Is the behavior a bit weird? Because it leads to data inconsistency.
> 
> Best,
> Henry
> 
>> 在 2018年8月21日,下午5:14,Fabian Hueske > > 写道:
>> 
>> Hi,
>> 
>> In the given example, article_id 123 will always remain in the external 
>> storage. The state is removed and hence it cannot be retracted anymore.
>> Once the state was removed and the count reaches 10, a second record for 
>> article_id 123 will be emitted to the data store.
>> 
>> As soon as you enable state retention and state is needed that was removed, 
>> the query result can become inconsistent.
>> 
>> Best, Fabian
>> 
>> 2018-08-21 10:52 GMT+02:00 徐涛 > >:
>> Hi Fabian,
>>  SELECT article_id FROM praise GROUP BY article_id having count(1)>=10
>>  If article_id 123 has 100 praises and remains its state in the dynamic 
>> table ,and when the time passed, its state is removed, but later the 
>> article_id 123 has never reached to 10 praises.
>>  How can other program know that the state is been removed? Because the 
>> sink currently has the praises count stored as 100, it is not consistent as 
>> the dynamic table.
>> 
>> Best, 
>> Henry
>> 
>> 
>>> 在 2018年8月21日,下午4:16,Fabian Hueske >> > 写道:
>>> 
>>> Hi,
>>> 
>>> No, it won't. I will simply remove state that has not been accessed for the 
>>> configured time but not change the result.
>>> For example, if you have a GROUP BY aggregation and the state for a 
>>> grouping key is removed, the operator will start a new aggregation if a 
>>> record with the removed grouping key arrives.
>>> 
>>> Idle state retention is not meant to affect the semantics of a query. 
>>> The semantics of updating the result should be defined in the query, e.g., 
>>> with a WHERE clause that removes all records that are older than 1 day 
>>> (note, this is not supported yet).
>>> 
>>> Best, Fabian
>>> 
>>> 2018-08-21 10:04 GMT+02:00 徐涛 >> >:
>>> Hi All,
>>> Will idle state retention trigger retract in dynamic table?
>>> 
>>> Best,
>>> Henry
>>> 
>> 
>> 
> 



Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Xingcan Cui
Hi Averell,

With the CoProcessFunction, you get access to the time-related services, which 
may be useful when maintaining the elements from Stream_C in state, and with the 
Either class you can get rid of the type casting.
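
Just to make this concrete, a rough Java sketch (Dog, Cat, NameMapping and the 
getName() accessors stand for your own types, so treat this as pseudocode for the 
wiring):

DataStream<Either<Dog, Cat>> animals = dogStream
        .map(d -> Either.<Dog, Cat>Left(d))
        .returns(new TypeHint<Either<Dog, Cat>>() {})
        .union(catStream
                .map(c -> Either.<Dog, Cat>Right(c))
                .returns(new TypeHint<Either<Dog, Cat>>() {}));

animals.connect(nameMappingStream)
        .keyBy(e -> e.isLeft() ? e.left().getName() : e.right().getName(),
               NameMapping::getName)
        .process(new CoProcessFunction<Either<Dog, Cat>, NameMapping, Either<Dog, Cat>>() {
            @Override
            public void processElement1(Either<Dog, Cat> animal, Context ctx,
                                        Collector<Either<Dog, Cat>> out) {
                // enrich the animal with the mapping kept in keyed state and emit it
                out.collect(animal);
            }

            @Override
            public void processElement2(NameMapping mapping, Context ctx,
                                        Collector<Either<Dog, Cat>> out) {
                // update the keyed mapping state; timers registered via
                // ctx.timerService() can be used to expire stale mappings
            }
        });

Downstream you can split the output again with isLeft()/isRight().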

Best,
Xingcan

> On Aug 15, 2018, at 3:27 PM, Averell  wrote:
> 
> Thank you Vino & Xingcan.
> @Vino: could you help explain more details on using DBMS? Would that be with
> using TableAPI, or you meant directly reading DBMS data inside the
> ProcessFunction?
> 
> @Xingcan: I don't know what are the benefits of using CoProcess over
> RichCoFlatMap in this case.
> Regarding using Either wrapper, as my understanding, I would need to use
> that both in my sources (stream_A and B) and in the
> CoProcess/CoFlatMapFunction. Then using a super class Animal would be more
> convenient, wouldn't it?
> 
> Thanks and regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Xingcan Cui
Hi Averell,

I am also in favor of option 2. Besides, you could use CoProcessFunction 
instead of CoFlatMapFunction and try to wrap elements of stream_A and stream_B 
using the `Either` class.

Best,
Xingcan

> On Aug 15, 2018, at 2:24 PM, vino yang  wrote:
> 
> Hi Averell,
> 
> As far as these two solutions are concerned, I think you can only choose 
> option 2, because as you have stated, the current Flink DataStream API does 
> not support the replacement of one of the input stream types of 
> CoFlatMapFunction. Another choice:
> 
> 1. Split it into two separate jobs. But in comparison, I still think that 
> Option 2 is better.
> 2. Since you said that stream_c is slower and has fewer updates, if it is not 
> very large, you can store it in the RDBMS and then join it with stream_a and 
> stream_b respectively (using CoFlatMapFunction as well).
> 
> I think you should give priority to your option 2.
> 
> Thanks, vino.
> 
> Averell mailto:lvhu...@gmail.com>> 于2018年8月15日周三 下午1:51写道:
> Hi,
> 
> I have stream_A of type "Dog", which needs to be transformed using data from
> stream_C of type "Name_Mapping". As stream_C is a slow one (data is not
> being updated frequently), to do the transformation I connect two streams,
> do a keyBy, and then use a RichCoFlatMapFunction in which mapping data from
> stream_C is saved into a State (flatMap1 generates 1 output, while flatMap2
> is just to update State table, not generating any output).
> 
> Now I have another stream B of type "Cat", which also needs to be
> transformed using data from stream_C. After that transformation,
> transformed_B will go through a completely different pipeline from
> transformed A. 
> 
> I can see two approaches for this:
> 1. duplicate stream_C and the RichCoFlatMapFunction and apply on stream_B
> 2. create a new stream D of type "Animal", transform it with C, then split
> the result into two streams using split/select using case class pattern
> matching.
> 
> My question is which option should I choose?
> With option 1, at least I need to maintain two State tables, let alone the
> cost for duplicating stream (I am not sure how expensive this is in term of
> resource), and the requirement on duplicating the CoFlatMapFunction (*).
> With option 2, there's additional cost coming from unioning,
> splitting/selecting, and type-casting at the final streams. 
> Is there any better option for me?
> 
> Thank you very much for your support.
> Regards,
> Averell
> 
> (*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
> [Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
> Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
> Function as well.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 



Re: Flink wrong Watermark in Periodic watermark

2018-07-30 Thread Xingcan Cui
HI Soheil,

That may relate to your parallelism, since each extractor instance computes its 
own watermarks. Try to print the max timestamps together with the current thread’s 
name and you will notice this.
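
For example, something like this in your extractor (just a debugging aid):

@Nullable
@Override
public Watermark getCurrentWatermark() {
    // each parallel extractor instance has its own MAX_TIMESTAMP, so an instance
    // that has not received any element yet reports 0 - 3000 = -3000
    System.out.println(Thread.currentThread().getName()
            + " current watermark: " + (MAX_TIMESTAMP - DELEY));
    return new Watermark(MAX_TIMESTAMP - DELEY);
}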

Best,
Xingcan

> On Jul 30, 2018, at 3:05 PM, Soheil Pourbafrani  wrote:
> 
> Using Flink EventTime feature, I implement the class 
> AssignerWithPeriodicWatermark such that:
> 
> public static class SampleTimestampExtractor implements 
> AssignerWithPeriodicWatermarks> {
> private static final long serialVersionUID = 1L;
> private long MAX_TIMESTAMP;
> private final long DELEY = 3000;
> 
> 
> @Override
> public long extractTimestamp(Tuple3 t, long l) {
> long timestamp = t.f1 ;
> MAX_TIMESTAMP =  Math.max(timestamp , MAX_TIMESTAMP);
> System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
> return timestamp ;
> }
> 
> @Nullable
> @Override
> public Watermark getCurrentWatermark() {
> System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
> return new Watermark(MAX_TIMESTAMP - DELEY);
> }
> }
> In addition, I set the watermark interval to 100 milliseconds:
> env.getConfig().setAutoWatermarkInterval(100);
> But when I check the logs, some watermarks are -3000, so in 
> getCurrentWatermark method, it considers the MAX_TIMESTAMP zero (0 - 3000 = 
> -3000), while I can see in the logs that the MAX_TIMESTAMP has a value 
> greater than zero!
> Here is a part of the output:
> Max TimeStamp : 1532934243136
> Max TimeStamp : 1532934243136
> Max TimeStamp : 1532934243144
> Max TimeStamp : 1532934243144
> Max TimeStamp : 1532934243152
> Max TimeStamp : 1532934243152
> Max TimeStamp : 1532934243160
> Max TimeStamp : 1532934243160
> Max TimeStamp : 1532934243168
> Max TimeStamp : 1532934243168
> Current WatreMark : 1532934240168
> Current WatreMark : -3000
> Current WatreMark : -3000
> Current WatreMark : 1532934240168
> Max TimeStamp : 1532934243176
> Max TimeStamp : 1532934243176
> Max TimeStamp : 1532934243184
> Max TimeStamp : 1532934243200
> Max TimeStamp : 1532934243208
> Max TimeStamp : 1532934243184
> 
> 



Re: streaming predictions

2018-07-22 Thread Xingcan Cui
Hi Cederic,

If the model is a simple function, you can just load it and make predictions 
using the map/flatMap function in the StreamEnvironment.

But I’m afraid the model trained by Flink-ML is tied to a "batch job", whose 
predict method takes a DataSet as the parameter and outputs another DataSet as 
the result. That means you cannot easily apply the model on streams, at least 
for now.

There are two options to solve this. (1) Train the model with another 
framework, or export the learned weights, to produce a simple scoring function. 
(2) Reorganize your model serving as a series of batch jobs.
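
For option (1), once the weights are available on the client side the scoring 
itself is just a dot product. A rough Java sketch (the Tuple3<Integer, Integer, 
Integer> input and the way you export the weights are assumptions):

public class SvmScorer implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple2<Tuple3<Integer, Integer, Integer>, Double>> {

    // e.g. collected offline from the batch job (svm2.weightsOption.get) and shipped with the job
    private final double[] weights;

    public SvmScorer(double[] weights) {
        this.weights = weights;
    }

    @Override
    public Tuple2<Tuple3<Integer, Integer, Integer>, Double> map(Tuple3<Integer, Integer, Integer> in) {
        double score = weights[0] * in.f0 + weights[1] * in.f1 + weights[2] * in.f2;
        // a linear SVM predicts the sign of the score
        return Tuple2.of(in, score >= 0 ? 1.0 : -1.0);
    }
}

// usage: dataStream.map(new SvmScorer(weights))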

Hope that helps,
Xingcan

> On Jul 22, 2018, at 8:56 PM, Hequn Cheng  wrote:
> 
> Hi Cederic,
> 
> I am not familiar with SVM or machine learning but I think we can work it out 
> together.
> What problem have you met when you try to implement this function? From my 
> point of view, we can rebuild the model in the flatMap function and use it to 
> predict the input data. There are some flatMap documents here[1]. 
> 
> Best, Hequn
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations
>  
> 
> 
> 
> 
> 
> 
> On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans  > wrote:
> Dear
> 
> My name is Cederic Bosmans and I am a masters student at the Ghent University 
> (Belgium).
> I am currently working on my masters dissertation which involves Apache Flink.
> 
> I want to make predictions in the streaming environment based on a model 
> trained in the batch environment.
> 
> I trained my SVM-model this way:
> val svm2 = SVM()
> svm2.setSeed(1)
> svm2.fit(trainLV)
> val testVD = testLV.map(lv => (lv.vector, lv.label))
> val evalSet = svm2.evaluate(testVD)
> 
> and saved the model: 
> val modelSvm = svm2.weightsOption.get
> 
> Then I have an incoming datastream in the streaming environment:
> dataStream[(Int, Int, Int)]
> which should be bininary classified using this trained SVM model.
> 
> Since the predict function does only support DataSet and not DataStream, on 
> stackoverflow a flink contributor mentioned that this should be done using a 
> map/flatMap function.
> Unfortunately I am not able to work this function out.
> 
> It would be incredible for me if you could help me a little bit further!
> 
> 
> Kind regards and thanks in advance
> Cederic Bosmans
> 



Re: data enrichment via endpoint, serializable issue

2018-07-19 Thread Xingcan Cui
Hi Steffen,

You could make the class `TextAPIClient` serializable, or use `RichMapFunction` 
[1] and instantiate all the required objects in its `open()` method.
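
A rough sketch of the `RichMapFunction` variant, reusing the classes from your 
snippet (the credentials are placeholders):

public class SentimentMapper extends RichMapFunction<Tweet, TweetSentiment> {

    // not serialized with the function; created on the task manager in open()
    private transient TextAPIClient sentimentClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        sentimentClient = new TextAPIClient("xxx", "xxx");
    }

    @Override
    public TweetSentiment map(Tweet tweet) throws Exception {
        SentimentParams params = new SentimentParams(tweet.getText(), null, null);
        Sentiment sentiment = sentimentClient.sentiment(params);
        return new TweetSentiment(tweet, sentiment.getPolarity(), 0);
    }
}

Then use it with noRTDataStream.map(new SentimentMapper()).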

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#rich-functions
 


Best,
Xingcan

> On Jul 19, 2018, at 6:56 PM, Steffen Wohlers  wrote:
> 
> Hi all,
> 
> I’m new to Apache Flink and I have the following issue:
> 
> I would like to enrich data via map function. For that I call a method which 
> calls an endpoint but I get following error message 
> 
> „The implementation of the MapFunction is not serializable. The object 
> probably contains or references non serializable fields.
>   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
> „Caused by: java.io.NotSerializableException: 
> com.aylien.textapi.TextAPIClient“
> 
> Is there a smart way to fix that issue?
> 
> Regards,
> 
> Steffen
> 
> 
> Map Function:
> DataStream tweetSentimentDataStream = noRTDataStream
> .map(new MapFunction() {
> @Override
> public TweetSentiment map(Tweet tweet) throws Exception {
> String polarity = "good";
> polarity = test.testMethod();
> polarity =  sentimentAnalysis.sentiment(tweet.getText());
> return new TweetSentiment(tweet, polarity, 0);
> }
> });
> 
> Class:
> 
> public class SentimentAnalysis implements Serializable {
> 
> private TextAPIClient _sentimentClient;
> 
> public SentimentAnalysis () {
> _sentimentClient = new TextAPIClient(„xxx", „xxx");
> }
> 
> public String sentiment(String text)  throws Exception{
> SentimentParams sentimentParams = new SentimentParams(text, null, 
> null);
> Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);
> 
> return sentiment.getPolarity();
> }
> (Analysis via Aylien)



Re: Window Stream - Need assistance

2018-07-18 Thread Xingcan Cui
Hi Rakkesh,

The `GlobalWindow` is commonly used for custom window assignment, and you must 
specify a `trigger` for it [1]; its default trigger never fires, which is why 
nothing gets printed.
If the built-in window joins (e.g., tumbling or sliding window joins) in the 
DataStream API fail to meet your requirements,
you could try the time-windowed join in the Table/SQL API [2].
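
For the first point, the minimal change to your snippet is adding a trigger (a 
sketch; joinFunction stands for the JoinFunction you already wrote):

activationsStream.join(unifiedReloadStream)
        .where(new ActivationStreamSelector())
        .equalTo(new ReloadStreamSelector())
        .window(GlobalWindows.create())
        .trigger(CountTrigger.of(1)) // a GlobalWindow's default trigger never fires
        .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
        .apply(joinFunction);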

Hope that helps.

Best,
Xingcan 

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows>
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins>


> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh  wrote:
> 
> Thanks for the reply. I have called "env.execute()". But nothing getting 
> printed. I have a doubt whether "implemented function" is correct with my 
> "requirement". Please assist. 
> 
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui  <mailto:xingc...@gmail.com>> wrote:
> Hi Rakkesh,
> 
> Did you call `execute()`on your `StreamExecutionEnvironment`?
> 
> Best,
> Xingcan 
> 
> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh  > <mailto:titus.rakk...@gmail.com>> wrote:
> > 
> > Dear Friends,
> >  I have 2 streams of the below data types.
> > 
> > DataStream> splittedActivationTuple;
> > 
> > DataStream> unionReloadsStream;
> > 
> > These streams are getting data from Kafka and getting data in different 
> > frequencies. "unionReloadsStream"  will receive more data than 
> > "splittedActivationTuple". I need to store  "splittedActivationTuple" in a 
> > Window of 24 hours and manipulate its "Double" field, if a matching data 
> > comes from unionReloadsStream (String field is the common field).
> > 
> > So I wrote the following method to do this task.
> > 
> > 
> > public static DataStream> 
> > joinActivationsBasedOnReload(
> > DataStream> activationsStream,
> > DataStream> unifiedReloadStream) {
> > 
> > return activationsStream.join(unifiedReloadStream).where(new 
> > ActivationStreamSelector())
> > .equalTo(new 
> > ReloadStreamSelector()).window(GlobalWindows.create())
> > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> > .apply(new JoinFunction, 
> > Tuple2, Tuple3>() {
> > private static final long serialVersionUID = 1L;
> > @Override
> > public Tuple3 
> > join(Tuple3 first,
> > Tuple2 second) {
> > return new Tuple3 > Double>(first.f0, first.f1, first.f2 + second.f1);
> > }
> > });
> > }
> > 
> > 
> > and calling as,
> > 
> > DataStream> activationWindowStream = 
> > joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> > 
> > activationWindowStream.print();
> > 
> > 
> > But I couldn't see anything printing. 
> > 
> > I expected "activationWindowStream" to contain the 
> > "splittedActivationTuple" (smaller set) data and the Double value 
> > accumulated if  unionReloadsStream's incoming elements have a matching 
> > "String" field. But that is not happening. Where I am missing?
> > 
> > Thanks,
> > Rakkesh
> 
> 



Re: Window Stream - Need assistance

2018-07-18 Thread Xingcan Cui
Hi Rakkesh,

Did you call `execute()`on your `StreamExecutionEnvironment`?

Best,
Xingcan 

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh  wrote:
> 
> Dear Friends,
>  I have 2 streams of the below data types.
> 
> DataStream> splittedActivationTuple;
> 
> DataStream> unionReloadsStream;
> 
> These streams are getting data from Kafka and getting data in different 
> frequencies. "unionReloadsStream"  will receive more data than 
> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a 
> Window of 24 hours and manipulate its "Double" field, if a matching data 
> comes from unionReloadsStream (String field is the common field).
> 
> So I wrote the following method to do this task.
> 
> 
> public static DataStream> 
> joinActivationsBasedOnReload(
> DataStream> activationsStream,
> DataStream> unifiedReloadStream) {
> 
> return activationsStream.join(unifiedReloadStream).where(new 
> ActivationStreamSelector())
> .equalTo(new 
> ReloadStreamSelector()).window(GlobalWindows.create())
> .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> .apply(new JoinFunction, 
> Tuple2, Tuple3>() {
> private static final long serialVersionUID = 1L;
> @Override
> public Tuple3 
> join(Tuple3 first,
> Tuple2 second) {
> return new Tuple3(first.f0, 
> first.f1, first.f2 + second.f1);
> }
> });
> }
> 
> 
> and calling as,
> 
> DataStream> activationWindowStream = 
> joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> 
> activationWindowStream.print();
> 
> 
> But I couldn't see anything printing. 
> 
> I expected "activationWindowStream" to contain the "splittedActivationTuple" 
> (smaller set) data and the Double value accumulated if  unionReloadsStream's 
> incoming elements have a matching "String" field. But that is not happening. 
> Where I am missing?
> 
> Thanks,
> Rakkesh



Re: Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Xingcan Cui
Hi Soheil,

The `getSideOutput()` method is defined on `SingleOutputStreamOperator`, i.e., 
the operator returned by an operation such as `reduce()`, instead of on the plain 
`DataStream` type.
So declare the result of the windowed reduce with that type and you can fetch the 
late-data stream from the same variable.
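
A sketch based on your snippet (MyType and myReducer stand for your element type 
and reduce function):

SingleOutputStreamOperator<MyType> res = aggregatedTuple
        .assignTimestampsAndWatermarks(new Bound())
        .keyBy(1)
        .timeWindow(Time.milliseconds(160))
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce(myReducer);

// the late records go to a separate stream, not into res itself
DataStream<MyType> lateData = res.getSideOutput(lateOutputTag);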

Best,
Xingcan

> On Jul 17, 2018, at 3:36 PM, Soheil Pourbafrani  wrote:
> 
> Hi, according to the documents I tried to get late data using side output.
> 
> final OutputTag> lateOutputTag = new 
> OutputTag>("late-data"){};
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process);
> 
> When trying to store late data in a Datastream (As shown in document):
> DataStream> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known! But I 
> don't want to save late data on res variable and I want to save them on 
> another variable!
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process)
>  .getSideoutput(lateOutputTag);
> What is the problem here?
> 
> 



Re: Issues with Flink1.5 SQL-Client

2018-07-03 Thread Xingcan Cui
Hi Ashwin,

I encountered this problem before. You should make sure that the version of 
your Flink cluster and the version of the SQL Client you are running are exactly the same.

Best,
Xingcan

> On Jul 3, 2018, at 10:00 PM, Chesnay Schepler  wrote:
> 
> Can you provide us with the JobManager logs?
> 
> Based on the exception i concur with Timo, it looks like the server is either 
> running 1.4 or below, or was started in the legacy mode.
> 
> On 03.07.2018 15:42, Ashwin Sinha wrote:
>> Hi Timo, 
>> 
>> Our flink version is 1.5.0
>> 
>> We followed this 
>> 
>>  documentation and started flink cluster same way. Also we are getting more 
>> logs https://pastebin.com/fGTW9s2b 
>> On Tue, Jul 3, 2018 at 7:01 PM Timo Walther > > wrote:
>> Hi Ashwin,
>> 
>> which Flink version is your (local cluster)? Are you executing Flink in the 
>> default (new deployment) or legacy mode? The SQL client supports only the 
>> new "FLIP-6" deployment model. I'm not sure about your error message but it 
>> might be related to that.
>> 
>> Regards,
>> Timo
>> 
>> 
>> Am 03.07.18 um 14:42 schrieb Ashwin Sinha:
>>> Hi folks,
>>> 
>>> We are trying to setup flink sql client 
>>> .
>>>  It is still in development phase, but flink-1.5 contains beta version of 
>>> this feature.
>>> 
>>> Our environment:
>>> 
>>> Kafka-
>>> Topic: test_flink_state_check
>>> Kafka Key: null
>>> Kafka Value example: {"state": 299, "number": 299} 
>>> 
>>> Flink-
>>> Jobmanager: http://localhost:8080/ 
>>> conf/sql-client-defaults.yaml: https://pastebin.com/zGb3qs7E 
>>> 
>>> 
>>> Then we start sql-client shell by 
>>> ./bin/sql-client.sh embedded
>>> 
>>> 
>>> When we run
>>> select * from [tablename]
>>> the result shell hangs and after sometime it exits without printing any 
>>> data and give following error on screen: https://pastebin.com/wuitQx1a 
>>> 
>>> 
>>> Complete stacktrace in flink logs: https://pastebin.com/mgk06xeG 
>>> 
>>> 
>>> Please assist us in resolving this!
>>> 
>>> -- 
>>> Ashwin Sinha | Data Engineer
>>> ashwin.si...@go-mmt.com  | 9452075361
>>>   
>>>  2nd floor, Tower B Divyashree Technopolis Yemalur, 
>>> Bangalore, Karnataka 560037, India
>>>  
>>> 
>>> 
>>> ::DISCLAIMER::
>>> 
>>> 
>>> 
>>> 
>>> 
>>> This message is intended only for the use of the addressee and may contain 
>>> information that is privileged, confidential and exempt from disclosure 
>>> under applicable law. If the reader of this message is not the intended 
>>> recipient, or the employee or agent responsible for delivering the message 
>>> to the intended recipient, you are hereby notified that any dissemination, 
>>> distribution or copying of this communication is strictly prohibited. If 
>>> you have received this e-mail in error, please notify us immediately by 
>>> return e-mail and delete this e-mail and all attachments from your system.
>>> 
>> 
>> 
>> 
>> -- 
>> Ashwin Sinha | Data Engineer
>> ashwin.si...@go-mmt.com  | 9452075361
>>   
>>  2nd floor, Tower B Divyashree Technopolis Yemalur, 
>> Bangalore, Karnataka 560025, India
>>  
>> 
>> 
>> ::DISCLAIMER::
>> 
>> 
>> 
>> 
>> 
>> This message is intended only for the use of the addressee and may contain 
>> information that is privileged, confidential and exempt from disclosure 
>> under applicable law. If the reader of this message is not the intended 
>> recipient, or the employee or agent responsible for delivering the message 
>> to the intended recipient, you are hereby notified that any dissemination, 
>> distribution or copying of this communication is strictly prohibited. If you 
>> have received this e-mail in error, please notify us immediately by return 
>> e-mail and delete this e-mail and all attachments from your system.
>> 
> 



Re: Replaying logs with microsecond delay

2018-05-15 Thread Xingcan Cui
Yes, that makes sense, and maybe you could also generate dynamic intervals 
according to the time spans between consecutive records.
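
A rough sketch of what I mean, anchored on the first record so that delays do not 
accumulate (Record and its getTimestampMicros()/emit() are hypothetical):

long startNanos = System.nanoTime();
long firstTs = records.get(0).getTimestampMicros();
for (Record r : records) {
    long targetNanos = startNanos + (r.getTimestampMicros() - firstTs) * 1_000L;
    while (System.nanoTime() < targetNanos) {
        // busy-wait: Thread.sleep() is far too coarse for microsecond gaps and drifts
    }
    emit(r); // write the record to the socket
}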

Thanks,
Xingcan

> On May 16, 2018, at 9:41 AM, Dhruv Kumar <gargdhru...@gmail.com> wrote:
> 
> As a part of my PhD research, I have been working on few optimization 
> algorithms which try to jointly optimize delay and traffic (WAN traffic) in a 
> geo-distributed streaming analytics setting. So, to show that the 
> optimization actually works in real life, I am trying to implement these 
> optimization algorithms on top of Apache Flink. For emulating a real life 
> example, I need to generate a stream of records with some realistic delay 
> (order of microseconds for fast incoming stream) between any two records. 
> This stream will then by ingested and processed by Flink. 
> 
> Using the timestamps as is, in the form of event timestamps, only proves the 
> algorithms from a theoretical/simulation perspective. 
> 
> Hope this answers your question to some extent at least. Let me know. 
> 
> Thanks!
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 
>> On May 15, 2018, at 20:29, Xingcan Cui <xingc...@gmail.com 
>> <mailto:xingc...@gmail.com>> wrote:
>> 
>> Hi Dhruv,
>> 
>> since there are timestamps associated with each record, I was wondering why 
>> you try to replay them with a fixed interval. Can you give a little 
>> explanation about that?
>> 
>> Thanks,
>> Xingcan
>> 
>>> On May 16, 2018, at 2:11 AM, Ted Yu <yuzhih...@gmail.com 
>>> <mailto:yuzhih...@gmail.com>> wrote:
>>> 
>>> Please see the following:
>>> 
>>> http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html 
>>> <http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html>
>>> 
>>> https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds
>>>  
>>> <https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds>
>>> 
>>> On Tue, May 15, 2018 at 10:40 AM, Dhruv Kumar <gargdhru...@gmail.com 
>>> <mailto:gargdhru...@gmail.com>> wrote:
>>> Hi
>>> 
>>> I am trying to replay a log file in which each record has a timestamp 
>>> associated with it. The time difference between the records is of the order 
>>> of microseconds. I am trying to replay this log maintaining the same delay 
>>> between the records (using Thread.sleep()) and sending it to a socket. And 
>>> then the Flink program reads the incoming data from this socket. Currently, 
>>> replay of the entire log file takes much more time (3 times) then the 
>>> expected time (last_timstamp - first_timstamp).
>>> 
>>> I wanted to know what are the standard ways of replaying log files if one 
>>> wants to maintain the same arrival delay between the records.
>>> 
>>> Let me know if I am not clear above.
>>> 
>>> Thanks 
>>> --
>>> Dhruv Kumar
>>> PhD Candidate
>>> Department of Computer Science and Engineering
>>> University of Minnesota
>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>>> 
>> 
> 



Re: Better way to clean up state when connect

2018-05-15 Thread Xingcan Cui
Hi Chengzhi,

more details about partitioning mechanisms can be found at 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning
 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>.

Best,
Xingcan

> On May 16, 2018, at 3:25 AM, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks a lot for providing your inputs on the possible solutions here. Can 
> you please clarify on how to broadcasted in Flink? 
> Appreciate your help!!
> 
> Best,
> Chengzhi
> 
> On Tue, May 15, 2018 at 10:22 AM, Xingcan Cui <xingc...@gmail.com 
> <mailto:xingc...@gmail.com>> wrote:
> Hi Chengzhi,
> 
> currently, it's impossible to process both a stream and a (dynamically 
> updated) dataset in a single job. I'll provide you with some workarounds, all 
> of which are based on that the file for active test names is not so large.
> 
> (1) You may define your own stream source[1] which should be aware of the 
> file update, and keep the input file as a stream (the Stream B as you 
> described). Some special records can be inserted to indicate the start and 
> end of an update. Note that instead of using the `keyBy()` method, the Stream 
> B should be broadcasted, while the Stream A can be partitioned arbitrarily. 
> With this method, you can clean and rebuild the states according to the 
> start/end indicators.
> 
> (2) You may also take the file of active test names as external states and 
> set processing time timers[2] to update them regularly (e.g., with 1 min 
> interval) in a ProcessFunction[3].
> 
> IMO, the watermark may not work as expected for your use case. Besides, since 
> the file will be updated unpredictably, it's hard to guarantee the precision 
> of results.
> 
> Hope that helps,
> Xingcan
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#data-sources
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#data-sources>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers>
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#the-processfunction
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#the-processfunction>
> 
>> On May 14, 2018, at 10:05 PM, Chengzhi Zhao <w.zhaocheng...@gmail.com 
>> <mailto:w.zhaocheng...@gmail.com>> wrote:
>> 
>> Hi Xingcan,
>> 
>> Thanks for your response, to give your more background about my use case, I 
>> have Stream B with some split test name, and Stream A will be the actual 
>> test. I want to have Stream A connect to Stream B to figure out whether this 
>> test is still active or not. I am not sure this is the right way to do: My 
>> watermark is based on event time for 15 mins, OnTimer will be emit that 
>> records after 15 mins. I was wondering if there is way to purge the state of 
>> entire Stream B so I can get all the active test, since the file will 
>> include all the updated split testing name so I can refresh the lookup.
>> 
>> Also, I am not sure if I am using the right operator here, or if there is a 
>> way to share variable globally so I can just perform filter on stream A.
>> Please let me know your thoughts and thanks for you suggestions again.
>> 
>> Regards,
>> Chengzhi
>> 
>> On Sat, May 12, 2018 at 8:55 PM, Xingcan Cui <xingc...@gmail.com 
>> <mailto:xingc...@gmail.com>> wrote:
>> Hi Chengzhi,
>> 
>> you said the Stream B which comes from a file will be updated unpredictably. 
>> I wonder if you could share more about how to judge an item (from Stream A I 
>> suppose) is not in the file and what watermark generation strategy did you 
>> choose?
>> 
>> Best,
>> Xingcan
>> 
>> > On May 12, 2018, at 12:48 AM, Chengzhi Zhao <w.zhaocheng...@gmail.com 
>> > <mailto:w.zhaocheng...@gmail.com>> wrote:
>> > 
>> > Hi there,
>> > 
>> > I have a use case to check for active ID, there are two streams and I 
>> > connect them: one has actual data (Stream A) and the other one is for 
>> > lookup purpose (Stream B), I am getting Stream B as a file which includes 
>> > all active ID, so inactive ID would not be show up on this list. I tried 
>> > to use watermark to clean up the state of inactivate ID, but the Stream B 
>> > updates is unpredictable so I want to keep everything in state until I 
>> > found the item is not in that file any more. 
>> > 
>> > Please suggest what is the best way to implement it in flink. Thanks in 
>> > advance for your help.
>> > 
>> > Regards,
>> > Chengzhi
>> > 
>> > 
>> 
>> 
> 
> 



Re: Replaying logs with microsecond delay

2018-05-15 Thread Xingcan Cui
Hi Dhruv,

since there are timestamps associated with each record, I was wondering why you 
try to replay them with a fixed interval. Can you give a little explanation 
about that?

Thanks,
Xingcan

> On May 16, 2018, at 2:11 AM, Ted Yu  wrote:
> 
> Please see the following:
> 
> http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html 
> 
> 
> https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds
>  
> 
> 
> On Tue, May 15, 2018 at 10:40 AM, Dhruv Kumar  > wrote:
> Hi
> 
> I am trying to replay a log file in which each record has a timestamp 
> associated with it. The time difference between the records is of the order 
> of microseconds. I am trying to replay this log maintaining the same delay 
> between the records (using Thread.sleep()) and sending it to a socket. And 
> then the Flink program reads the incoming data from this socket. Currently, 
> replay of the entire log file takes much more time (3 times) then the 
> expected time (last_timstamp - first_timstamp).
> 
> I wanted to know what are the standard ways of replaying log files if one 
> wants to maintain the same arrival delay between the records.
> 
> Let me know if I am not clear above.
> 
> Thanks 
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me 
> 



Re: Better way to clean up state when connect

2018-05-15 Thread Xingcan Cui
Hi Chengzhi,

currently, it's impossible to process both a stream and a (dynamically updated) 
dataset in a single job. I'll provide you with some workarounds, all of which 
assume that the file of active test names is not too large.

(1) You may define your own stream source[1] which should be aware of the file 
update, and keep the input file as a stream (the Stream B as you described). 
Some special records can be inserted to indicate the start and end of an 
update. Note that instead of using the `keyBy()` method, the Stream B should be 
broadcasted, while the Stream A can be partitioned arbitrarily. With this 
method, you can clean and rebuild the states according to the start/end 
indicators.

(2) You may also treat the file of active test names as external state and set 
processing-time timers [2] to reload it regularly (e.g., at a 1-minute interval) 
in a ProcessFunction [3].

IMO, the watermark may not work as expected for your use case. Besides, since 
the file will be updated unpredictably, it's hard to guarantee the precision of 
results.
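
Just to illustrate option (1), a rough sketch of such a source (the marker strings 
and the 1-minute poll interval are made up):

public class ActiveIdFileSource implements SourceFunction<String> {

    private final String path;
    private volatile boolean running = true;

    public ActiveIdFileSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("__UPDATE_START__");
            for (String id : Files.readAllLines(Paths.get(path))) {
                ctx.collect(id);
            }
            ctx.collect("__UPDATE_END__");
            Thread.sleep(60_000L); // or watch the file's modification time instead
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

On the other input of the connect, clear the cached IDs when the start marker 
arrives and rebuild them until you see the end marker.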

Hope that helps,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#data-sources
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#the-processfunction

> On May 14, 2018, at 10:05 PM, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks for your response, to give your more background about my use case, I 
> have Stream B with some split test name, and Stream A will be the actual 
> test. I want to have Stream A connect to Stream B to figure out whether this 
> test is still active or not. I am not sure this is the right way to do: My 
> watermark is based on event time for 15 mins, OnTimer will be emit that 
> records after 15 mins. I was wondering if there is way to purge the state of 
> entire Stream B so I can get all the active test, since the file will include 
> all the updated split testing name so I can refresh the lookup.
> 
> Also, I am not sure if I am using the right operator here, or if there is a 
> way to share variable globally so I can just perform filter on stream A.
> Please let me know your thoughts and thanks for you suggestions again.
> 
> Regards,
> Chengzhi
> 
> On Sat, May 12, 2018 at 8:55 PM, Xingcan Cui <xingc...@gmail.com 
> <mailto:xingc...@gmail.com>> wrote:
> Hi Chengzhi,
> 
> you said the Stream B which comes from a file will be updated unpredictably. 
> I wonder if you could share more about how to judge an item (from Stream A I 
> suppose) is not in the file and what watermark generation strategy did you 
> choose?
> 
> Best,
> Xingcan
> 
> > On May 12, 2018, at 12:48 AM, Chengzhi Zhao <w.zhaocheng...@gmail.com 
> > <mailto:w.zhaocheng...@gmail.com>> wrote:
> > 
> > Hi there,
> > 
> > I have a use case to check for active ID, there are two streams and I 
> > connect them: one has actual data (Stream A) and the other one is for 
> > lookup purpose (Stream B), I am getting Stream B as a file which includes 
> > all active ID, so inactive ID would not be show up on this list. I tried to 
> > use watermark to clean up the state of inactivate ID, but the Stream B 
> > updates is unpredictable so I want to keep everything in state until I 
> > found the item is not in that file any more. 
> > 
> > Please suggest what is the best way to implement it in flink. Thanks in 
> > advance for your help.
> > 
> > Regards,
> > Chengzhi
> > 
> > 
> 
> 



Re: Better way to clean up state when connect

2018-05-12 Thread Xingcan Cui
Hi Chengzhi,

you said the Stream B which comes from a file will be updated unpredictably. I 
wonder if you could share more about how to judge an item (from Stream A I 
suppose) is not in the file and what watermark generation strategy did you 
choose?

Best,
Xingcan

> On May 12, 2018, at 12:48 AM, Chengzhi Zhao  wrote:
> 
> Hi there,
> 
> I have a use case to check for active ID, there are two streams and I connect 
> them: one has actual data (Stream A) and the other one is for lookup purpose 
> (Stream B), I am getting Stream B as a file which includes all active ID, so 
> inactive ID would not be show up on this list. I tried to use watermark to 
> clean up the state of inactivate ID, but the Stream B updates is 
> unpredictable so I want to keep everything in state until I found the item is 
> not in that file any more. 
> 
> Please suggest what is the best way to implement it in flink. Thanks in 
> advance for your help.
> 
> Regards,
> Chengzhi
> 
> 



Re: How to use keyBy on ConnectedStream?

2018-05-10 Thread Xingcan Cui
Hi Ishwara,

the `keyBy()` method automatically ensures that records with the same key will 
be processed by the same instance of a CoFlatMap.

As for the exception, I suppose the types `MessageType1` and `MessageType2` are 
POJOs, which should follow some rules [1]. 
Also, make sure that (1) `property1` and `property2` are not arrays and (2) their 
types have overridden the `hashCode()` method [2].
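
For reference, a minimal sketch of what Flink expects (MessageType2 would look 
analogous):

// a POJO needs a public no-argument constructor and public fields (or getters/setters);
// the key field's type must properly implement hashCode()/equals(), which String does
public class MessageType1 {
    public String property1;
    public long payload;

    public MessageType1() {}
}

ConnectedStreams<MessageType1, MessageType2> connected =
        myDataStream1.connect(myDataStreamOther)
                     .keyBy("property1", "property2"); // one key field per input stream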

Hope that helps,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#rules-for-pojo-types
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations
 


> On May 10, 2018, at 10:43 PM, Ishwara Varnasi  wrote:
> 
> Hello,
> I am using ConnectedStream to process two different types of messages using 
> CoFlatMap. However, I would like to use keyBy on the ConnectedStream such 
> that messages with same value of certain property should always be sent to 
> same instance of CoFlatMap instance. So I've tried keyBy on ConnectedStream, 
> surprised to see that the return type is not grouped.
> 
> ConnectedStreams connect = 
> myDataStream1.connect(myDataStreamOther);
> connect = connect.keyBy("property1", "property2");
> // property1 is a valid property in MessageTyp1 and property2 is a valid 
> property of MessageType2
> However, I get following exception:
> Caused by: org.apache.flink.api.common.InvalidProgramException: This type 
> (GenericType) cannot be used as key.
> How to use keyBy with ConnectedStream and ensure that grouped messages are 
> handled by same instance of CoFlatMap?
> 
> thanks
> Ishwara Varnasi



Re: Slow watermark advances

2018-04-13 Thread Xingcan Cui
Yes, Chengzhi. That’s exactly what I mean. But you should be careful with the 
semantics of your pipeline. The problem cannot be gracefully solved if there’s 
a natural time offset between the two streams.

Best, Xingcan

> On 14 Apr 2018, at 4:00 AM, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks for your quick response and now I understand it better. To clarify, do 
> you mean try to add a static time when I override extractTimestamp function?
> 
> For example, 
> 
> override def extractTimestamp(element: MyEvent, previousElementTimestamp: 
> Long): Long = {
> val timestamp = element.getCreationTime() + 360L //1 hour delay
> currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
> timestamp 
> }
> 
> Appreciate your help!
> 
> Best,
> Chengzhi
> 
> 
> On Fri, Apr 13, 2018 at 12:49 PM, Xingcan Cui <xingc...@gmail.com 
> <mailto:xingc...@gmail.com>> wrote:
> Hi Chengzhi,
> 
> currently, the watermarks of the two streams of a connected stream are 
> forcibly synchronized, i.e., the watermark is decided by the stream with a 
> larger delay. Thus the window trigger is also affected by this mechanism. 
> 
> As a workaround, you could try to add (or subtract) a static time offset to 
> one of your streams, which can make them more “close” to each other.
> 
> Best,
> Xingcan
> 
> 
>> On 13 Apr 2018, at 11:48 PM, Chengzhi Zhao <w.zhaocheng...@gmail.com 
>> <mailto:w.zhaocheng...@gmail.com>> wrote:
>> 
>> Hi, flink community,
>> 
>> I had an issue with slow watermark advances and needs some help here. So 
>> here is what happened: I have two streams -- A and B, and they perform 
>> co-process to join together and A has another steam as output. 
>> 
>> A --> Output
>> B --> (Connect A) --> Output
>> 
>> I used BoundedOutOfOrdernessGenerator [1] with both A and B stream with 2 
>> hours delay. The low watermark of A and output sink is within 2 hours 
>> window, however, the co-process end up with 10 hours low watermark late.
>> 
>> My setup is I am using file system as source, so every 15 mins there will be 
>> files been drop to a directory and flink pick them up from there. 
>> 
>> Please advise and appreciate it in advance!
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks>
>> 
>> Best,
>> Chengzhi
>> 
> 
> 



Re: Slow watermark advances

2018-04-13 Thread Xingcan Cui
Hi Chengzhi,

currently, the watermarks of the two streams of a connected stream are forcibly 
synchronized, i.e., the operator's watermark is the minimum of the two input 
watermarks, so it is held back by the stream with the larger delay. The window 
trigger is also affected by this mechanism. 

As a workaround, you could try to add (or subtract) a static time offset to one 
of your streams, which can make them more “close” to each other.

Best,
Xingcan

> On 13 Apr 2018, at 11:48 PM, Chengzhi Zhao  wrote:
> 
> Hi, flink community,
> 
> I had an issue with slow watermark advances and needs some help here. So here 
> is what happened: I have two streams -- A and B, and they perform co-process 
> to join together and A has another steam as output. 
> 
> A --> Output
> B --> (Connect A) --> Output
> 
> I used BoundedOutOfOrdernessGenerator [1] with both A and B stream with 2 
> hours delay. The low watermark of A and output sink is within 2 hours window, 
> however, the co-process end up with 10 hours low watermark late.
> 
> My setup is I am using file system as source, so every 15 mins there will be 
> files been drop to a directory and flink pick them up from there. 
> 
> Please advise and appreciate it in advance!
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
>  
> 
> 
> Best,
> Chengzhi
> 



Re: Sliding window based on event arrival

2018-03-12 Thread Xingcan Cui
Hi Miyuru,

what you need should be something like a sliding count window. The Flink DataStream 
API already provides a `countWindow()` method for that, and a related example can 
be found in the documentation.
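
For your case (a window of 4 events sliding by 1 event, summing one field), 
something like this should do, assuming a keyed stream of Tuple2<String, Double> 
where f1 is the field to sum:

DataStream<Tuple2<String, Double>> sums = input
        .keyBy(0)
        .countWindow(4, 1) // size: 4 elements, slide: 1 element
        .sum(1);           // note: the first few windows hold fewer than 4 events until the buffer fills

// for a non-keyed stream, use input.countWindowAll(4, 1) instead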

Hope that helps,
Xingcan

> On 12 Mar 2018, at 1:33 PM, Miyuru Dayarathna  wrote:
> 
> Hi,
> 
> I need to create a sliding window of 4 events in Flink streaming application. 
> The window should slide by one event per every new arriving event. 
> Furthermore, per each new arriving event I need to calculate the aggregate 
> sum of a particular field in the 4 events I have in the window. I have 
> referred [1] which describes how this can be implemented based on time. But 
> my use case if little different, hence I am wondering how the window can be 
> slided when a new event gets added to the window.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#sliding-windows
>  
> 
> 
> Thanks,
> Miyuru



Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

2018-03-09 Thread Xingcan Cui
Hi Yan,

I think you could try that as a workaround. Don’t forget to follow the 
DataStreamWindowJoin 
<https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala>
 to hold back watermarks. We’ll continue improving the SQL/Table API part.

Best,
Xingcan


> On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
> 
> Hi Xingcan, Timo, 
> 
> Thanks for the information. 
> I am going to convert the result table to DataStream and follow the logic of 
> TimeBoundedStreamInnerJoin to do the timed-window join. Should I do this? Is 
> there any concern from performance or stability perspective?
> 
> Best
> Yan
> 
> From: Xingcan Cui <xingc...@gmail.com>
> Sent: Thursday, March 8, 2018 8:21:42 AM
> To: Timo Walther
> Cc: user; Yan Zhou [FDS Science]
> Subject: Re: flink sql timed-window join throw "mismatched type" 
> AssertionError on rowtime column
>  
> Hi Yan & Timo,
> 
> this is confirmed to be a bug and I’ve created an issue [1] for it.
> 
> I’ll explain more about this query. In Flink SQL/Table API, the DISTINCT 
> keyword will be implemented with an aggregation, which outputs a retract 
> stream [2]. In that situation, all the time-related fields will be 
> materialized as if they were common fields (with the timestamp type). 
> Currently, due to the semantics problem, the time-windowed join cannot be 
> performed on retract streams. But you could try non-windowed join [3] after 
> we fix this.
> 
> Best,
> Xingcan
> 
> [1] https://issues.apache.org/jira/browse/FLINK-8897 
> <https://issues.apache.org/jira/browse/FLINK-8897>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion>
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins>
> 
>> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org 
>> <mailto:twal...@apache.org>> wrote:
>> 
>> Hi Xingcan,
>> 
>> thanks for looking into this. This definitely seems to be a bug. Maybe in 
>> the org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we 
>> should create an issue for it.
>> 
>> Regards,
>> Timo
>> 
>> 
>> Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]:
>>> Hi Xingcan,
>>> 
>>> Thanks for your help. Attached is a sample code that can reproduce the 
>>> problem.
>>> When I was writing the sample code, if I remove the `distinct` keyword in 
>>> select clause, the AssertionError doesn't occur.
>>> 
>>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id 
>>> order by eventTs rows between 100 preceding and current row) as cnt1 from 
>>> myTable";
>>> 
>>> Best
>>> Yan
>>>  
>>> From: xccui-foxmail <xingc...@gmail.com> <mailto:xingc...@gmail.com>
>>> Sent: Wednesday, March 7, 2018 8:10 PM
>>> To: Yan Zhou [FDS Science]
>>> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
>>> Subject: Re: flink sql timed-window join throw "mismatched type" 
>>> AssertionError on rowtime column
>>>  
>>> Hi Yan,
>>> 
>>> I’d like to look into this. Can you share more about your queries and the 
>>> full stack trace?
>>> 
>>> Thank,
>>> Xingcan
>>> 
>>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com 
>>>> <mailto:yz...@coupang.com>> wrote:
>>>> 
>>>> Hi experts, 
>>>> I am using flink table api to join two tables, which are datastream 
>>>> underneath. However, I got an assertion error of 
>>>> "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on rowtime 
>>>> column. Below is more details:
>>>> 
>>>> There in only one kafka data source, which is then converted to Table and 
>>>> registered. One existed column is set as rowtime(event time) attribute. 
>>>> Two over-window aggregation queries are run against the table and two 
>>>> tables are created as results. Everything works great so far.
>>>> However when timed-window joining two result tables with inherente

Re: Emulate Tumbling window in Event Time Space

2018-03-08 Thread Xingcan Cui
Hi Dhruv,

there’s no need to implement the window logic with the low-level 
`ProcessFunction` yourself. Flink provides built-in window operators, and 
you just need to implement a `WindowFunction` for your custom per-window processing [1].
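
A rough sketch (MyEvent, MyResult and the "key" field are placeholders for your 
own types):

input.keyBy("key")
     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
     .apply(new WindowFunction<MyEvent, MyResult, Tuple, TimeWindow>() {
         @Override
         public void apply(Tuple key, TimeWindow window,
                           Iterable<MyEvent> events, Collector<MyResult> out) {
             // your custom per-window processing goes here; the operator fires
             // automatically once the watermark passes the end of each 10-second
             // window, so no manual trigger or emit logic is needed
         }
     });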

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#window-functions
 


> On 9 Mar 2018, at 1:51 PM, Dhruv Kumar  wrote:
> 
> Hi
> 
> I was trying to emulate tumbling window in event time space. Here 
> 
>  is the link to my code.
> I am using the process function to do the custom processing which I want to 
> do within every window. I am having an issue of how to emit results at the 
> end of every window since my watermark only gets emitted at every incoming 
> event (incoming event will mostly not intersect with the end time of any 
> window). Seems like I need to add a trigger somewhere which fires at the end 
> of every window. Could any one here help me? Sorry, if I am not clear in 
> anything. I am quite new to Flink. 
> 
> Thanks
> Dhruv



Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

2018-03-08 Thread Xingcan Cui
Hi Yan & Timo,

this is confirmed to be a bug and I’ve created an issue [1] for it.

I’ll explain more about this query. In the Flink SQL/Table API, the DISTINCT 
keyword is implemented with an aggregation, which outputs a retract stream 
[2]. In that situation, all the time-related fields are materialized as if 
they were common fields (with a plain timestamp type). Currently, due to 
semantic constraints, the time-windowed join cannot be performed on retract 
streams. But you could try a non-windowed join [3] after we fix this.
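
Just to illustrate the difference of the two forms (generic table and column 
names, not your exact schema):

// time-windowed join: relies on time attributes, which are lost after the DISTINCT aggregation
String windowedJoin =
    "SELECT * FROM L, R WHERE L.id = R.r_id " +
    "AND L.eventTime BETWEEN R.r_event_time - INTERVAL '5' SECOND " +
    "AND R.r_event_time + INTERVAL '5' SECOND";

// non-windowed join: can consume retract inputs, but keeps state according to the query config
String plainJoin = "SELECT * FROM L JOIN R ON L.id = R.r_id";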

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
 

[3] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins 


> On 8 Mar 2018, at 8:59 PM, Timo Walther  wrote:
> 
> Hi Xingcan,
> 
> thanks for looking into this. This definitely seems to be a bug. Maybe in the 
> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we 
> should create an issue for it.
> 
> Regards,
> Timo
> 
> 
> Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]:
>> Hi Xingcan,
>> 
>> Thanks for your help. Attached is a sample code that can reproduce the 
>> problem.
>> When I was writing the sample code, if I remove the `distinct` keyword in 
>> select clause, the AssertionError doesn't occur.
>> 
>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id 
>> order by eventTs rows between 100 preceding and current row) as cnt1 from 
>> myTable";
>> 
>> Best
>> Yan
>> From: xccui-foxmail  
>> Sent: Wednesday, March 7, 2018 8:10 PM
>> To: Yan Zhou [FDS Science]
>> Cc: user@flink.apache.org 
>> Subject: Re: flink sql timed-window join throw "mismatched type" 
>> AssertionError on rowtime column
>>  
>> Hi Yan,
>> 
>> I’d like to look into this. Can you share more about your queries and the 
>> full stack trace?
>> 
>> Thank,
>> Xingcan
>> 
>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] >> > wrote:
>>> 
>>> Hi experts, 
>>> I am using flink table api to join two tables, which are datastream 
>>> underneath. However, I got an assertion error of "java.lang.AssertionError: 
>>> mismatched type $1 TIMESTAMP(3)" on rowtime column. Below is more details:
>>> 
>>> There in only one kafka data source, which is then converted to Table and 
>>> registered. One existed column is set as rowtime(event time) attribute. Two 
>>> over-window aggregation queries are run against the table and two tables 
>>> are created as results. Everything works great so far.
>>> However when timed-window joining two result tables with inherented 
>>> rowtime, calcite throw the "java.lang.AssertionError: mismatched type $1 
>>> TIMESTAMP(3)" AssertionError. Can someone let me know what is the possible 
>>> cause? F.Y.I., I rename the rowtime column for one of the result table.  
>>> 
>>> DataStream dataStream = env.addSource(kafkaConsumer);
>>> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>>> tableEnv.registerTable(tableName, table);
>>> Table left = tableEnv.sqlQuery("select id, eventTime,count (*) over ...  
>>> from ...");
>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as 
>>> r_event_time, count (*) over ...  from ...");
>>> left.join(right).where("id = r_id && eventTime === r_event_time)
>>> .addSink(...); // here calcite throw exception: java.lang.AssertionError: 
>>> mismatched type $1 TIMESTAMP(3) 
>>> 
>>> source table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_1 table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_2 table
>>>  |-- rid: Long
>>>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>> 
>>> 
>>> Best
>>> Yan
>> 
> 
> 



Re: Does Flink support stream-stream outer joins in the latest version?

2018-03-07 Thread Xingcan Cui
Hi Kant,

the non-windowed stream-stream join is not equivalent to a full-history join, 
though they share the same SQL form. Retention times must be configured for the 
records to balance storage consumption against the completeness of the results.
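
For example, with the Flink 1.5-style Table API the retention can be configured 
roughly like this (the exact method names may differ slightly between versions, 
and the 12/24 hour values are arbitrary):

StreamQueryConfig qConfig = tableEnv.queryConfig();
// keep state for inactive keys for at least 12 and at most 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table joined = tableEnv.sqlQuery(
    "SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId");
tableEnv.toRetractStream(joined, Row.class, qConfig).print();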

Best,
Xingcan

> On 7 Mar 2018, at 8:02 PM, kant kodali <kanth...@gmail.com> wrote:
> 
> Hi Cheng,
> 
> The docs here 
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins>
>  states full outer joins are only available for batch (I am not sure if I am 
> reading that correctly). I am trying to understand how two unbounded streams 
> can be joined like a batch? If we have to do batch join then it must be 
> bounded right? If so, how do we bound? I can think Time Window is one way to 
> bound but other than that if I execute the below join query on the unbounded 
> stream I am not even sure how that works? A row from one table can join with 
> a row from another table and that row can come anytime in future right if it 
> is unbounded. so I am sorry I am failing to understand.
> 
> 
> SELECT *
> FROM Orders o, Shipments s
> WHERE o.id <http://o.id/> = s.orderId
> 
> Thanks!
> 
> On Wed, Mar 7, 2018 at 3:49 AM, Hequn Cheng <chenghe...@gmail.com 
> <mailto:chenghe...@gmail.com>> wrote:
> Hi kant,
> 
> It seems that you mean the Time-windowed Join. The Time-windowed Joins are 
> supported now. You can check more details with the docs given by Xingcan.
> As for the non-window join, it is used to join two unbounded stream and the 
> semantic is very like batch join.
> 
> Time-windowed Join:
> SELECT *
> FROM Orders o, Shipments s
> WHERE o.id <http://o.id/> = s.orderId AND
>   o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>  
> Non-windowed Join:
> SELECT *
> FROM Orders o, Shipments s
> WHERE o.id <http://o.id/> = s.orderId
> 
> On Wed, Mar 7, 2018 at 7:02 PM, kant kodali <kanth...@gmail.com 
> <mailto:kanth...@gmail.com>> wrote:
> Hi! 
> 
> Thanks for all this. and yes I was indeed talking about SQL/Table API so I 
> will keep track of these tickets! BTW, What is non-windowed Join? I thought 
> stream-stream-joins by default is a stateful operation so it has to be within 
> some time window right? Also does the output of stream-stream joins emit 
> every time so we can see the state of the join at any given time or only when 
> the watermark elapses and join result fully materializes? 
> 
> On a side note, Full outer join seems to be the most useful for my use case. 
> so the moment its available in master I can start playing and testing it!
> 
> On Tue, Mar 6, 2018 at 10:39 PM, Hequn Cheng <chenghe...@gmail.com 
> <mailto:chenghe...@gmail.com>> wrote:
> Hi Kant,
> 
> The stream-stream outer joins are work in progress now(left/right/full), and 
> will probably be ready before the end of this month. You can check the 
> progress from[1]. 
> 
> Best, Hequn
> 
> [1] https://issues.apache.org/jira/browse/FLINK-5878 
> <https://issues.apache.org/jira/browse/FLINK-5878>
> 
> On Wed, Mar 7, 2018 at 1:01 PM, Xingcan Cui <xingc...@gmail.com 
> <mailto:xingc...@gmail.com>> wrote:
> Hi Kant,
> 
> I suppose you refer to the stream join in SQL/Table API since the outer join 
> for windowed-streams can always be achieved with the `JoinFunction` in 
> DataStream API.
> 
> There are two kinds of stream joins, namely, the time-windowed join and the 
> non-windowed join in Flink SQL/Table API [1, 2]. The time-windowed outer join 
> has been supported since version 1.5 and the non-windowed outer join is still 
> work in progress.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins>
> 
> 
>> On 7 Mar 2018, at 12:45 AM, kant kodali <kanth...@gmail.com 
>> <mailto:kanth...@gmail.com>> wrote:
>> 
>> Hi All,
>> 
>> Does Flink support stream-stream outer joins in the latest version?
>> 
>> Thanks!
> 
> 
> 
> 
> 



Re: Does Flink support stream-stream outer joins in the latest version?

2018-03-06 Thread Xingcan Cui
Hi Kant,

I suppose you refer to the stream join in SQL/Table API since the outer join 
for windowed-streams can always be achieved with the `JoinFunction` in 
DataStream API.

There are two kinds of stream joins, namely, the time-windowed join and the 
non-windowed join in Flink SQL/Table API [1, 2]. The time-windowed outer join 
has been supported since version 1.5 and the non-windowed outer join is still 
work in progress.

Hope that helps.

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins 



> On 7 Mar 2018, at 12:45 AM, kant kodali  wrote:
> 
> Hi All,
> 
> Does Flink support stream-stream outer joins in the latest version?
> 
> Thanks!



Re: Timers and state

2018-03-05 Thread Xingcan Cui
Hi Alberto,

an ultimate solution for your problem would be a map state with ordered keys 
(like a TreeMap), but unfortunately, this is still a WIP feature. 

For now, you could either "eagerly remove" the outdated values (with 
`iterator.remove()`) while iterating over the map state in the process 
function, or split the key space of your map state into static bins so that you 
can compute the set of outdated keys before removing them.
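
A rough sketch of the "eager removal" idea inside a keyed process function (the 
Event type, state names, and the one-hour threshold are made up; the map state 
is assumed to be keyed by event timestamp and registered in open()):

// in open(): valuesByTime = getRuntimeContext().getMapState(
//     new MapStateDescriptor<>("values-by-time", Long.class, Double.class));
private transient MapState<Long, Double> valuesByTime;

@Override
public void processElement(Event event, Context ctx, Collector<Double> out) throws Exception {
    valuesByTime.put(event.timestamp, event.value);

    long threshold = ctx.timerService().currentProcessingTime() - 60 * 60 * 1000L;
    Iterator<Map.Entry<Long, Double>> it = valuesByTime.iterator();
    while (it.hasNext()) {
        if (it.next().getKey() < threshold) {
            it.remove();   // eagerly drop outdated entries while iterating
        }
    }
}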

Hope that helps.

Best,
Xingcan

> On 5 Mar 2018, at 4:19 PM, Alberto Mancini  wrote:
> 
>  



Re: Table API Compilation Error in Flink

2018-03-04 Thread Xingcan Cui
Hi Nagananda,

adding `flink-streaming-scala_<scala binary version>` (e.g., 
`flink-streaming-scala_2.11`) to your Maven dependencies should solve this.
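
Something along these lines in the pom.xml (adjust the Scala suffix and the 
Flink version to whatever your project already uses):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.4.0</version>
</dependency>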

Best,
Xingcan



> On 5 Mar 2018, at 2:21 PM, Nagananda M  wrote:
> 
> Hi All,
> Am trying to compile a sample program in apache flink using TableEnvironment 
> and facing some issue in compilation for the addition of the below line.
> "StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.getTableEnvironment(env);"
> The error is as below
> "cannot access org.apache.flink.api.scala.ExecutionEnvironment"
> Am compiling the code using maven. Can anyone help on this.
> Cheers,
> Nagananda M
> 
>  
> 
> 


Re: Flink join operator after sorting seems to group fields (Scala)

2018-03-03 Thread Xingcan Cui
Hi Felipe,

the `sortPartition()` method just LOCALLY sorts each partition of a dataset. To 
achieve a global sorting, use this method after a `partitionByRange()` (e.g., 
`result.partitionByRange(0).sortPartition(0, Order.ASCENDING)`).

Hope that helps,
Xingcan

> On 3 Mar 2018, at 9:33 PM, Felipe Gutierrez  
> wrote:
> 
> Hi all,
> 
> I have implemented a simple Scala object using Flink to play with joins 
> operator. After that, I put the join operator show my results I decided to 
> sort the output by the first field (.sortPartition(0, Order.ASCENDING)). It 
> seems that the output is ordered by group. The output shows two groups of 
> "Fyodor Dostoyevsky". Why is this happening? How do I sort the complete 
> DataSet?
> 
> Kind Regards,
> Felipe
> 
> import org.apache.flink.api.common.operators.Order
> import org.apache.flink.api.scala.{ExecutionEnvironment, _}
> 
> object JoinBooksAndAuthors {
>   val AUTHOR_ID_FIELD: Int = 0
>   val AUTHOR_NAME_FIELD: Int = 1
> 
>   val BOOK_AUTHORID_FIELD: Int = 0
>   val BOOK_YEAR_FIELD: Int = 1
>   val BOOK_NAME_FIELD: Int = 2
> 
>   def main(args: Array[String]) {
> 
> val env = ExecutionEnvironment.getExecutionEnvironment
> 
> val authors = env.readCsvFile[(Int, String)](
>   "downloads/authors.tsv",
>   fieldDelimiter = "\t",
>   lineDelimiter = "\n",
>   includedFields = Array(0, 1)
> )
> 
> val books = env.readCsvFile[(Int, Short, String)](
>   "downloads/books.tsv",
>   fieldDelimiter = "\t",
>   lineDelimiter = "\n",
>   includedFields = Array(0, 1, 2)
> )
> 
> authors
>   .join(books)
>   .where(AUTHOR_ID_FIELD)
>   .equalTo(BOOK_AUTHORID_FIELD)
>   .map(tuple => (tuple._1._2, tuple._2._3))
>   .sortPartition(0, Order.ASCENDING)
>   .print()
>   }
> }
> output
> 
> (Charles Bukowski,Women)
> (Charles Bukowski,The Most Beautiful Woman in Town)
> (Charles Bukowski,Hot Water Music)
> (Charles Bukowski,Barfly)
> (Charles Bukowski,Notes of a Dirty Old Man)
> (Charles Bukowski,Ham on Rye)
> (Fyodor Dostoyevsky,The Brothers Karamazov)
> (Fyodor Dostoyevsky,The Double: A Petersburg Poem)
> (Fyodor Dostoyevsky,Poor Folk)
> (George Orwell,Coming Up for Air)
> (George Orwell,Burmese Days)
> (George Orwell,A Clergyman's Daughter)
> (George Orwell,Down and Out in Paris and London)
> (Albert Camus,The Plague)
> (Fyodor Dostoyevsky,The Eternal Husband)
> (Fyodor Dostoyevsky,The Gambler)
> (Fyodor Dostoyevsky,The House of the Dead)
> (Fyodor Dostoyevsky,Crime and Punishment)
> (Fyodor Dostoyevsky,Netochka Nezvanova)
> .
> 
> 
> 
> 
> 
> -- 
> --
> -- Felipe Oliveira Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com 
> 


Re: Watermark not worked as expected in TimeCharacteristic.EvenTime setting.

2018-03-02 Thread Xingcan Cui
Hi,

for periodically generated watermarks, you should use 
`ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
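
For example (the 200 ms value is arbitrary):

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// ask Flink to call getCurrentWatermark() of the periodic assigner every 200 ms
env.getConfig().setAutoWatermarkInterval(200);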

Hope that helps.

Best,
Xingcan

> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
> 
> 
> 
> Hi, I got a problem in Flink  and need your help.
> 
> I tried to use TimeCharacteristic.EvenTime, but the sink function never be 
> executed.  
> 
> public class StreamingJob {
>   public static void main(String[] args) throws Exception {
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> ObjectMapper jsonMapper = new ObjectMapper();
> 
> Properties properties = new Properties();
> //String brokers = "172.27.138.8:9092";
> String brokers = "localhost:9092";
> properties.setProperty("bootstrap.servers", brokers);
> properties.setProperty("group.id", "test_fink");
> String topic = "stream_test";
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> FlinkKafkaConsumer010 myConsumer =
> new FlinkKafkaConsumer010(topic, new 
> BitRate.BitRateDeserializtionSchema(), properties);
> 
> DataStream stream = env.addSource(myConsumer)
> .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
> DataStream
> reduceItems =
> stream
> .keyBy(a -> a.gameId)
> .timeWindow(Time.seconds(10))
> .reduce((a, b) -> a.add(b));
> 
> reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
> (tuple) -> {
>   try {
> tuple.end();
> System.out.println(tuple.rate + "\t" + tuple.user);
> return jsonMapper.writeValueAsBytes(tuple);
>   } catch (JsonProcessingException e) {
> e.printStackTrace();
> return "".getBytes();
>   }
> }));
> 
> env.execute("Flink Streaming Java API Skeleton");
>   }
> 
> }
> 
> 
> Here is the CustomWatermarkEmitter. I tried to increase the lag number,  but 
> not worked.
> 
> public class CustomWatermarkEmitter implements 
> AssignerWithPeriodicWatermarks {
> 
>   private long currentMax = 0;
>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
> big
> 
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
> long atLeastTime = currentMax - lag;
> System.out.println("water mark" + atLeastTime);
> return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>   }
> 
>   @Override
>   public long extractTimestamp(BitRate bitRate, long l) {
> currentMax = Long.max(bitRate.eventTime, currentMax);
> return bitRate.eventTime;
>   }
> }
> 
> 
> Here is the entity BitRate, the logs are generated in time , sample log   
> `4281_783_1520047769115`
> 
> 
> public BitRate(long eventTime, long gameId, long rate, long user) {
>   this.eventTime = eventTime;
> 
>   this.gameId = gameId;
>   this.rate = rate;
>   this.user = user;
>   this.startTs = System.currentTimeMillis();
>   this.type = 0;
> }
> 
> public void end() {
>   this.endTs = System.currentTimeMillis();
> }
> 
> public BitRate add(BitRate b) {
>   System.out.println("Add:" + b.rate);
>   this.rate += b.rate;
>   this.user += b.user;
>   return this;
> }
> 



Re: Problems to use toAppendStream

2018-02-22 Thread Xingcan Cui
Hi Esa and Fabian,

sorry for my inaccurate conclusion before, but I think the reason is clear now. 
org.apache.flink.streaming.api.scala._ and org.apache.flink.api.scala._ should 
not be imported simultaneously because their implicit conversions conflict. 
Just remove one of them.

Best,
Xingcan

> On 22 Feb 2018, at 5:20 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi Fabian and Esa,
> 
> I ran the code myself and also noticed the strange behavior. It seems that 
> only I explicitly import the function i.e., 
> org.apache.flink.streaming.api.scala.asScalaStream, can it works. In other 
> words, the underscore import becomes useless. I also checked other package 
> objects (e.g., org.apache.flink.table.api.scala._) and they are the same.
> 
> @Esa, you can temporarily solve the problem by importing 
> org.apache.flink.streaming.api.scala.asScalaStream in your code and we'll 
> continue working on this issue.
> 
> Best,
> Xingcan
> 
>> On 22 Feb 2018, at 4:47 PM, Esa Heikkinen <esa.heikki...@student.tut.fi 
>> <mailto:esa.heikki...@student.tut.fi>> wrote:
>> 
>> Hi
>>  
>> How to check versions ?
>>  
>> In pom.xml there are lines:
>>  
>>  
>>   
>> UTF-8
>>   
>> 1.4.0
>>   
>> 1.7.7
>>   
>> 1.2.17
>>   
>> 2.11
>>   
>> 2.11.11
>>  
>>  
>> BR Esa
>>  
>> From: Fabian Hueske [mailto:fhue...@gmail.com <mailto:fhue...@gmail.com>] 
>> Sent: Thursday, February 22, 2018 10:35 AM
>> To: Esa Heikkinen <esa.heikki...@student.tut.fi 
>> <mailto:esa.heikki...@student.tut.fi>>
>> Cc: Xingcan Cui <xingc...@gmail.com <mailto:xingc...@gmail.com>>; 
>> user@flink.apache.org <mailto:user@flink.apache.org>
>> Subject: Re: Problems to use toAppendStream
>>  
>> Hi Esa,
>> 
>> which Scala version do you use?
>> Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 
>> 1.4.0).
>> 
>> Fabian
>>  
>> 2018-02-22 9:28 GMT+01:00 Esa Heikkinen <esa.heikki...@student.tut.fi 
>> <mailto:esa.heikki...@student.tut.fi>>:
>>  
>> 
>> It should be ok. This is the list of my all imports. First part of it has 
>> been highlighted weaker. I don’t know why.
>>  
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.cep.scala.{CEP, PatternStream}
>> import org.apache.flink.cep.scala.pattern.Pattern
>> import org.apache.flink.cep.{PatternFlatSelectFunction, 
>> PatternFlatTimeoutFunction}
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
>> import 
>> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>> import org.apache.flink.util.Collector
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.scala._
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>> import org.apache.flink.table.api.java.StreamTableEnvironment
>>  
>>  
>> import org.apache.flink.types.Row
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.{DataStream, 
>> StreamExecutionEnvironment}
>> import org.apache.flink.table.api.TableEnvironment
>> import org.apache.flink.table.sources.CsvTableSource
>> import org.apache.flink.api.common.typeinfo.Types
>>  
>> BR Esa
>>  
>> From: Xingcan Cui [mailto:xingc...@gmail.com <mailto:xingc...@gmail.com>] 
>> Sent: Thursday, February 22, 2018 10:09 AM
>> 
>> To: Esa Heikkinen <esa.heikki...@student.tut.fi 
>> <mailto:esa.heikki...@student.tut.fi>>
>> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
>> Subject: Re: Problems to use toAppendStream
>>  
>> Hi Esa,
>>  
>> just to remind that don’t miss the dot and underscore.
>>  
>> Best,
>> Xingcan
>>  

Re: Problems to use toAppendStream

2018-02-22 Thread Xingcan Cui
Hi Fabian and Esa,

I ran the code myself and also noticed the strange behavior. It seems to work 
only if I explicitly import the function, i.e., 
org.apache.flink.streaming.api.scala.asScalaStream. In other words, the 
underscore import becomes useless. I also checked other package objects (e.g., 
org.apache.flink.table.api.scala._) and they behave the same way.

@Esa, you can temporarily solve the problem by importing 
org.apache.flink.streaming.api.scala.asScalaStream in your code and we'll 
continue working on this issue.

Best,
Xingcan

> On 22 Feb 2018, at 4:47 PM, Esa Heikkinen <esa.heikki...@student.tut.fi> 
> wrote:
> 
> Hi
>  
> How to check versions ?
>  
> In pom.xml there are lines:
>  
>  
>   
> UTF-8
>   
> 1.4.0
>   
> 1.7.7
>   
> 1.2.17
>   
> 2.11
>   
> 2.11.11
>  
>  
> BR Esa
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com] 
> Sent: Thursday, February 22, 2018 10:35 AM
> To: Esa Heikkinen <esa.heikki...@student.tut.fi>
> Cc: Xingcan Cui <xingc...@gmail.com>; user@flink.apache.org
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
> 
> which Scala version do you use?
> Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 
> 1.4.0).
> 
> Fabian
>  
> 2018-02-22 9:28 GMT+01:00 Esa Heikkinen <esa.heikki...@student.tut.fi 
> <mailto:esa.heikki...@student.tut.fi>>:
>  
> 
> It should be ok. This is the list of my all imports. First part of it has 
> been highlighted weaker. I don’t know why.
>  
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.cep.scala.{CEP, PatternStream}
> import org.apache.flink.cep.scala.pattern.Pattern
> import org.apache.flink.cep.{PatternFlatSelectFunction, 
> PatternFlatTimeoutFunction}
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
> import 
> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.api.java.StreamTableEnvironment
>  
>  
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.api.common.typeinfo.Types
>  
> BR Esa
>  
> From: Xingcan Cui [mailto:xingc...@gmail.com <mailto:xingc...@gmail.com>] 
> Sent: Thursday, February 22, 2018 10:09 AM
> 
> To: Esa Heikkinen <esa.heikki...@student.tut.fi 
> <mailto:esa.heikki...@student.tut.fi>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
>  
> just to remind that don’t miss the dot and underscore.
>  
> Best,
> Xingcan
>  
> 
> On 22 Feb 2018, at 3:59 PM, Esa Heikkinen <esa.heikki...@student.tut.fi 
> <mailto:esa.heikki...@student.tut.fi>> wrote:
>  
> Hi
>  
> Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
> code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
> editor. What does this mean ?
>  
> But the same errors will still be generated.
>  
> Esa
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com <mailto:fhue...@gmail.com>] 
> Sent: Wednesday, February 21, 2018 9:41 PM
> To: Esa Heikkinen <esa.heikki...@student.tut.fi 
> <mailto:esa.heikki...@student.tut.fi>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
> 
> whenever you observe the error "could not find implicit value for evidence 
> parameter of type X" in a streaming program, you need to add the following

Re: Problems to use toAppendStream

2018-02-22 Thread Xingcan Cui
Hi Esa,

just to remind that don’t miss the dot and underscore.

Best,
Xingcan

> On 22 Feb 2018, at 3:59 PM, Esa Heikkinen  
> wrote:
> 
> Hi
>  
> Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
> code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
> editor. What does this mean ?
>  
> But the same errors will still be generated.
>  
> Esa
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com ] 
> Sent: Wednesday, February 21, 2018 9:41 PM
> To: Esa Heikkinen  >
> Cc: user@flink.apache.org 
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
> 
> whenever you observe the error "could not find implicit value for evidence 
> parameter of type X" in a streaming program, you need to add the following 
> import:
> 
> import org.apache.flink.streaming.api.scala._
> 
> Best, Fabian
>  
> 2018-02-21 19:49 GMT+01:00 Esa Heikkinen  >:
>  
> Hi
>  
>  
> I have tried to solve below Errors for long time, but no succeed yet. Could 
> you give some hint how to solve it ?
>  
> Errors in compiling:
> --
> Error:(56, 46) could not find implicit value for evidence parameter of type 
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
> val stream = tableEnv.toAppendStream[Row](tableTest)
>  
> Error:(56, 46) not enough arguments for method toAppendStream: (implicit 
> evidence$3: 
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.types.Row].
> Unspecified value parameter evidence$3.
> val stream = tableEnv.toAppendStream[Row](tableTest)
>  
> Code:
> -
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.api.common.typeinfo.Types
>  
> object CepTest2 {
>  
>   def main(args: Array[String]) {
>  
> println("Start ...")
>  
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>  
> //val tableEnv = StreamTableEnvironment.getTableEnvironment(env)
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>  
> val csvtable = CsvTableSource
>   .builder
>   .path("/home/esa/Log_EX1_gen_track_5.csv")
>   .ignoreFirstLine
>   .fieldDelimiter(",")
>   .field("time", Types.INT)
>   .field("id", Types.STRING)
>   .field("sources", Types.STRING)
>   .field("targets", Types.STRING)
>   .field("attr", Types.STRING)
>   .field("data", Types.STRING)
>   .build
>  
> tableEnv.registerTableSource("test", csvtable)
>  
> val tableTest = 
> tableEnv.scan("test").where("id='5'").select("id,sources,targets")
>  
> val stream = tableEnv.toAppendStream[Row](tableTest)
>  
> stream.print
> env.execute()
>   }
> }
> 



Re: A "per operator instance" window all ?

2018-02-19 Thread Xingcan Cui
Hi Julien,

you could use the OperatorState 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-operator-state>
 to cache the data of a window together with the last time the window fired. 
Then you check ctx.timerService().currentProcessingTime() in processElement() 
and, once it exceeds the next window boundary, process all the cached data as 
if the window had fired.

Note that currently, there are only memory-based operator states provided.
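
A very rough sketch of that idea (the Alert/EnrichedAlert types, the 500 ms 
interval, and the enrich() call are all placeholders; note that the buffer is 
only flushed when a new element arrives, since timers are not available on 
non-keyed streams):

public class BatchingEnricher extends ProcessFunction<Alert, EnrichedAlert>
        implements CheckpointedFunction {

    private final long intervalMs = 500;
    private transient List<Alert> buffer;                   // cached alerts of this subtask
    private transient long nextFlushTime;                   // simulated window boundary
    private transient ListState<Alert> checkpointedBuffer;  // operator state for recovery

    @Override
    public void processElement(Alert alert, Context ctx, Collector<EnrichedAlert> out) {
        long now = ctx.timerService().currentProcessingTime();
        if (nextFlushTime == 0) {
            nextFlushTime = now + intervalMs;
        }
        buffer.add(alert);
        if (now >= nextFlushTime) {
            for (EnrichedAlert e : enrich(buffer)) {   // one external query per subtask
                out.collect(e);
            }
            buffer.clear();
            nextFlushTime = now + intervalMs;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.clear();
        for (Alert a : buffer) {
            checkpointedBuffer.add(a);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("buffered-alerts", Alert.class));
        buffer = new ArrayList<>();
        for (Alert a : checkpointedBuffer.get()) {
            buffer.add(a);
        }
    }

    private List<EnrichedAlert> enrich(List<Alert> alerts) {
        // placeholder: query the external system once for all buffered alerts
        return Collections.emptyList();
    }
}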

Hope this helps,
Xingcan

> On 19 Feb 2018, at 4:34 PM, Julien <jmassio...@gmail.com> wrote:
> 
> Hello,
> 
> I've already tried to key my stream with "resourceId.hashCode%parallelism" 
> (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time 
> window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed stream. 
> (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
> 
> I think I should explore the customer partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: "can you give me more details on this 
> point "and simulate a window operation by yourself in a ProcessFunction" ?
> 
> When I look at the documentation about the custom partitioner, I can see that 
> the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a 
> parallelism of 1, no ?).
> 
> And if I do something like "myStream.partitionCustom(, key>).keyBy().window(...)", will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to the 
> "KeyGroupStreamPartitioner" and forget my custom partitioner ?
> 
> Thanks again for your feedback,
> 
> Julien.
> 
> 
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>> If I am not misunderstand, I think you can key your stream on a 
>> `Random.nextInt() % parallesm`, this way  you can "group" together alerts 
>> from different and benefit from multi parallems.
>> 
>> 
>> 发自网易邮箱大师
>> 
>> On 02/19/2018 09:08,Xingcan Cui<xingc...@gmail.com> wrote: 
>> Hi Julien,
>> 
>> sorry for my misunderstanding before. For now, the window can only be 
>> defined on a KeyedStream or an ordinary DataStream but with parallelism = 1. 
>> I’d like to provide three options for your scenario.
>> 
>> 1. If your external data is static and can be fit into the memory, you can 
>> use ManagedStates to cache them without considering the querying problem.
>> 2. Or you can use a CustomPartitioner to manually distribute your alert data 
>> and simulate an window operation by yourself in a ProcessFuncton.
>> 3. You may also choose to use some external systems such as in-memory store, 
>> which can work as a cache for your queries.
>> 
>> Best,
>> Xingcan
>> 
>>> On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:
>>> 
>>> Hi Xingcan,
>>> 
>>> Thanks for your answer.
>>> Yes, I understand that point:
>>> • if I have 100 resource IDs with parallelism of 4, then each operator 
>>> instance will handle about 25 keys
>>> 
>>> 
>>> The issue I have is that I want, on a given operator instance, to group 
>>> those 25 keys together in order to do only 1 query to an external system 
>>> per operator instance:
>>> 
>>> • on a given operator instance, I will do 1 query for my 25 keys
>>> • so with the 4 operator instances, I will do 4 query in parallel (with 
>>> about 25 keys per query)
>>> 
>>> I do not know how I can do that.
>>> 
>>> If I define a window on my keyed stream (with for example 
>>> stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
>>>  then my understanding is that the window is "associated" to the key. So in 
>>> this case, on a given operator instance, I will have 25 of those windows 
>>> (one per key), and I will do 25 queries (instead of 1).
>>> 
>>> Do you understand my point ?
>>> Or maybe am I missing something ?
>>> 
>>> I'd like to find a way on operator instance 1 to group all the alerts 
>>> received on those 25 resource ids and do 1 query for those 25 resource ids.
>>> Same thing for operator instance 2, 3 and 4.
>>> 
>>> 
>>> Thank you,
>>> Regards.
>>> 
>>> 
>>

Re: A "per operator instance" window all ?

2018-02-18 Thread Xingcan Cui
Hi Julien,

sorry for my misunderstanding before. For now, a window can only be defined on 
a KeyedStream, or on an ordinary DataStream but then only with parallelism = 1. 
I’d like to provide three options for your scenario.

1. If your external data is static and fits into memory, you can use 
ManagedStates 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state>
 to cache it without worrying about the querying problem.
2. Or you can use a CustomPartitioner 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>
 to manually distribute your alert data and simulate a window operation 
yourself in a ProcessFunction (a rough sketch follows after this list).
3. You may also choose to use an external system such as an in-memory store, 
which can work as a cache for your queries.
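
A sketch of option 2 (the Alert type and the hash-based partitioning are just 
placeholders); downstream of this, a ProcessFunction can buffer the alerts of 
each parallel instance and fire them periodically:

DataStream<Alert> distributed = alerts.partitionCustom(
    new Partitioner<String>() {
        @Override
        public int partition(String resourceId, int numPartitions) {
            return Math.abs(resourceId.hashCode()) % numPartitions;
        }
    },
    new KeySelector<Alert, String>() {
        @Override
        public String getKey(Alert alert) {
            return alert.resourceId;
        }
    });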

Best,
Xingcan

> On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks for your answer.
> Yes, I understand that point:
> if I have 100 resource IDs with parallelism of 4, then each operator instance 
> will handle about 25 keys
> 
> The issue I have is that I want, on a given operator instance, to group those 
> 25 keys together in order to do only 1 query to an external system per 
> operator instance:
> 
> on a given operator instance, I will do 1 query for my 25 keys
> so with the 4 operator instances, I will do 4 query in parallel (with about 
> 25 keys per query)
> 
> I do not know how I can do that.
> 
> If I define a window on my keyed stream (with for example 
> stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
>  then my understanding is that the window is "associated" to the key. So in 
> this case, on a given operator instance, I will have 25 of those windows (one 
> per key), and I will do 25 queries (instead of 1).
> 
> Do you understand my point ?
> Or maybe am I missing something ?
> 
> I'd like to find a way on operator instance 1 to group all the alerts 
> received on those 25 resource ids and do 1 query for those 25 resource ids.
> Same thing for operator instance 2, 3 and 4.
> 
> 
> Thank you,
> Regards.
> 
> 
> On 18/02/2018 14:43, Xingcan Cui wrote:
>> Hi Julien,
>> 
>> the cardinality of your keys (e.g., resource ID) will not be restricted to 
>> the parallelism. For instance, if you have 100 resource IDs processed by 
>> KeyedStream with parallelism 4, each operator instance will handle about 25 
>> keys. 
>> 
>> Hope that helps.
>> 
>> Best,
>> Xingcan
>> 
>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com 
>>> <mailto:jmassio...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I am pretty new to flink and I don't know what will be the best way to deal 
>>> with the following use case:
>>> 
>>> as an input, I recieve some alerts from a kafka topic
>>> an alert is linked to a network resource (like router-1, router-2, 
>>> switch-1, switch-2, ...)
>>> so an alert has two main information (the alert id and the resource id of 
>>> the resource on which this alert has been raised)
>>> then I need to do a query to an external system in order to enrich the 
>>> alert with additional information on the resource
>>> 
>>> (A "natural" candidate for the key on this stream will be the resource id)
>>> 
>>> The issue I have is that regarding the query to the external system:
>>> I do not want to do 1 query per resource id
>>> I want to do a small number of queries in parallel (for example 4 queries 
>>> in parallel every 500ms), each query requesting the external system for 
>>> several alerts linked to several resource id
>>> Currently, I don't know what will be the best way to deal with that:
>>> I can key my stream on the resource id and then define a processing time 
>>> window of 500ms and when the trigger is ok, then I do my query
>>> by doing so, I will "group" several alerts in a single query, but they will 
>>> all be linked to the same resource.
>>> so I will do 1 query per resource id (which will be too much in my use case)
>>> I can also do a windowAll on a non keyed stream
>>> by doing so, I will "group" together alerts from different resource ids, 
>>> but from what I've read in such a case the parallelism will always be one.
>>> so in this case, I will only do 1 query whereas I'd like to have some 
>>> parallelism
>>> I am thinking that a way to deal with that will be:
>>> 
>>&

Re: Only a single message processed

2018-02-18 Thread Xingcan Cui
Hi Niclas,

About the second point you mentioned, was the processed message a random one or 
a fixed one? 

The default startup mode for FlinkKafkaConsumer is StartupMode.GROUP_OFFSETS; 
maybe you could try StartupMode.EARLIEST while debugging. Also, before that, 
you may try fetching the messages with the Kafka console consumer tool to see 
whether they can be consumed completely.
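
Something like this should do it for debugging (the topic name and schema are 
placeholders; `properties` is assumed to be the Properties object you already 
pass to the consumer):

FlinkKafkaConsumer011<String> consumer =
    new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();   // ignore committed group offsets while debugging
DataStream<String> stream = env.addSource(consumer);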

Besides, I wonder if you could provide the code for your Flink pipeline. 
That’ll be helpful.

Best,
Xingcan



> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman  wrote:
> 
> 
> So, the producer is run (at the moment) manually (command-line) one message 
> at a time.
> Kafka's tooling (different consumer group) shows that a message is added each 
> time.
> 
> Since my last post, I have also added a UUID as the key, and that didn't make 
> a difference, so you are likely correct about de-dup.
> 
> 
> There is only a single partition on the topic, so it shouldn't be a 
> partitioning issue.
> 
> I also noticed;
> 1. Sending a message while consumer topology is running, after the first 
> message, then that message will be processed after a restart.
> 
> 2. Sending many messages, while consumer is running, and then doing many 
> restarts will only process a single of those. No idea what happens to the 
> others.
> 
> I am utterly confused.
> 
> And digging in the internals are not for the faint-hearted, but the 
> kafka.poll() returns frequently with empty records.
> 
> Will continue debugging that tomorrow...
> 
> 
> Niclas
> 
> On Feb 18, 2018 18:50, "Fabian Hueske"  > wrote:
> Hi Niclas,
> 
> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a 
> "feature" is not implemented.
> Do you produce into the topic that you want to read or is the data in the 
> topic static?
> If you do not produce in the topic while the consuming application is 
> running, this might be an issue with the start position of the consumer [1]. 
> 
> Best, Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>  
> 
> 
> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman  >:
> Hi,
> I am pretty new to Flink, and I like what I see and have started to build my 
> first application using it.
> I must be missing something very fundamental. I have a FlinkKafkaConsumer011, 
> followed by a handful of filter, map and flatMap functions and terminated 
> with the standard CassandraSink. I have try..catch on all my own maps/filters 
> and the first message in the queue is processed after start-up, but any 
> additional messages are ignore, i.e. not reaching the first map(). Any 
> additional messages are swallowed (i.e. consumed but not forwarded).
> 
> I suspect that this is some type of de-duplication going on, since the (test) 
> producer of these messages. The producer provide different values on each, 
> but there is no "key" being passed to the KafkaProducer.
> 
> Is that required? And if so, why? Can I tell Flink or Flink's KafkaConsumer 
> to ingest all messages, and not try to de-duplicate them?
> 
> Thanks
> 
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org  - New Energy for Java
> 



Re: CoProcess() VS union.Process() & Timers in them

2018-02-13 Thread Xingcan Cui
Hi Max,

Currently, timers can only be used on keyed streams. As @Fabian suggested, you 
can "forge" a keyed stream with a special KeySelector that maps all records to 
the same key.
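
A minimal sketch of that "forged" key (the Event type is a placeholder; note 
that all records end up in the same parallel subtask, so this part of the 
pipeline effectively runs with a single active instance):

stream
    .keyBy(new KeySelector<Event, Integer>() {
        @Override
        public Integer getKey(Event value) {
            return 0;   // constant key: keyed state and timers become available
        }
    })
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event value, Context ctx, Collector<Event> out) {
            // timers are now allowed, e.g.
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
            out.collect(value);
        }
    });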

IMO, Flink uses keyed streams/states as it’s a deterministic distribution 
mechanism. Here, “the parallelism changes” may also refer to a parallelism 
change after the job restarts (e.g., when a node crashes). Flink can make sure 
that all the processing tasks and states will be safely re-distributed across 
the new cluster.

Hope that helps.

Best,
Xingcan

> On 13 Feb 2018, at 5:18 PM, m@xi  wrote:
> 
> OK Great!
> 
> Thanks a lot for the super ultra fast answer Fabian!
> 
> One intuitive follow-up question.
> 
> So, keyed state is the most preferable one, as it is easy for the Flink
> System to perform the re-distribution in case of change in parallelism, if
> we have a scale-up or scale-down. Also, it is useful to use hash partition a
> stream to different nodes/processors/PU (Processing Units) in general, by
> Keyed State.
> 
> Any other reasons for making Keyed State a must?
> 
> Last but not least, can you elaborate further on the "when the parallelism
> changes" part. I have read this in many topics in this forum, but I cannot
> understand its essence. For example, I define the parallelism of each
> operator in my Flink Job program based on the number of available PU. Maybe
> the essence lies in the fast that the number of PU might change from time to
> time, e.g. add more servers to the cluster where Flink runs and without
> stopping the Flink Job that runs you may perform the rescaling.
> 
> Thanks in advance.
> 
> Best,
> Max
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 



Re: CoProcess() VS union.Process()

2018-02-09 Thread Xingcan Cui
Hi Max,

if I understood correctly, instead of joining three streams, you actually 
perform two separate joins, say S1 JOIN S3 and S2 JOIN S3, right? Your plan 
"(S1 UNION S2) JOIN S3" seems to be identical to "(S1 JOIN S3) UNION (S2 JOIN 
S3)", and if that’s what you need, I think your pipeline is feasible.

However, if you want to join all three streams with each other, you may first 
join S1 with S2 to produce S12 with a CoProcessFunction, and then use another 
CoProcessFunction to join S12 with S3.
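
Structurally, that could look like the following (everything here, from the 
stream and type names to the key selectors and the two CoProcessFunction 
implementations, is a placeholder):

DataStream<S12> s12 = s1
    .connect(s2)
    .keyBy(s1KeySelector, s2KeySelector)   // key both inputs on the join key
    .process(new JoinS1WithS2());          // CoProcessFunction<S1Type, S2Type, S12>

DataStream<Result> result = s12
    .connect(s3)
    .keyBy(s12KeySelector, s3KeySelector)
    .process(new JoinS12WithS3());         // CoProcessFunction<S12, S3Type, Result>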

Hope that helps.

Best,
Xingcan

> On 10 Feb 2018, at 1:06 PM, m@xi  wrote:
> 
> Hello Flinkers,
> 
> I would like to discuss with you about something that bothers me. So, I have
> two streams that I want to join along with a third stream which I want to
> consult its data from time to time and triggers decisions.
> 
> Essentially, this boils down to coProcessing 3 streams together instead of
> 2, which to the best of my knowledge is not possible.
> 
> I thought to append an extra field to the 2 streams I want to join, namely
> say S1, S2 are the streams with tuples t1, t2. After the pumping with the
> extra field which is the stream id (1 or 2) the tuple would be (1, t1) and
> (2, t2) resulting to S1' and S2'.
> 
> Then I will do S1'.union(S2') which gives me a single data stream. Then this
> I may join with the 3rd stream and do the processing with a coProcess
> function.
> 
> Although, whenever I process and element from the united streams I should
> have an if-then-else to check to which stream a tuple belongs and process
> and update S1' and S2' state accordingly.
> 
> Do you think this is a good idea? In terms of efficiency compared with
> having two functions to do this, namely processElement1() and
> processElement2() of the coProcess function in case I only had two streams.
> 
> And if the aforementioned scheme is feasible, then I guess up til now, this
> is the only way of joining more than 2 streams. Am I right?
> 
> Thanks in advance for your help.
> 
> Best,
> Max
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Joining data in Streaming

2018-01-30 Thread Xingcan Cui
Hi Hayden,

Performing a full-history join on two streams is not natively supported yet.

As a workaround, you may implement a CoProcessFunction and cache the records
from both sides in states until the smaller stream has been fully cached. Then
you could safely clear the cache for the "larger" stream, which by that point
has produced complete results, and perform a nested-loop join (i.e., whenever a
new record arrives, join it against the fully cached set).
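
A rough skeleton of that workaround (the connected streams are assumed to be
keyed on the join key; the Large/Small/Joined types and the decision of when
the smaller side is "complete" are application specific):

public class CacheAndJoin extends CoProcessFunction<Large, Small, Joined> {

    private transient ListState<Large> largeCache;
    private transient ListState<Small> smallCache;

    @Override
    public void open(Configuration parameters) {
        largeCache = getRuntimeContext().getListState(
            new ListStateDescriptor<>("large-cache", Large.class));
        smallCache = getRuntimeContext().getListState(
            new ListStateDescriptor<>("small-cache", Small.class));
    }

    @Override
    public void processElement1(Large l, Context ctx, Collector<Joined> out) throws Exception {
        for (Small s : smallCache.get()) {   // join against everything cached so far
            out.collect(join(l, s));
        }
        largeCache.add(l);                   // keep it until the smaller side is complete
    }

    @Override
    public void processElement2(Small s, Context ctx, Collector<Joined> out) throws Exception {
        smallCache.add(s);
        for (Large l : largeCache.get()) {
            out.collect(join(l, s));
        }
        // once the smaller side is known to be fully cached (an application-specific
        // signal), largeCache.clear() can be called to release the large side
    }

    private Joined join(Large l, Small s) {
        return new Joined(l, s);             // placeholder
    }
}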

Hope this helps.

Best,
Xingcan

On Tue, Jan 30, 2018 at 7:42 PM, Marchant, Hayden 
wrote:

> We have a use case where we have 2 data sets - One reasonable large data
> set (a few million entities), and a smaller set of data. We want to do a
> join between these data sets. We will be doing this join after both data
> sets are available.  In the world of batch processing, this is pretty
> straightforward - we'd load both data sets into an application and execute
> a join operator on them through a common key.   Is it possible to do such a
> join using the DataStream API? I would assume that I'd use the connect
> operator, though I'm not sure exactly how I should do the join - do I need
> one 'smaller' set to be completely loaded into state before I start flowing
> the large set? My concern is that if I read both data sets from streaming
> sources, since I can't be guaranteed of the order that the data is loaded,
> I may lose lots of potential joined entities since their pairs might not
> have been read yet.
>
>
> Thanks,
> Hayden Marchant
>
>
>


Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

2017-12-13 Thread Xingcan Cui
Hi Yan Zhou,

as you may have noticed, the SQL level stream join was not built on top of
some join APIs but was implemented with the low-level CoProcessFunction
(see TimeBoundedStreamInnerJoin.scala). The pipeline is generated in
DataStreamWindowJoin.scala.

Regarding the over-window aggregation, most of the implementations can be
found in this package. The pipeline is generated in
DataStreamOverAggregate.scala.

In summary, they use built-in state tools to cache the rows/intermediate
results and clean/fire them when necessary.

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] 
wrote:

> Hi,
>
>
> I am building a data pipeline with a lot of streaming join and
> over window aggregation. And flink SQL have these feature supported. However,
> there is no similar DataStream APIs provided(maybe there is and I didn't
> find them. please point out if there is). I got confused because I assume
> that the SQL logical plan will be translated into a graph of operators
> or transformations.
>
>
> Could someone explain how these two sql query are  implemented or
> translated into low level code ( operators or transformations)? I am asking
> this because I have implemented these features without using SQL and the
> performance looks good. And I certainly love to migrate to SQL, but I want
> to understand them well first. Any information or hints or links are
> appreciated.
>
>
>
>1. Time-Windowed Join
>
> The DataStream API only provides streaming join within same window. But
> the SQL API (time-windowed join) can join two streams within quite
> different time range. Below is an sample query that listed in official
> doc, and we can see that *Orders* and *Shipments *have 4 hours
> difference. Is it implemented by CoProcessFunction or TwoInputOperator
> which buffers the event for a certain period?
>
>
> SELECT *FROM Orders o, Shipments sWHERE o.id = s.orderId AND
>   o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>
>
> 2. Over-Window Aggregation
> There is no similar feature in DataStream API. How does this get
> implemented? Does it use keyed state to buffer the previous events, and
> pull the records when there is a need? How does sorting get handled?
>
>
> Best
> Yan
>
>
>
>
>


Re: Apache Flink - Question about Global Windows

2017-11-15 Thread Xingcan Cui
Hi Mans,

the "global" here indicates the "horizontal" (count, time, etc.) dimension
instead of the "vertical" (keyBy) dimension, i.e., all the received data
will be placed into a single huge window. Actually, it's an orthogonal
concept with the *KeyBy* operations since both *DataStream* and
*KeyedStream* can define their own global windows. Compared with other
windows (e.g., tumbling or sliding ones), it's more flexible to implement
your own triggers on it.
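
For instance (the count-based trigger and the 100-element threshold are chosen
arbitrarily), a keyed global window that fires and purges every 100 elements
per key could look like this:

stream
    .keyBy("userId")                                   // hypothetical POJO field
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))  // plug in your own trigger here
    .reduce(new ReduceFunction<Event>() {
        @Override
        public Event reduce(Event a, Event b) {
            return a.merge(b);                         // placeholder aggregation
        }
    });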

Hope that helps.

Best,
Xingcan

On Wed, Nov 15, 2017 at 2:12 AM, M Singh  wrote:

> Hi:
>
> I am reading about global windows and the documentation indicates:
>
> 'A *global windows* assigner assigns all elements with the same key to
> the same single *global window'*
>
> From my understanding if we have a keyed stream - then all elements with
> the same key are also assigned to a single window.  I understand that
> global windows never trigger window computation.  But is there any other
> the difference between the two windows (global vs non-global) ?
>
> Thanks
>
> Mans
>
>
>


Re: Generate watermarks per key in a KeyedStream

2017-11-08 Thread Xingcan Cui
Hi Shailesh,

actually, the watermarks are generated per partition, but all of them will
be forcibly aligned to the minimum one during processing. That is determined
by the semantics of watermarks and KeyedStream, i.e., watermarks belong to
the stream as a whole, while the stream is made up of different partitions
(one per key).

If the physical devices work in different time systems due to delay, the
event streams from them should be treated separately.

Hope that helps.

Best,
Xingcan

On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain 
wrote:

> Hi,
>
> I'm working on implementing a use case wherein different physical devices
> are sending events, and due to network/power issues, there can be a delay
> in receiving events at Flink source. One of the operators within the flink
> job is the Pattern operator, and there are certain patterns which are time
> sensitive, so I'm using Event time characteristic. But the problem comes
> when there are unpredictable delays in events from a particular device(s),
> which causes those events to be dropped (as I cannot really define a static
> bound to allow for lateness).
>
> Since I'm using a KeyedStream, keyed on the source device ID, is there a
> way to allow each CEP operator instance (one per key) to progress its time
> based on the event time in the corresponding stream partition. Or in other
> words, is there a way to generate watermarks per partition in a KeyedStream?
>
> Thanks,
> Shailesh
>


Re: Handle event time

2017-09-12 Thread Xingcan Cui
Hi AndreaKinn,

Reordering in a streaming environment is quite costly. AFAIK, Flink doesn't
provide such a function internally.

Watermarks are just one of the approaches to deal with the out-of-order
problem. IMO, they act like a coarse-grained reordering, and records that
arrive later than the watermark should be dropped *manually*. Maybe you can
try adapting your function so that it works on a stream with only this
"coarse-grained" ordering. However, if a fully ordered stream is necessary in
your application, I'm afraid you must cache and re-emit the records in a
user-defined ProcessFunction.
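
A rough sketch of that buffering approach on a keyed stream (the Event type is
a placeholder): records are buffered per timestamp and re-emitted in timestamp
order as the watermark advances, because event-time timers fire in timestamp
order. As mentioned in this thread, any repartitioning after such an operator
may reintroduce the disorder.

public class SortFunction extends ProcessFunction<Event, Event> {

    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "sort-buffer",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<List<Event>>() {})));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
        long ts = ctx.timestamp();
        if (ts <= ctx.timerService().currentWatermark()) {
            return;                               // too late: drop or send to a side output
        }
        List<Event> list = buffer.get(ts);
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(e);
        buffer.put(ts, list);
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        for (Event e : buffer.get(timestamp)) {
            out.collect(e);                       // emitted in timestamp order
        }
        buffer.remove(timestamp);
    }
}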

Best,
Xingcan


On Tue, Sep 12, 2017 at 1:48 AM, Eron Wright  wrote:

> As mentioned earlier, the watermark is the basis for reasoning about the
> overall progression of time.   Many operators use the watermark to
> correctly organize records, e.g. into the correct time-based window.
> Within that window the records may still be unordered.   That said, some
> operators do take pains to reorder the records, notably the Flink CEP
> operator to correctly detect temporal patterns.  Basically, the operator
> buffers records until a watermark arrives; all buffered records older than
> the watermark may then be sorted and processed.
>
> It is tempting to write a standalone operator that simply reorders records
> as described, but subsequent repartitioning to downstream operators would
> reintroduce disorder.  Therefore one must ensure that subsequent processing
> is done with a 'forward' partitioning strategy.
>
> Hope this helps!
> Eron
>
> On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn  wrote:
>
>> Thank you, effectively I developed also a simple custom solution for
>> watermark looking at flink doc but anyway I see unordered printed streams.
>> I have a doubt about flink behaviour: if I understand, flink doesn't
>> perform
>> automatically reordering of records in a stream, so if for instance a
>> record
>> arrives in late what is the behaviour of flink? In the doc it's described
>> that elements arrive after in late are dropped (allowed lateness default
>> value is 0) but also using this watermark emitter:
>>
>> *public class CustomTimestampExtractor implements
>> AssignerWithPeriodicWatermarks> String, Double>>{
>>
>> private static final long serialVersionUID = 5448621759931440489L;
>> private final long maxOutOfOrderness = 0;
>> private long currentMaxTimestamp;
>>
>> @Override
>> public long extractTimestamp(Tuple6> String, String,
>> Double> element, long previousElementTimestamp) {
>> long timestamp = element.f2.getTime();
>> currentMaxTimestamp = Math.max(timestamp,
>> currentMaxTimestamp);
>> return timestamp;
>> }
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>> return new Watermark(currentMaxTimestamp -
>> maxOutOfOrderness);
>> }
>> }*
>>
>> with maxOutOfOrderness = 0 I see unordered record in the stream.
>>
>> What I want to obtain is a fully ordered stream, is there a way to
>> implement
>> it?
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Handle event time

2017-09-07 Thread Xingcan Cui
Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you think. It should
only be applied to streams whose timestamps are naturally monotonically
ascending.

Flink uses watermarks to deal with unordered data. When a watermark *t* is
received, it means there should be no more records whose timestamps are less
than or equal to *t*. However, you must implement your own watermark
generation policy. There are two basic watermark assigners:
AssignerWithPeriodicWatermarks for generating watermarks periodically, and
AssignerWithPunctuatedWatermarks for generating watermarks when certain
records are encountered.

For more information, please refer to [1] and [2].
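
For instance, a commonly used periodic assigner that tolerates a bounded
amount of out-of-orderness looks roughly like this (the 10-second bound and
the Event type/getter are placeholders):

stream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Event element) {
            return element.getEventTime();   // epoch millis
        }
    });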

Best,
Xingcan

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

On Fri, Sep 8, 2017 at 4:24 AM, AndreaKinn  wrote:

> Hi,
> I'm getting sensor data from a kafka source and I absolutely need they are
> ordered on time data generation basis. I've implemented a custom
> deserialiser and employed an AscendingTimestampExtractor to handle event
> time.
> Obviously I set EventTime as streamTimeCharacteristics.
> Unfortunately when I print the stream I see there are many records
> unordered. Am I doing something wrong?
> I've attached a prove of that:
>
> *env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(CHECKPOINT_TIME);
> env.setParallelism(1);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers",
> KAFKA_ADDRESS);
> properties.setProperty("group.id", GROUP_ID);
>
> DataStream Double>> stream
> = env
> .addSource(new
> FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(),
> properties))
> .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor String,
> Double>>() {
>
> @Override
> public long 
> extractAscendingTimestamp(Tuple6 String,
> Date, String, String, Double> element) {
> return element.f2.getTime();
> }
> })
> .keyBy(0);
>
> stream.print()*
>
>  n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: termination of stream#iterate on finite streams

2017-09-04 Thread Xingcan Cui
Hi Peter,

That's a good idea, but it may not be applicable to an iteration operator.
The operator cannot determine when to generate the "end-of-stream message"
for the feedback stream, since the provided step function (e.g.,
filter(_ > 0).map(_ - 1)) is stateless and has no side effects.

Best,
Xingcan



On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl <peter.e...@gmx.net> wrote:

> Hi Xingcan!
>
> if a _finite_ stream would, at the end, emit a special, trailing
> "End-Of-Stream Message" that floats downward the operator stream, wouldn't
> this enable us to deterministically end the iteration without needing a
> timeout?
>
> Having an arbitrary timeout that must be longer than any iteration step
> takes seems really awkward.
>
> What you think?
>
> Best regards
> Peter
>
>
> On 2 Sep 2017, at 17:16, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi Peter,
>
> I just omitted the filter part. Sorry for that.
>
> Actually, as the javadoc explained, by default a DataStream with iteration
> will never terminate. That's because in a
> stream environment with iteration, the operator will never know whether
> the feedback stream has reached its end
> (though the data source is terminated, *there may
> be unknowable subsequent data*) and that's why it needs a
> timeout value to make the judgement, just like many other function calls
> in network connection. In other words,
> you know the feedback stream will be empty in the future, but the operator
> doesn't. Thus we provide it a maximum
> waiting time for the next record.
>
> Internally, this mechanism is implemented via a blocking queue (the
> related code can be found here
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>
> ).
>
> Hope everything is considered this time : )
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
>
>>
>> On 2 Sep 2017, at 04:45, Xingcan Cui <xingc...@gmail.com> wrote:
>>
>> In your codes, all the the long values will subtract 1 and be sent back
>> to the iterate operator, endlessly.
>>
>>
>>
>> Is this true? shouldn't
>>
>>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>> (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump 
>> meaningless 'y' chars just to do anything
>>   })
>>   iterationResult2.print()
>>
>>
>> produce the following _feedback_ streams?
>>
>> initial input to #iterate(): [1 2 3 4]
>>
>> iteration #1 : [1 2 3]
>> iteration #2 : [1 2]
>> iteration #3 : [1]
>> iteration #4 : []  => empty feedback stream => cause termination? (which
>> actually only happens when setting a timeout value)
>>
>> Best regards
>> Peter
>>
>


Re: termination of stream#iterate on finite streams

2017-09-02 Thread Xingcan Cui
Hi Peter,

I just omitted the filter part. Sorry for that.

Actually, as the javadoc explains, by default a DataStream with iteration
will never terminate. That's because in a streaming environment with
iteration, the operator can never know whether the feedback stream has
reached its end (even though the data source has terminated, *there may
still be unknowable subsequent feedback records*), and that's why it needs a
timeout value to make the judgement, much like timeouts on network calls.
In other words, you know the feedback stream will eventually be empty, but
the operator doesn't. Thus we give it a maximum waiting time for the next
record.

Internally, this mechanism is implemented via a blocking queue (the related
code can be found here
<https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>
).
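
For a finite input like yours, the variant with a wait time is currently the
way to make the job finish. Below is a minimal Java sketch of the same
pipeline, just as an illustration (the 5000 ms value is an arbitrary choice,
and the explicit .returns(...) hints are there because lambda type extraction
can otherwise fail):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FiniteIterationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The argument is the maximum time the iteration head waits for a
        // feedback record before it assumes the feedback is exhausted.
        IterativeStream<Long> iteration = env.generateSequence(1, 4).iterate(5000L);

        // Feedback stream: positive values, decremented by one, go back to the head.
        DataStream<Long> feedback = iteration
                .filter(v -> v > 0)
                .map(v -> v - 1)
                .returns(Types.LONG);
        iteration.closeWith(feedback);

        // Output stream: everything derived from the iteration that is not closed back.
        iteration
                .filter(v -> v > 0)
                .map(v -> "y")
                .returns(Types.STRING)
                .print();

        env.execute("finite-iteration-sketch");
    }
}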

Hope everything is considered this time : )

Best,
Xingcan

On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:

>
> Am 02.09.2017 um 04:45 schrieb Xingcan Cui <xingc...@gmail.com>:
>
> In your codes, all the the long values will subtract 1 and be sent back to
> the iterate operator, endlessly.
>
>
>
> Is this true? shouldn't
>
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
> (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump 
> meaningless 'y' chars just to do anything
>   })
>   iterationResult2.print()
>
>
> produce the following _feedback_ streams?
>
> initial input to #iterate(): [1 2 3 4]
>
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => cause termination? (which
> actually only happens when setting a timeout value)
>
> Best regards
> Peter
>
>
>


Re: termination of stream#iterate on finite streams

2017-09-01 Thread Xingcan Cui
Hi Peter,

Let me try to explain this.

As shown in your examples, the iterate method takes a function which "splits"
the initial stream into two separate streams, i.e., initialStream =>
(stream1, stream2). stream2 works as the output stream, whose results are
emitted to the successor operators (the PrintSink in your example), while
stream1 works as the feedback stream, whose results are sent back to the
iterate operator.

In your code, all the long values will be decremented by 1 and sent back to
the iterate operator, endlessly. Try replacing your first map function with
(_ + 1) and you'll see endless results. For more information, you can refer
to this or read the javadoc.

Hope that helps.

Best,
Xingcan

On Fri, Sep 1, 2017 at 5:29 PM, Peter Ertl  wrote:

> Hi folks,
>
> I was doing some experiments with DataStream#iterate and what felt strange
> to me is the fact that #iterate() does not terminate on it's own when
> consuming a _finite_ stream.
>
> I think this is awkward und unexpected. Only thing that "helped" was
> setting an arbitrary and meaningless timeout on iterate.
>
> Imho this should not be necessary (maybe sent an internal "poison message"
> downward the iteration stream to signal shutdown of the streaming task?)
>
> example:
>
> // ---
>
> // does terminate by introducing a meaningless timeout
> // ---
> val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump 
> meaningless 'x' chars just to do anything
> }, 1000, keepPartitioning = false)
>
> iterationResult1.print()
>
> // ---
> // does NEVER terminate
> // ---
> val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump 
> meaningless 'y' chars just to do anything
> })
> iterationResult2.print()
>
>
> Can someone elaborate on this - should I file a ticket?
>
> Regards
> Peter
>


Re: Question about the custom partitioner

2017-06-16 Thread Xingcan Cui
Hi Aljoscha,

Thanks for your explanation. I'll try what you suggest.
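
In the meantime, a workaround that stays on the public API is to duplicate
each record in a flatMap and then route the copies with partitionCustom.
Here is a rough sketch, just to illustrate the idea (the String element type
and the pickTargets() helper are placeholders, not real APIs):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// events is an existing DataStream<String>; pickTargets(...) is a hypothetical
// helper that returns the subset of downstream task indices a record goes to.
DataStream<Tuple2<Integer, String>> routed = events
        // 1) duplicate: emit one (targetIndex, record) pair per chosen target
        .flatMap(new FlatMapFunction<String, Tuple2<Integer, String>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, String>> out) {
                for (int target : pickTargets(value)) {
                    out.collect(Tuple2.of(target, value));
                }
            }
        })
        // 2) route: send every copy to the partition named by its tag
        .partitionCustom(new Partitioner<Integer>() {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions;
            }
        }, 0);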

Best,
Xingcan

On Fri, Jun 16, 2017 at 5:19 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> I’m afraid that’s not possible out-of-box with the current APIs. I
> actually don’t know why the user-facing Partitioner only allows returning
> one target because the internal StreamPartitioner (which extends
> ChannelSelector) allows returning multiple target partitions.
>
> You can hack around the API by manually creating your own
> StreamPartitioner and applying it to the DataStream as
> DataStream.partitionCustom() and DataStream.setConnectionType() (the first
> calls the latter) do.
>
> Best,
> Aljoscha
>
> On 14. Jun 2017, at 09:09, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi all,
>
> I want to duplicate records to multiple downstream tasks (not all of them
> thus the
> Broadcasting should not work) in stream environment.
> However, it seems that the current custom partitioner can return only one
> partition index.
> Why this restriction exists or do I miss something?
>
> Thanks,
> Xingcan
>
>
>


Question about the custom partitioner

2017-06-14 Thread Xingcan Cui
Hi all,

I want to duplicate records to multiple downstream tasks (not all of them,
so broadcasting will not work) in a streaming environment.
However, it seems that the current custom partitioner can return only one
partition index.
Why does this restriction exist, or am I missing something?

Thanks,
Xingcan


Re: Cross operation on two huge datasets

2017-03-02 Thread Xingcan Cui
Hi Gwen,

in my view, indexing and searching are two isolated processes and they
should be separated. Maybe you should treat the RTree structure as a new
dataset (fortunately it's static, right?) and store it in a distributed
cache or DFS that can be accessed by operators from any node. That will
make the mapping from index partition to operator consistent (regardless of
the locality problem).
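
For example, a minimal sketch of sharing a pre-built index through Flink's
distributed cache could look like the following (the HDFS path, the RTree,
Point, Shape and Match types, and the (de)serialization are all placeholders
for your own code):

import java.io.File;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Register the serialized index once; Flink ships a copy to every node.
env.registerCachedFile("hdfs:///indexes/rtree.bin", "rtree");

DataSet<Match> matches = points.flatMap(new RichFlatMapFunction<Point, Match>() {

    private RTree index;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Every task, wherever it is scheduled, gets a local copy of the file.
        File localCopy = getRuntimeContext().getDistributedCache().getFile("rtree");
        index = RTree.deserialize(localCopy);   // placeholder deserialization
    }

    @Override
    public void flatMap(Point p, Collector<Match> out) {
        for (Shape s : index.query(p)) {        // placeholder index lookup
            out.collect(new Match(p, s));
        }
    }
});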

Besides, you could build a "weak" index first, e.g., partitioning the points
and shapes into "left" and "right", so that you do not need to broadcast the
points to all index nodes (only left to left and right to right).

Best,
Xingcan

On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit  wrote:

> If I understood correctly, you have just implemented flink broadcasting by
> hand :).
>
>
>
> You are still sending out the whole points dataset to each shape partition
> – right?
>
>
>
> I think this could be optimized by using a keyBy or custom partition which
> is common across shapes & points – that should make sure a given point
> always go to same shape node.
>
>
>
> I didn’t understand why Till Rohrmann said “you don’t know where Flink
> will schedule the new operator instance” – new operators are created when
> flink job is started – right? So, there should be no more new operators
> once the job is running and if you use consistent hash partitioning, same
> input should always end at same task manager node.
>
>
>
> You could store the output as Flink State – that would be more fault
> tolerant but storing it as cache in JVM should work too.
>
>
>
> Is this a batch job or streaming?
>
>
>
> Between I am a newbee to Flink, still only learning – so take my
> suggestions with caution :)
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Gwenhael Pasquiers 
> *Date: *Thursday, March 2, 2017 at 7:28 AM
> *To: *"user@flink.apache.org" 
> *Subject: *RE: Cross operation on two huge datasets
>
>
>
> I made it so that I don’t care where the next operator will be scheduled.
>
>
>
> I configured taskslots = 1 and parallelism = yarnnodes so that :
>
> · Each node contains 1/N th  of the shapes (simple repartition()
> of the shapes dataset).
>
> · The points will be cloned so that each partition of the points
> dataset will contain the whole original dataset
>
> o   Flatmap creates “#parallelism” clones of each entry
>
> o   Custom partitioning so that each clone of each entry is sent to a
> different partition
>
>
>
> That way, whatever flink choses to do, each point will be compared to each
> shape. That’s why I think that in my case I can keep it in the JVM without
> issues. I’d prefer to avoid ser/deser-ing that structure.
>
>
>
> I tried to use join (all items have same key) but it looks like flink
> tried to serialize the RTree anyway and it went in StackOverflowError
> (locally with only 1 parititon, not even on yarn).
>
>
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* jeudi 2 mars 2017 15:40
> *To:* user@flink.apache.org
> *Subject:* Re: Cross operation on two huge datasets
>
>
>
> Yes you’re right about the “split” and broadcasting.
>
> Storing it in the JVM is not a good approach, since you don’t know where
> Flink will schedule the new operator instance. It might be the case that an
> operator responsible for another partition gets scheduled to this JVM and
> then it has the wrong RTree information. Maybe you can model the set of
> RTrees as a DataSet[(PartitionKey, RTree)] and then join with the
> partitioned point data set.
>
> Cheers,
> Till
>
> On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers
> [gwenhael.pasqui...@ericsson.com](mailto:gwenhael.pasqui...@ericsson.com)
> 
> wrote:
>
> The best for me would be to make it “persist” inside of the JVM heap in
> some map since I don’t even know if the structure is Serializable (I could
> try). But I understand.
>
>
>
> As for broadcasting, wouldn’t broadcasting the variable cancel the efforts
> I did to “split” the dataset parsing over the nodes ?
>
>
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* jeudi 2 mars 2017 14:42
>
>
> *To:* user@flink.apache.org
> *Subject:* Re: Cross operation on two huge datasets
>
>
>
> Hi Gwenhael,
>
> if you want to persist operator state, then you would have to persist it
> (e.g. writing to a shared directory or emitting the model and using one of
> Flink’s sinks) and when creating the new operators you have to reread it
> from there (usually in the open method or from a Flink source as part of a
> broadcasted data set).
>
> If you want to give a data set to all instances of an operator, then you
> should broadcast this data set. You can do something like
>
> DataSet input = ...
>
> DataSet broadcastSet = ...
>
>
>
> input.flatMap(new RichFlatMapFunction() {
>
> List broadcastSet;
>
>
>
> @Override
>

Re: Cross operation on two huge datasets

2017-02-23 Thread Xingcan Cui
Hi,

@Gwen, sorry that I missed the cross function and showed you the wrong way.
@Fabian's answers are what I mean.

Considering that the cross function is so expensive, can we find a way to
restrict the broadcast? That is, given that the groupBy function is a
many-to-one mapping and the cross function is an all-to-all mapping, is it
possible to define a many-to-many mapping function that broadcasts each
shape to more than one (but not all) of the index areas?
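
To make the idea concrete, here is a rough sketch of such a many-to-many
partitioning with the DataSet API (Shape, Point, cellsOf(), cellOf() and
contains() are placeholders for the geometry types and the grid logic, not
existing APIs):

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Tag every shape with all the grid cells it overlaps (many-to-many).
DataSet<Tuple2<Integer, Shape>> cellShapes = shapes
        .flatMap(new FlatMapFunction<Shape, Tuple2<Integer, Shape>>() {
            @Override
            public void flatMap(Shape s, Collector<Tuple2<Integer, Shape>> out) {
                for (int cell : cellsOf(s)) {
                    out.collect(Tuple2.of(cell, s));
                }
            }
        });

// Tag every point with the single grid cell it falls into (many-to-one).
DataSet<Tuple2<Integer, Point>> cellPoints = points
        .map(new MapFunction<Point, Tuple2<Integer, Point>>() {
            @Override
            public Tuple2<Integer, Point> map(Point p) {
                return Tuple2.of(cellOf(p), p);
            }
        });

// Equi-join on the cell id, then do the exact containment test per pair.
DataSet<Tuple2<Point, Shape>> matches = cellPoints
        .join(cellShapes)
        .where(0).equalTo(0)
        .with(new FlatJoinFunction<Tuple2<Integer, Point>, Tuple2<Integer, Shape>, Tuple2<Point, Shape>>() {
            @Override
            public void join(Tuple2<Integer, Point> cp, Tuple2<Integer, Shape> cs,
                             Collector<Tuple2<Point, Shape>> out) {
                if (contains(cs.f1, cp.f1)) {
                    out.collect(Tuple2.of(cp.f1, cs.f1));
                }
            }
        });

Note that if a point and a shape share more than one cell, the pair will be
emitted more than once, so an extra deduplication step may be needed.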

Best,
Xingcan

On Thu, Feb 23, 2017 at 7:07 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Gwen,
>
> sorry I didn't read your answer, I was still writing mine when you sent
> yours ;-)
>
> Regarding your strategy, this is basically what Cross does:
> It keeps one input partitioned and broadcasts (replicates) the other one.
> On each partition, it combines the records of the partition of the first
> input with all records of the replicated second input.
> I think this is what you describe as well, right?
>
> As I wrote before, this approach is quadratic and does not scale to large
> data sizes.
> I would recommend to look into spatial partitioning. Otherwise, I do not
> see how the problem can be solved for large data sets.
>
> Best, Fabian
>
> 2017-02-23 12:00 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi,
>>
>> Flink's batch DataSet API does already support (manual) theta-joins via
>> the CrossFunction. It combines each pair of records of two input data sets.
>> This is done by broadcasting (and hence replicating) one of the inputs.
>> @Xingcan, so I think what you describe is already there.
>> However, as I said before, it is often prohibitively expensive to
>> compute. When you are at a point, where a MapFunction with broadcast set is
>> no longer sufficient (the smaller data set does not fit into memory),
>> your problem is often too big to compute.
>> The complexity of a Cartesian product (Cross) is simply quadratic.
>>
>> Regarding the specific problem of joining spatial shapes and points, I
>> would go with a spatial partitioning as follows:
>> - Partition the space and compute for each shape into which partitions it
>> belongs (could be more than one).
>> - Do the same for the points (will be exactly one).
>> - Do a 1-n join on the partition ids + an additional check if the point
>> is actually in the shape.
>>
>> The challenge here is to have partitions of similar size.
>>
>> Cheers, Fabian
>>
>> 2017-02-23 5:59 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
>>
>>> Hi all,
>>>
>>> @Gwen From the database's point of view, the only way to avoid Cartesian
>>> product in join is to use index, which exhibits as key grouping in Flink.
>>> However, it only supports many-to-one mapping now, i.e., a shape or a point
>>> can only be distributed to a single group. Only points and shapes belonging
>>> to the same group can be joined and that could reduce the inherent pair
>>> comparisons (compared with a Cartesian product). It's perfectly
>>> suitable for equi-join.
>>>
>>> @Fabian I saw this thread when I was just considering about theta-join
>>> (which will eventually be supported) in Flink. Since it's impossible to
>>> group (index) a dataset for an arbitrary theta-join, I think we may need
>>> some duplication mechanism here. For example, split a dataset into n parts
>>> and send the other dataset to all of these parts. This could be more useful
>>> in stream join. BTW, it seems that I've seen another thread discussing
>>> about this, but can not find it now. What do you think?
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Gwen,
>>>>
>>>> Flink usually performs a block nested loop join to cross two data sets.
>>>> This algorithm spills one input to disk and streams the other input.
>>>> For each input it fills a memory buffer and to perform the cross. Then the
>>>> buffer of the spilled input is refilled with spilled records and records
>>>> are again crossed. This is done until one iteration over the spill records
>>>> is done. Then the other buffer of the streamed input is filled with the
>>>> next records.
>>>>
>>>> You should be aware that cross is a super expensive operation,
>>>> especially if you evaluate a complex condition for each pair of records. So
>>>> cross can be easily too expensive to compute.
>>>> For such use cases it is usually better to apply a co

Re: Cross operation on two huge datasets

2017-02-22 Thread Xingcan Cui
Hi all,

@Gwen From the database's point of view, the only way to avoid a Cartesian
product in a join is to use an index, which manifests as key grouping in
Flink. However, key grouping only supports a many-to-one mapping now, i.e.,
a shape or a point can only be assigned to a single group. Only points and
shapes belonging to the same group are joined, which reduces the inherent
pair comparisons (compared with a Cartesian product). This is perfectly
suitable for equi-joins.

@Fabian I saw this thread when I was just thinking about theta-joins (which
will eventually be supported) in Flink. Since it's impossible to group
(index) a dataset for an arbitrary theta-join, I think we may need some
duplication mechanism here. For example, split one dataset into n parts and
send the other dataset to all of these parts. This could be even more useful
in stream joins. BTW, it seems that I've seen another thread discussing
this, but I cannot find it now. What do you think?

Best,
Xingcan

On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske  wrote:

> Hi Gwen,
>
> Flink usually performs a block nested loop join to cross two data sets.
> This algorithm spills one input to disk and streams the other input. For
> each input it fills a memory buffer and to perform the cross. Then the
> buffer of the spilled input is refilled with spilled records and records
> are again crossed. This is done until one iteration over the spill records
> is done. Then the other buffer of the streamed input is filled with the
> next records.
>
> You should be aware that cross is a super expensive operation, especially
> if you evaluate a complex condition for each pair of records. So cross can
> be easily too expensive to compute.
> For such use cases it is usually better to apply a coarse-grained spatial
> partitioning and do a key-based join on the partitions. Within each
> partition you'd perform a cross.
>
> Best, Fabian
>
>
> 2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com>:
>
>> Hi,
>>
>>
>>
>> I need (or at least I think I do) to do a cross operation between two
>> huge datasets. One dataset is a list of points. The other one is a list of
>> shapes (areas).
>>
>>
>>
>> I want to know, for each point, the areas (they might overlap so a point
>> can be in multiple areas) it belongs to so I thought I’d “cross” my points
>> and areas since I need to test each point against each area.
>>
>>
>>
>> I tried it and my job stucks seems to work for some seconds then, at some
>> point, it stucks.
>>
>>
>>
>> I’m wondering if Flink, for cross operations, tries to load one of the
>> two datasets into RAM or if it’s able to split the job in multiple
>> iterations (even if it means reading one of the two datasets multiple
>> times).
>>
>>
>>
>> Or maybe I’m going at it the wrong way, or missing some parameters, feel
>> free to correct me :)
>>
>>
>>
>> I’m using flink 1.0.1.
>>
>>
>>
>> Thanks in advance
>>
>>
>>
>> Gwen’
>>
>
>


Re: Questions about the V-C Iteration in Gelly

2017-02-13 Thread Xingcan Cui
Hi Greg,

I also found that in VertexCentricIteration.java, the message set is taken
as the workset while the vertex set is taken as the delta for the solution
set. Because of that, the setNewVertexValue() method will not actually
activate a vertex. In other words, if no message is generated (the workset
is empty), the "pact.runtime.workset-empty-aggregator" will decide that the
delta iteration has converged and the iteration just terminates. Is this a
bug?

Best,
Xingcan


On Mon, Feb 13, 2017 at 5:24 PM, Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Greg,
>
> Thanks for your attention.
>
> It takes me a little time to read the old PR on FLINK-1885. Though
> the VertexCentricIteration, as well as its related classes, has been
> refactored, I understand what Markus want to achieve.
>
> I am not sure if using a bulk iteration instead of a delta one could
> eliminate the "out of memory" problem.  Except for that, I think the "auto
> update" has nothing to do with the bulk mode. Considering the compatible
> guarantee, here is my suggestions to improve gelly's iteration API:
>
> 1) Add an "autoHalt" flag to the ComputeFunction.
>
> 2) If the flag is set true (default), apply the current mechanism .
>
> 3) If the flag is set false, call out.collect() to update the vertex value
> whether the setNewVertexValue() method is called or not, unless the user
> explicitly call a (new added) voteToHalt() method in the ComputeFunction.
>
> By adding these, users can decide when to halt a vertex themselves. What
> do you think?
>
> As for the "update edge values during vertex iterations" problem, I think
> it needs a redesign for the gelly framework (Maybe merge the vertices and
> edges into a single data set? Or just change the iterations'
>  implementation? I can't think it clearly now.), so that's it for now.
> Besides, I don't think there will be someone who really would love to write
> a graph algorithm with Flink native operators and that's why gelly is
> designed, isn't it?
>
> Best,
> Xingcan
>
> On Fri, Feb 10, 2017 at 10:31 PM, Greg Hogan <c...@greghogan.com> wrote:
>
>> Hi Xingcan,
>>
>> FLINK-1885 looked into adding a bulk mode to Gelly's iterative models.
>>
>> As an alternative you could implement your algorithm with Flink operators
>> and a bulk iteration. Most of the Gelly library is written with native
>> operators.
>>
>> Greg
>>
>> On Fri, Feb 10, 2017 at 5:02 AM, Xingcan Cui <xingc...@gmail.com> wrote:
>>
>>> Hi Vasia,
>>>
>>> b) As I said, when some vertices finished their work in current phase,
>>> they have nothing to do (no value updates, no message received, just like
>>> slept) but to wait for other vertices that have not finished (the current
>>> phase) yet. After that in the next phase, all the vertices should go back
>>> to work again and if there are some vertices become inactive in last phase,
>>> it could be hard to reactive them again by message since we even don't know
>>> which vertices to send to. The only solution is to keep all vertices
>>> active, whether by updating vertices values in each super step or sending
>>> heartbeat messages to vertices themselves (which will bring a lot of extra
>>> work to the MessageCombiner).
>>>
>>> c) I know it's not elegant or even an awful idea to store the edge info
>>> into vertex values. However, we can not change edge values or maintain
>>> states (even a pick or unpick mark) in edges during a vertex-centric
>>> iteration. Then what can we do if an algorithm really need that?
>>>
>>> Thanks for your patience.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Fri, Feb 10, 2017 at 4:50 PM, Vasiliki Kalavri <
>>> vasilikikala...@gmail.com> wrote:
>>>
>>>> Hi Xingcan,
>>>>
>>>> On 9 February 2017 at 18:16, Xingcan Cui <xingc...@gmail.com> wrote:
>>>>
>>>>> Hi Vasia,
>>>>>
>>>>> thanks for your reply. It helped a lot and I got some new ideas.
>>>>>
>>>>> a) As you said, I did use the getPreviousIterationAggregate() method
>>>>> in preSuperstep() of the next superstep.
>>>>> However, if the (only?) global (aggregate) results can not be guaranteed
>>>>> to be consistency,  what should we
>>>>> do with the postSuperstep() method?
>>>>>
>>>>
>>>> ​The postSuperstep() method is analogous to the close() method in a
>>>> RichFunction, which is typically used

Re: Questions about the V-C Iteration in Gelly

2017-02-13 Thread Xingcan Cui
Hi Greg,

Thanks for your attention.

It took me a little time to read the old PR on FLINK-1885. Though
the VertexCentricIteration, as well as its related classes, has been
refactored since then, I understand what Markus wanted to achieve.

I am not sure whether using a bulk iteration instead of a delta one could
eliminate the "out of memory" problem. Apart from that, I think the "auto
update" has nothing to do with the bulk mode. Considering the compatibility
guarantee, here are my suggestions to improve Gelly's iteration API:

1) Add an "autoHalt" flag to the ComputeFunction.

2) If the flag is set to true (the default), apply the current mechanism.

3) If the flag is set to false, call out.collect() to update the vertex value
whether or not the setNewVertexValue() method is called, unless the user
explicitly calls a (newly added) voteToHalt() method in the ComputeFunction.

By adding these, users can decide when to halt a vertex themselves. What do
you think?
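
To make the proposal concrete, here is a rough sketch of how a
ComputeFunction could look with these additions. Note that setAutoHalt() and
voteToHalt() do not exist in Gelly today (they are exactly the methods being
proposed), and the VertexValue/Message types are placeholders:

import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageIterator;
import org.apache.flink.types.NullValue;

public class PhasedComputeFunction extends ComputeFunction<Long, VertexValue, NullValue, Message> {

    @Override
    public void preSuperstep() throws Exception {
        setAutoHalt(false);   // proposed: do not halt vertices implicitly
    }

    @Override
    public void compute(Vertex<Long, VertexValue> vertex, MessageIterator<Message> messages) throws Exception {
        if (vertex.getValue().currentPhaseFinished()) {
            // proposed: halt explicitly instead of relying on
            // "no value update and no incoming message"
            voteToHalt();
        } else {
            // ... do the per-phase work, possibly without updating the value ...
        }
    }
}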

As for the "update edge values during vertex iterations" problem, I think it
needs a redesign of the Gelly framework (maybe merge the vertices and edges
into a single data set? or just change the iterations' implementation? I
can't think it through clearly now), so that's it for now. Besides, I don't
think many people would really love to write a graph algorithm with Flink's
native operators, and that's why Gelly was designed, isn't it?

Best,
Xingcan

On Fri, Feb 10, 2017 at 10:31 PM, Greg Hogan <c...@greghogan.com> wrote:

> Hi Xingcan,
>
> FLINK-1885 looked into adding a bulk mode to Gelly's iterative models.
>
> As an alternative you could implement your algorithm with Flink operators
> and a bulk iteration. Most of the Gelly library is written with native
> operators.
>
> Greg
>
> On Fri, Feb 10, 2017 at 5:02 AM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Vasia,
>>
>> b) As I said, when some vertices finished their work in current phase,
>> they have nothing to do (no value updates, no message received, just like
>> slept) but to wait for other vertices that have not finished (the current
>> phase) yet. After that in the next phase, all the vertices should go back
>> to work again and if there are some vertices become inactive in last phase,
>> it could be hard to reactive them again by message since we even don't know
>> which vertices to send to. The only solution is to keep all vertices
>> active, whether by updating vertices values in each super step or sending
>> heartbeat messages to vertices themselves (which will bring a lot of extra
>> work to the MessageCombiner).
>>
>> c) I know it's not elegant or even an awful idea to store the edge info
>> into vertex values. However, we can not change edge values or maintain
>> states (even a pick or unpick mark) in edges during a vertex-centric
>> iteration. Then what can we do if an algorithm really need that?
>>
>> Thanks for your patience.
>>
>> Best,
>> Xingcan
>>
>> On Fri, Feb 10, 2017 at 4:50 PM, Vasiliki Kalavri <
>> vasilikikala...@gmail.com> wrote:
>>
>>> Hi Xingcan,
>>>
>>> On 9 February 2017 at 18:16, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>>> Hi Vasia,
>>>>
>>>> thanks for your reply. It helped a lot and I got some new ideas.
>>>>
>>>> a) As you said, I did use the getPreviousIterationAggregate() method
>>>> in preSuperstep() of the next superstep.
>>>> However, if the (only?) global (aggregate) results can not be guaranteed
>>>> to be consistency,  what should we
>>>> do with the postSuperstep() method?
>>>>
>>>
>>> ​The postSuperstep() method is analogous to the close() method in a
>>> RichFunction, which is typically used for cleanup.​
>>>
>>>
>>>
>>>>
>>>> b) Though we can active vertices by update method or messages, IMO, it
>>>> may be more proper for users
>>>> themselves to decide when to halt a vertex's iteration. Considering a
>>>> complex algorithm that contains different
>>>> phases inside a vertex-centric iteration. Before moving to the next
>>>> phase (that should be synchronized),
>>>> there may be some vertices that already finished their work in current
>>>> phase and they just wait for others.
>>>> Users may choose the finished vertices to idle until the next phase,
>>>> but rather than to halt them.
>>>> Can we consider adding the voteToHalt() method and some internal
>>>> variables to the Vertex/Edge class
>>>> (or just create an "advanced" 

Re: Questions about the V-C Iteration in Gelly

2017-02-10 Thread Xingcan Cui
Hi Vasia,

b) As I said, when some vertices have finished their work in the current
phase, they have nothing to do (no value updates, no messages received, as
if asleep) but wait for the other vertices that have not finished the
current phase yet. Then, in the next phase, all the vertices should go back
to work again, and if some vertices became inactive in the last phase, it
can be hard to reactivate them by message since we don't even know which
vertices to send to. The only solution is to keep all vertices active,
either by updating vertex values in every superstep or by sending heartbeat
messages from vertices to themselves (which will bring a lot of extra work
to the MessageCombiner).
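
For reference, the heartbeat workaround inside an existing ComputeFunction
would look roughly like this (VertexValue and Message are placeholders, and
Message.HEARTBEAT is an assumed marker value):

// inside a ComputeFunction<Long, VertexValue, NullValue, Message> subclass
@Override
public void compute(Vertex<Long, VertexValue> vertex, MessageIterator<Message> messages) throws Exception {
    // ... per-phase work for this vertex ...

    // Even if the vertex has nothing to do in the current phase, message
    // itself so that it stays active in the next superstep.
    sendMessageTo(vertex.getId(), Message.HEARTBEAT);
}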

c) I know it's not elegant, and maybe even an awful idea, to store the edge
info in the vertex values. However, we cannot change edge values or maintain
state (even a pick/unpick mark) on edges during a vertex-centric iteration.
Then what can we do if an algorithm really needs that?

Thanks for your patience.

Best,
Xingcan

On Fri, Feb 10, 2017 at 4:50 PM, Vasiliki Kalavri <vasilikikala...@gmail.com
> wrote:

> Hi Xingcan,
>
> On 9 February 2017 at 18:16, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Vasia,
>>
>> thanks for your reply. It helped a lot and I got some new ideas.
>>
>> a) As you said, I did use the getPreviousIterationAggregate() method in
>> preSuperstep() of the next superstep.
>> However, if the (only?) global (aggregate) results can not be guaranteed
>> to be consistency,  what should we
>> do with the postSuperstep() method?
>>
>
> ​The postSuperstep() method is analogous to the close() method in a
> RichFunction, which is typically used for cleanup.​
>
>
>
>>
>> b) Though we can active vertices by update method or messages, IMO, it
>> may be more proper for users
>> themselves to decide when to halt a vertex's iteration. Considering a
>> complex algorithm that contains different
>> phases inside a vertex-centric iteration. Before moving to the next phase
>> (that should be synchronized),
>> there may be some vertices that already finished their work in current
>> phase and they just wait for others.
>> Users may choose the finished vertices to idle until the next phase, but
>> rather than to halt them.
>> Can we consider adding the voteToHalt() method and some internal
>> variables to the Vertex/Edge class
>> (or just create an "advanced" version of them) to make the halting more
>> controllable?
>>
>
>
> ​I suppose adding a voteToHalt() method is possible, but I'm not sure I
> see how that would make halting more controllable. If a vertex hasn't
> changed value or hasn't received a message, it has no work to do in the
> next iteration, so why keep it active? If in a later superstep, a
> previously inactive vertex receives a message, it will become active again.
> ​Is this what you're looking for or am I missing something?
>
>
>
>>
>> c) Sorry that I didn't make it clear before. Here the initialization
>> means a "global" one that executes once
>> before the iteration. For example, users may want to initialize the
>> vertices' values by their adjacent edges
>> before the iteration starts. Maybe we can add an extra coGroupFunction
>> to the configuration parameters
>> and apply it before the iteration?
>>
>
>
> ​You can initialize the graph by using any Gelly transformation methods
> before starting the iteration, e.g. mapVertices, mapEdges, reduceOnEdges,
> etc.
> Btw, a vertex can iterate over its edges inside the ComputeFunction using
> the getEdges() method. Initializing the vertex values with neighboring
> edges might not be a good idea if you have vertices with high degrees.​
>
>
> ​Cheers,
> -Vasia.​
>
>
>
>>
>> What do you think?
>>
>> (BTW, I started a PR on FLINK-1526(MST Lib). Considering the
>> complexity, the example is not
>> provided.)
>>
>> Really appreciate for all your help.
>>
>> Best,
>> Xingcan
>>
>> On Thu, Feb 9, 2017 at 5:36 PM, Vasiliki Kalavri <
>> vasilikikala...@gmail.com> wrote:
>>
>>> Hi Xingcan,
>>>
>>> On 7 February 2017 at 10:10, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I got some question about the vertex-centric iteration in Gelly.
>>>>
>>>> a)  It seems the postSuperstep method is called before the superstep
>>>> barrier (I got different aggregate values of the same superstep in this
>>>> method). Is this a bug? Or the design is just like that?
>>>>
>>>

Re: Questions about the V-C Iteration in Gelly

2017-02-09 Thread Xingcan Cui
Hi Vasia,

thanks for your reply. It helped a lot and I got some new ideas.

a) As you said, I did use the getPreviousIterationAggregate() method in
preSuperstep() of the next superstep. However, if the (only?) global
(aggregate) results cannot be guaranteed to be consistent, what should we
do with the postSuperstep() method?

b) Though we can activate vertices via value updates or messages, IMO it
may be more appropriate for users themselves to decide when to halt a
vertex's iteration. Consider a complex algorithm that contains different
phases inside a vertex-centric iteration. Before moving to the next phase
(which should be synchronized), some vertices may have already finished
their work in the current phase and just wait for the others. Users may
prefer the finished vertices to idle until the next phase rather than halt
them. Can we consider adding a voteToHalt() method and some internal
variables to the Vertex/Edge class (or just creating an "advanced" version
of them) to make halting more controllable?

c) Sorry that I didn't make it clear before. Here the initialization means a
"global" one that executes once before the iteration. For example, users may
want to initialize the vertices' values from their adjacent edges before the
iteration starts. Maybe we can add an extra coGroupFunction to the
configuration parameters and apply it before the iteration?
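
For example, with the existing API the only way I can see is to do such
initialization outside the iteration. Here is a rough sketch, assuming Long
vertex ids and Double vertex/edge values, where each vertex value is set to
the minimum of its adjacent edge values:

import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.ReduceEdgesFunction;
import org.apache.flink.graph.VertexJoinFunction;

Graph<Long, Double, Double> initialized = graph
        .joinWithVertices(
                // aggregate each vertex's adjacent edge values, e.g. keep the minimum
                graph.reduceOnEdges(new ReduceEdgesFunction<Double>() {
                    @Override
                    public Double reduceEdges(Double first, Double second) {
                        return Math.min(first, second);
                    }
                }, EdgeDirection.ALL),
                // write the aggregate into the vertex value before iterating
                new VertexJoinFunction<Double, Double>() {
                    @Override
                    public Double vertexJoin(Double vertexValue, Double minEdgeValue) {
                        return minEdgeValue;
                    }
                });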

What do you think?

(BTW, I started a PR on FLINK-1526 (MST lib). Considering the complexity, no
example is provided.)

I really appreciate all your help.

Best,
Xingcan

On Thu, Feb 9, 2017 at 5:36 PM, Vasiliki Kalavri <vasilikikala...@gmail.com>
wrote:

> Hi Xingcan,
>
> On 7 February 2017 at 10:10, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi all,
>>
>> I got some question about the vertex-centric iteration in Gelly.
>>
>> a)  It seems the postSuperstep method is called before the superstep
>> barrier (I got different aggregate values of the same superstep in this
>> method). Is this a bug? Or the design is just like that?
>>
>
> ​The postSuperstep() method is called inside the close() method of a
> RichCoGroupFunction that wraps the ComputeFunction. The close() method It
> is called after the last call to the coGroup() after each iteration
> superstep.
> The aggregate values are not guaranteed to be consistent during the same
> superstep when they are computed. To retrieve an aggregate value for
> superstep i, you should use the getPreviousIterationAggregate() method in
> superstep i+1.
>
>
>>
>> b) There is not setHalt method for vertices. When no message received, a
>> vertex just quit the next iteration. Should I manually send messages (like
>> heartbeat) to keep the vertices active?
>>
>
> ​That's because vertex halting is implicitly controlled by the underlying
> delta iterations of Flink. ​A vertex will remain active as long as it
> receives a message or it updates its value, otherwise it will become
> inactive. The documentation on Gelly iterations [1] and DataSet iterations
> [2] might be helpful.
>
>
>
>>
>> c) I think we may need an initialization method in the ComputeFunction.
>>
>
>
> ​There exists a preSuperstep() method for initialization. This one will be
> executed once per superstep before the compute function is invoked for
> every vertex. Would this work for you?
>
>
>
>>
>> Any opinions? Thanks.
>>
>> Best,
>> Xingcan
>>
>>
>>
> ​I hope this helps,
> -Vasia.​
>
>
> ​[1]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/libs/gelly/iterative_graph_processing.
> html#vertex-centric-iterations
> [2]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/batch/iterations.html​
>
>


Questions about the V-C Iteration in Gelly

2017-02-07 Thread Xingcan Cui
Hi all,

I have some questions about the vertex-centric iteration in Gelly.

a) It seems the postSuperstep method is called before the superstep barrier
(I got different aggregate values for the same superstep in this method). Is
this a bug, or is the design just like that?

b) There is no setHalt method for vertices. When no message is received, a
vertex just quits the next iteration. Should I manually send messages (like
heartbeats) to keep the vertices active?

c) I think we may need an initialization method in the ComputeFunction.

Any opinions? Thanks.

Best,
Xingcan