Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-30 Thread Oytun Tez
Hi all,

Making the tag a static element worked out, thank you!
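For anyone who hits the same NotSerializableException later, the change boils down to
something like this (a minimal sketch only; the tag name and element type are
illustrative, and PendingProjects is the class from the stack trace quoted below):

import org.apache.flink.util.OutputTag;

public class PendingProjects {
    // Declaring the OutputTag static (and final) keeps it out of the closure that
    // Flink tries to serialize, which avoids the NotSerializableException.
    private static final OutputTag<String> pendingProjectsTag =
            new OutputTag<String>("pending-projects") {};
    // ... rest of the job as before ...
}

The alternative suggested below is to clean the closure explicitly, e.g.
env.clean(pendingProjectsTag), where env is the StreamExecutionEnvironment.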

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Apr 23, 2019 at 10:37 AM Oytun Tez  wrote:

> Thank you Guowei and Dawid! I am trying your suggestions today and will
> report back.
>
> - I assume the cleaning operation should be done only once because of the
> upgrade, or should I run it every time the application starts?
> - `static` sounds like a very simple fix to get rid of this. Any drawbacks here?
>
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Oytun,
>>
>> I think there is a regression introduced in 1.8 in how we handle output
>> tags. The problem is that we do not call the ClosureCleaner on the OutputTag.
>>
>> There are two options for how you can work around this issue:
>>
>> 1. Declare the OutputTag static
>>
>> 2. Clean the closure explicitly as Guowei suggested:
>> StreamExecutionEnvironment.clean(pendingProjectsTag)
>>
>> I also opened a jira issue to fix this (FLINK-12297[1])
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12297
>> On 22/04/2019 03:06, Guowei Ma wrote:
>>
>> I think you could try
>> StreamExecutionEnvironment.clean(pendingProjectsTag).
>>
>>
>> Oytun Tez wrote on Friday, April 19, 2019 at 9:58 PM:
>>
>>> Forgot to answer one of your points: the parent class compiles well
>>> without this CEP selector (with timeout signature)...
>>>
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez  wrote:
>>>
 Hey JingsongLee!

 Here are some findings...

- flatSelect *without timeout* works normally:
patternStream.flatSelect(PatternFlatSelectFunction), this compiles
well.
- Converted both the timeout and select selectors to an *inner
class* (not static); yielded the same results, doesn't compile.
- flatSelect *without* timeout, but with an inner class for
PatternFlatSelectFunction, it compiles (same as first bullet).
- Tried both of these selectors with *empty* body. Just a skeleton
class. Doesn't compile either. Empty body example is in my first email.
- Tried making both selectors *static public inner* classes,
doesn't compile either.
- Extracted both timeout and flat selectors to their own *independent
classes* in separate files. Doesn't compile.
- I am putting the *error stack* below.
- Without the timeout selector in any class or lambda shape, with
empty or full body, flatSelect compiles well.

 Would these findings help? Any ideas?

 Here is an error stack:

 09:36:51,925 ERROR
 com.motaword.ipm.kernel.error.controller.ExceptionHandler -
 org.apache.flink.api.common.InvalidProgramException: The implementation
 of the PatternFlatSelectAdapter is not serializable. The object probably
 contains or references non serializable fields.
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
 at
 org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
 at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
 at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
 at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
 at
 com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
 at
 com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
 at
 com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
 at com.motaword.ipm.kernel.Application.main(Application.java:63)
 Caused by: java.io.NotSerializableException:
 org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
 at 

Re: Timestamp and key preservation over operators

2019-04-30 Thread Averell
Hi Fabian, Guowei

Thanks for the help. My flow is as in the attached photo, where (1) and (2) are
the main data streams from file sources, while (3) and (4) are the
enrichment data, also from file sources.

 

(5) is to merge-parse (1) and (2), and consists of:
- A tumbling window function with an early trigger (based on the number of
records in the window: FIRE when there is at least one msg from each of
streams 1 & 2, without waiting for the window end-time)
- A flat map function to parse the incoming msg
- A filter and a map

(6) works as a data enricher, enriching the output of (5) with data from (3) and
(4). As (4) is broadcasted, my implementation for (6) looks like:
/stream5.union(stream3).keyBy(key2).connect(stream4).process(MyFunction6
extends KeyedBroadcastProcessFunction)/
In this KeyedBroadcastProcessFunction, one msg from (5) triggers one
output, while a msg from (3) or (4) doesn't send out any records but only
updates the state.

Regarding message types:
- Outputs of (1) and (2) are of the same type EventType1.
- Output of (3) is of type EventType2_1 extends EventType2
- Output of (5) is of type EventType2_2 extends EventType2
- Input of (6) is of type EventType2 (from the unioned keyed stream) and of
type Type3 (from the broadcast stream)
- Output of (6) is of type EventType2_3, which is mapped from EventType2_1

As seen in my screenshot, only (5) showed a watermark, not (6) nor (7). I
noticed the problem because my (7) didn't work as expected, and when I put
an eventTimeExtractor between (6) and (7), then (7) worked.

Typing all the way until now, I guess I now know where my issue came
from: I had not assigned timestamps/watermarks for (3) and (4) because I
thought they were just idle sources of enrichment data.

/*Because of this, I have another question:*/
I read the text regarding idling sources [1], but I am not sure how to implement
that for my file sources. Could you please recommend a
solution / good practice here?

I have one more question, about the recommendation [2] to emit timestamps and
watermarks from within the source function. Is there any way to do that with
the file sources?
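
For reference, the pattern described in [2] looks roughly like this (a minimal
sketch of a custom SourceFunction with made-up record values; whether/how this
can be combined with the built-in file sources is exactly my question):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampedSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long ts = 0L;
        while (running && ts < 10) {
            // Attach the event time directly to the record ...
            ctx.collectWithTimestamp("record-" + ts, ts);
            // ... and emit watermarks from within the source.
            ctx.emitWatermark(new Watermark(ts));
            ts++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}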

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Filter push-down not working for a custom BatchTableSource

2019-04-30 Thread Josh Bradt
Hi all,

I'm trying to implement filter push-down on a custom BatchTableSource that
retrieves data from a REST API and returns it as POJO instances. I've
implemented FilterableTableSource as described in the docs, returning a new
instance of my table source containing the predicates that I've removed
from the list of predicates passed into applyPredicate. However, when
getDataSet is eventually called, it's called on the instance of the table
source that was originally registered with the table environment, which
does not have any filters in it. I've stepped through the code in a
debugger, and applyPredicate is definitely being called, and it's
definitely returning new instances of my table source, but they don't seem
to be used.
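
For concreteness, the shape of what I implemented is roughly this (a heavily
trimmed sketch, not my actual code; the class name, the Item POJO and the REST
call are placeholders):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.TableSource;

public class RestTableSource
        implements BatchTableSource<RestTableSource.Item>, FilterableTableSource<RestTableSource.Item> {

    // Placeholder POJO standing in for the objects returned by the REST API.
    public static class Item {
        public long id;
        public String name;
    }

    private final List<Expression> pushedPredicates;

    public RestTableSource() {
        this(new ArrayList<>());
    }

    private RestTableSource(List<Expression> pushedPredicates) {
        this.pushedPredicates = pushedPredicates;
    }

    @Override
    public TableSource<Item> applyPredicate(List<Expression> predicates) {
        List<Expression> accepted = new ArrayList<>();
        Iterator<Expression> it = predicates.iterator();
        while (it.hasNext()) {
            Expression p = it.next();
            // Decide which predicates the REST API can evaluate (simplified: all of them).
            accepted.add(p);
            it.remove(); // predicates removed from the given list count as "pushed down"
        }
        // Return a NEW table source instance that carries the accepted predicates.
        return new RestTableSource(accepted);
    }

    @Override
    public boolean isFilterPushedDown() {
        return !pushedPredicates.isEmpty();
    }

    @Override
    public DataSet<Item> getDataSet(ExecutionEnvironment execEnv) {
        // My expectation: this is called on the instance returned by applyPredicate,
        // so pushedPredicates could be translated into REST query parameters here.
        // (REST call omitted; an empty collection keeps the sketch self-contained.)
        return execEnv.fromCollection(new ArrayList<Item>(), getReturnType());
    }

    @Override
    public TypeInformation<Item> getReturnType() {
        return TypeInformation.of(Item.class);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }
}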

I also played with the OrcTableSource, which is the only example of a
push-down filter implementation I could find, and it doesn't behave this
way. When I set a breakpoint in getDataSet in that case, it's being called
on one of the new instances of the table source that contains the accepted
filters.

Are there any other requirements for implementing push-down filters that
aren't listed in the docs? Or does anyone have any tips for this?

Thanks,

Josh

-- 
Josh Bradt
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com 


Re: Flink dashboard+ program arguments

2019-04-30 Thread Rad Rad
Thanks, Fabian. 

The problem was incorrect java path. Now, everything works fine. 

I would like to ask about the command for running sql-client.sh.

These commands don't work:
./sql-client.sh OR ./flink sql-client



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Can't build Flink for Scala 2.12

2019-04-30 Thread Visser, M.J.H. (Martijn)
Hi all,

I'm trying to build Flink (from current master branch) for Scala 2.12, using:

mvn clean install -Pscala-2.12 -Dscala-2.12 -DskipTests

It fails for me with this error:

[ERROR] 
/home/pa35uq/Workspace/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52:
 error: value EMPTY in class RelMetadataQuery cannot be accessed in object 
org.apache.calcite.rel.metadata.RelMetadataQuery
[ERROR]  Access to protected value EMPTY not permitted because
[ERROR]  enclosing package metadata in package plan is not a subclass of
[ERROR]  class RelMetadataQuery in package metadata where target is defined
[ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, RelMetadataQuery.EMPTY)
[ERROR]

[ERROR] Failed to execute goal 
net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on 
project flink-table-planner-blink_2.12: wrap: 
org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit 
value: 1) -> [Help 1]

Does anyone have an idea how to fix this?

Best regards,

Martijn


-
ATTENTION:
The information in this e-mail is confidential and only meant for the intended 
recipient. If you are not the intended recipient, don't use or disclose it in 
any way. Please let the sender know and delete the message immediately.
-


Re: Flink dashboard+ program arguments

2019-04-30 Thread Fabian Hueske
Hi,

With Flink 1.5.0, we introduced a new distributed architecture (see release
announcement [1] and FLIP-6 [2]).
From what you describe, I cannot tell what is going wrong.

How do you submit your application?
Which action resulted in the error message you shared?

Btw. why do you go for Flink 1.5/1.6? We released Flink 1.8 a few weeks ago
and the 1.7 line received a few bug fix releases already.

Best, Fabian

[1]
https://flink.apache.org/news/2018/05/25/release-1.5.0.html#rewrite-of-flinks-deployment-and-process-model
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Tue, Apr 30, 2019 at 5:16 PM Rad Rad  wrote:

> Hi,
> I am using Flink 1.4.2 and I can't see my running jobs on the Flink web
> dashboard.
>
> I downloaded Flink 1.5 and 1.6, and I received this message when I tried to
> send
> my arguments like this:
>
> --topic sensor --bootstrap.servers localhost:9092  --zookeeper.connect
> localhost:2181 --group.id test-consumer-group2
>
>
> {"errors":["Bad request, could not parse parameters: Cannot resolve query
> parameter (parallelism) from value \"\"."]}
>
>
> Regards.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink dashboard+ program arguments

2019-04-30 Thread Rad Rad
Hi, 
I am using Flink 1.4.2 and I can't see my running jobs on the Flink web
dashboard.

I downloaded Flink 1.5 and 1.6, and I received this message when I tried to send
my arguments like this:

--topic sensor --bootstrap.servers localhost:9092  --zookeeper.connect
localhost:2181 --group.id test-consumer-group2


{"errors":["Bad request, could not parse parameters: Cannot resolve query
parameter (parallelism) from value \"\"."]}


Regards. 





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Preserve accumulators after failure in DataStream API

2019-04-30 Thread Wouter Zorgdrager
Hi all,

In the documentation I read about UDF accumulators [1] "Accumulators are
automatically backup-ed by Flink’s checkpointing mechanism and restored in
case of a failure to ensure exactly-once semantics." So I assumed this was also
the case for accumulators used in the DataStream API, but I noticed that
it isn't. Every time my job crashes and restarts, the accumulator is
reset. Is there a way to retain this information?

Thanks,
Wouter


[1].
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html


Re: Emitting current state to a sink

2019-04-30 Thread M Singh
 Thanks Avi for your help.  Mans

On Tuesday, April 30, 2019, 5:57:51 AM EDT, Avi Levi 
 wrote:  
 
 Sure!
You get the context and the collector in the processBroadcastElement method; see the
snippet below:

  override def processBroadcastElement(value: BroadcastRequest, ctx:
KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
String]#Context, out: Collector[String]): Unit = {

    ctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String,
ValueState[String]] {
      override def process(key: String, state: ValueState[String]): Unit =
        Option(state.value()).foreach(s => out.collect(s))
    })
    ...
  }
On Mon, Apr 29, 2019 at 5:45 PM M Singh  wrote:

This Message originated outside your organization.
 Hi Avi:
Can you please elaborate (or include an example/code snippet) of how you were 
able to achieve collecting the keyed states from the processBroadcastElement 
method using the applyToKeyedState ?  

I am trying to understand which collector you used to emit the state since the 
broadcasted elements/state might be different from the non-broadcast 
elements/state.
Thanks for your help.

Mans
On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske 
 wrote:  
 
 Nice! 
Thanks for the confirmation :-)
On Mon, Apr 29, 2019 at 1:21 PM Avi Levi  wrote:

Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:

This Message originated outside your organization.
Hi Avi,
I'm not sure if you cannot emit data from the keyed state when you receive a
broadcasted message.
The Context parameter of the processBroadcastElement() method in the
KeyedBroadcastProcessFunction has the applyToKeyedState() method. The method
takes a KeyedStateFunction that is applied to each key of a state, but does not
provide a Collector to emit data. Maybe you can pass the collector to the
KeyedStateFunction and emit records while it iterates over the key space.

Best, Fabian

On Fri, Apr 26, 2019 at 5:35 PM Avi Levi  wrote:

Hi Timo,
I definitely did, but broadcasting a command and trying to address the
persisted state (I mean the state of the data stream and not the broadcasted
one), you get the exception that I wrote (java.lang.NullPointerException: No key
set. This method should not be called outside of a keyed context). E.g. doing
something like

override def processBroadcastElement(value: BroadcastRequest,
ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
Response]#Context, out: Collector[Response]): Unit = {
  value match {
    case Command(StateCmd.Fetch, _) =>
      if (state.value() != null) {
        out.collect(state.value())
      }

will yield that exception.

BR
Avi
On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:

This Message originated outside your organization.

Hi Avi,

did you have a look at the .connect() and .broadcast() API 
functionalities? They allow you to broadcast a control stream to all 
operators. Maybe this example [1] or other examples in this repository 
can help you.

Regards,
Timo

[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java

On 26.04.19 at 07:57, Avi Levi wrote:
> Hi,
> We have a keyed pipeline with persisted state.
> Is there a way to broadcast a command and collect all values that 
> persisted in  the state ?
>
> The end result can be for example sending a fetch command to all 
> operators and emitting the results to some sink
>
> why do we need it ? from time to time we might want to check if we are 
> missing keys what are the additional keys or simply emit the current 
> state to a table and to query it.
>
> I tried simply broadcasting a command and addressing the persisted 
> state but that resulted with:
> java.lang.NullPointerException: No key set. This method should not be 
> called outside of a keyed context.
>
> is there a good way to achieve that ?
>
> Cheers
> Avi





  
  

Re: Emitting current state to a sink

2019-04-30 Thread Avi Levi
Sure!
you get the context and the collector in the processBroadcastElement method
see snippet below

  override def processBroadcastElement(value: BroadcastRequest, ctx:
KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
String]#Context, out: Collector[String]): Unit = {

    ctx.applyToKeyedState(stateDescriptor, new
KeyedStateFunction[String, ValueState[String]] {

  override def process(key: String, state: ValueState[String]): Unit =
Option(state.value()).foreach(s => out.collect(s))
  })

...
}


On Mon, Apr 29, 2019 at 5:45 PM M Singh  wrote:

> *This Message originated outside your organization.*
> --
> Hi Avi:
>
> Can you please elaborate (or include an example/code snippet) of how you
> were able to achieve collecting the keyed states from the
> processBroadcastElement method using the applyToKeyedState ?
>
> I am trying to understand which collector you used to emit the state since
> the broadcasted elements/state might be different from the non-broadcast
> elements/state.
>
> Thanks for your help.
>
> Mans
> On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske <
> fhue...@gmail.com> wrote:
>
>
> Nice!
> Thanks for the confirmation :-)
>
> On Mon, Apr 29, 2019 at 1:21 PM Avi Levi <
> avi.l...@bluevoyant.com> wrote:
>
> Thanks! Works like a charm :)
>
> On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:
>
> *This Message originated outside your organization.*
> --
> Hi Avi,
>
> I'm not sure if  you cannot emit data from the keyed state when you
> receive a broadcasted message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> On Fri, Apr 26, 2019 at 5:35 PM Avi Levi <
> avi.l...@bluevoyant.com> wrote:
>
> Hi Timo,
> I definitely did, but broadcasting a command and trying to address the
> persisted state (I mean the state of the data stream and not the
> broadcasted one) you get the exception that I wrote
> (java.lang.NullPointerException: No key set. This method should not be
> called outside of a keyed context). e.g doing something like
>
> override def processBroadcastElement(value: BroadcastRequest, ctx: 
> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
> Response]#Context, out: Collector[Response]): Unit = {
>   value match {
> case Command(StateCmd.Fetch, _) =>
>   if (state.value() != null) {
> out.collect(state.value())
>   }
>
> will yield that exception
>
> BR
> Avi
>
> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:
>
> This Message originated outside your organization.
>
> Hi Avi,
>
> did you have a look at the .connect() and .broadcast() API
> functionalities? They allow you to broadcast a control stream to all
> operators. Maybe this example [1] or other examples in this repository
> can help you.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
> 
>
> On 26.04.19 at 07:57, Avi Levi wrote:
> > Hi,
> > We have a keyed pipeline with persisted state.
> > Is there a way to broadcast a command and collect all values that
> > persisted in  the state ?
> >
> > The end result can be for example sending a fetch command to all
> > operators and emitting the results to some sink
> >
> > why do we need it ? from time to time we might want to check if we are
> > missing keys what are the additional keys or simply emit the current
> > state to a table and to query it.
> >
> > I tried simply broadcasting a command and addressing the persisted
> > state but that resulted with:
> > java.lang.NullPointerException: No key set. This method should not be
> > called outside of a keyed context.
> >
> > is there a good way to achieve that ?
> >
> > Cheers
> > Avi
>
>


Re: Flink heap memory

2019-04-30 Thread Rad Rad
Thanks a lot. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink orc file write example

2019-04-30 Thread Fabian Hueske
Hi,

I had a look but couldn't find an ORC writer in flink-orc, only an
InputFormat and TableSource to read ORC data into DataSet programs or Table
/ SQL queries.
Where did you find the ORC writer?

Thanks, Fabian

On Tue, Apr 30, 2019 at 9:09 AM Hai  wrote:

> Hi,
>
>
> I found that Flink now supports an ORC file writer in the module
> flink-connectors/flink-orc. Could anyone show me a basic usage of this
> module? I didn’t find that on the official site or on the internet.
>
>
> Many thanks.
>


Re: Timestamp and key preservation over operators

2019-04-30 Thread Fabian Hueske
Hi,

Actually, all operators should preserve record timestamps if you set the correct
TimeCharacteristic to event time.
A window operator will set the timestamp of all emitted records to the
end-timestamp of the window.
Not sure what happens if you use a processing time window in an event time
application though...
Can you show a concise example of your program and explain how to check the
timestamps?

In general it is not a good idea to assign timestamps and watermarks in the
middle of a program because it can be quite hard to reason about
out-of-orderness after the data has been shuffled and processed.

You can use reinterpretAsKeyedStream() if an operator does not change the
keys and if the parallelism of the source and target operators are the same.
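
For reference, a minimal sketch of how it is typically wired up (names are
illustrative; the stream must already be partitioned exactly as keyBy() with the
same key selector would partition it):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ReinterpretExample {
    // 'stream' must be the output of an operator that kept both the key and the
    // parallelism unchanged; otherwise the reinterpretation is unsafe.
    public static <T, K> KeyedStream<T, K> asKeyed(DataStream<T> stream, KeySelector<T, K> keySelector) {
        return DataStreamUtils.reinterpretAsKeyedStream(stream, keySelector);
    }
}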

Best,
Fabian

On Tue, Apr 30, 2019 at 8:59 AM Guowei Ma  wrote:

> Hi,
> Most operators will preserve the input element's timestamp if it has one.
> Window is a special case. The timestamp of elements emitted by a window is
> the maxTimestamp of the Window which is triggered. Different Windows will
> have different implementations (GlobalWindow/TimeWindow/CustomizedWindow).
> KeyBy just shuffles data, so I think it does not affect the element's
> timestamp.
>
> Hope this could help.
>
> Best,
> Guowei
>
>
> Averell  wrote on Tuesday, April 30, 2019 at 7:28 AM:
>
>> Hello,
>>
>> I extracted timestamps using BoundedOutOfOrdernessTimestampExtractor from
>> my
>> sources, have a WindowFunction, and found that my timestamps have been
>> lost.
>> To do another Window operation, I need to extract timestamp again. I tried
>> to find a document for that but haven't found one.
>> Could you please help tell which type of operators would preserve records'
>> timestamp?
>>
>> The same question for keyed stream. I have been using the same key
>> throughout my flow, but with many transformations (using different
>> operators,
>> including coProcessFunction, and converting my data between different
>> classes), and I have been trying to use
>> DataStreamUtils.reinterpretAsKeyedStream. Is it safe to assume that as
>> long
>> as I don't do a transformation on the key, I could use that
>> reinterpretAsKeyedStream function?
>>
>> Thanks and best regards,
>> Averell
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: can we do Flink CEP on event stream or batch or both?

2019-04-30 Thread Fabian Hueske
Hi,

Stateful streaming applications are typically designed to run continuously
(i.e., until forever or until they are not needed anymore or replaced).
Many jobs run for weeks or months.

IMO, using CEP for "simple" equality matches would add too much complexity
for a use case that can be easily solved with a stateful function.
If your task is to ensure that two streams have the same events, I'd
recommend to implement a custom DataStream application with a stateful
ProcessFunction.
Holding state for two years is certainly possible if you know exactly which
events to keep, i.e., you do not store the full stream but only those few
events that have not had a match yet.
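
A minimal sketch of that approach (the two streams connected and keyed by the
value used for matching; the String element type and the class name are
placeholders, and actually reporting a "break" would additionally need timers,
which are omitted here):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BreakDetector extends CoProcessFunction<String, String, String> {

    // Per key: which side has been seen so far.
    private transient ValueState<Boolean> seenFromA;
    private transient ValueState<Boolean> seenFromB;

    @Override
    public void open(Configuration parameters) {
        seenFromA = getRuntimeContext().getState(new ValueStateDescriptor<>("seenA", Boolean.class));
        seenFromB = getRuntimeContext().getState(new ValueStateDescriptor<>("seenB", Boolean.class));
    }

    @Override
    public void processElement1(String event, Context ctx, Collector<String> out) throws Exception {
        if (Boolean.TRUE.equals(seenFromB.value())) {
            // Matched: clear the state so that only unmatched events are kept.
            seenFromA.clear();
            seenFromB.clear();
        } else {
            seenFromA.update(true);
        }
    }

    @Override
    public void processElement2(String event, Context ctx, Collector<String> out) throws Exception {
        if (Boolean.TRUE.equals(seenFromA.value())) {
            seenFromA.clear();
            seenFromB.clear();
        } else {
            seenFromB.update(true);
        }
    }
}

It would be applied to the two streams connected and keyed by the matching value,
e.g. streamA.connect(streamB).keyBy(keySelectorA, keySelectorB).process(new BreakDetector()).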

If you need to run the same logic also on batch data, you might want to
check if you can use SQL or the Table API which are designed to work on
static and streaming data with the same processing semantics.

Best,
Fabian


On Tue, Apr 30, 2019 at 6:49 AM kant kodali  wrote:

> Hi All,
>
> I have the following questions.
>
> 1) can we do Flink CEP on event stream or batch?
> 2) If we can do streaming I wonder how long can we keep the stream
> stateful? I also wonder if anyone successfully had done any stateful
> streaming for days or months (with or without CEP)? Or is stateful streaming
> mainly meant to keep state only for a few hours?
>
> I have a use case where events are ingested from multiple sources and in
> theory, the sources are supposed to have the same events however in
> practice the sources will not have the same events so when the events are
> ingested from multiple sources, the goal is to detect where the "breaks"
> are (meaning missing events that exist in one source but not in the other).
> So I realize this is the typical case for CEP.
>
> Also, in this particular use case, events that were supposed to come 2 years ago
> can come today and if so, need to update those events also in real time or
> near real time. Sure there wouldn't be a lot of events that were missed 2
> years ago but there will be a few. What would be the best approach?
>
> One solution I can think of is to do Stateful CEP with a window of one day
> or whatever short time period where most events will occur and collect the
> events that fall beyond that time period(The late ones) into some Kafka
> topic and have a separate stream analyze the time period of the late ones,
> construct the corresponding NFA and run through it again.  Please let me
> know how this sounds or if there is a better way to do it.
>
> Thanks!
>
>
>
>


Re: How to verify what maxParallelism is set to?

2019-04-30 Thread Fabian Hueske
Hi Sean,

I was looking for the max-parallelism value in the UI, but couldn't find
it. Also the REST API does not seem to provide it.
Would you mind opening a Jira issue for adding it to the REST API and the
Web UI?

Thank you,
Fabian

On Tue, Apr 30, 2019 at 6:36 AM Sean Bollin  wrote:

> Thanks! Do you know if it's possible somehow to verify the global
> maxParallelism other than calling .getMaxParallelism? Either through
> an API call or the UI?
>
> On Mon, Apr 29, 2019 at 8:12 PM Guowei Ma  wrote:
> >
> > Hi,
> > StreamExecutionEnvironment is used to set a default maxParallelism
> > globally. If an operator's maxParallelism is -1, the operator will be given the
> > maxParallelism which is set on the StreamExecutionEnvironment.
> >
> > >>>Any API or way I can verify?
> > I can't find any easy way to do that. But you could get the
> > StreamGraph from the StreamExecutionEnvironment.getStreamGraph API and try to
> > print StreamNode::maxParallelism.
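> >
> > A small sketch of that idea (a throwaway debugging class; StreamGraph and
> > StreamNode are internal classes, and the pipeline here is just a placeholder):
> >
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.graph.StreamNode;
> >
> > public class MaxParallelismCheck {
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >         env.setMaxParallelism(4096);
> >         env.fromElements(1, 2, 3).print();   // placeholder pipeline
> >         // Print the max parallelism recorded for every node in the StreamGraph.
> >         for (StreamNode node : env.getStreamGraph().getStreamNodes()) {
> >             System.out.println(node.getOperatorName() + " -> maxParallelism=" + node.getMaxParallelism());
> >         }
> >     }
> > }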
> > Best,
> > Guowei
> >
> >
> > Sean Bollin  wrote on Tuesday, April 30, 2019 at 8:19 AM:
> >>
> >> Hi all,
> >>
> >> How do you verify what max parallelism is set to on the job level? I do
> not see it in the 1.6 UI, for example.
> >>
> >> I’m setting maxParallelism to 4096 on the StreamExecutionEnvironment
> before execution but printing out the maxParallelism in an operator still
> displays -1. Since this is such an important setting I’d like some sanity
> check to verify it is the value I expect.
> >>
> >> Any API or way I can verify?
>


Re: flink-kafka consumer group issue

2019-04-30 Thread Becket Qin
Flink's Kafka source does not use Kafka's own consumer group management
mechanism, so different Flink jobs will consume the same messages even if they
use the same group id.

On Mon, Apr 22, 2019 at 1:24 PM 13341000780 <13341000...@163.com> wrote:

> Hi, experts!
> When Kafka is used as the data source, I set group.id in the Properties, but
> consumers under the same group can each consume the same message from the same topic.
> When I use the Kafka consumer on its own, this does not happen, i.e. within one
> consumer group only one consumer consumes a given message.
> Has anyone run into this problem? Please advise.
>
>
> Many thanks.


Flink Kafka Connection Failure Notifications

2019-04-30 Thread Chirag Dewan
Hi,
I am using Flink 1.7.2 with Kafka Connector 0.11 for consuming records from
Kafka.
I observed that if the broker is down, the Kafka consumer does nothing but log the
connection error and keep reconnecting to the broker. And in fact the log
level seems to be DEBUG.
Is there any way to capture such connection failures with the Flink APIs?
Or maybe restart the job if the connection is not successful?
Thanks,
Chirag



Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-30 Thread Fabian Hueske
An operator task broadcasts its current watermark to all downstream tasks
that might receive its records.
If you have the following code:

DataStream a = ...
a.map(A).map(B).keyBy().window(C)

and execute this with parallelism 2, your plan looks like this

A.1 -- B.1 --\--/-- C.1
  X
A.2 -- B.2 --/--\-- C.2

A.1 will propagate its watermarks to B.1 because only B.1 will receive its
output events.
However, B.1 will propagate its watermarks to C.1 and C.2 because the
output of B.1 is partitioned and all C tasks might receive output events
from B.1.
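
As a concrete (and purely illustrative) version of the schematic pipeline above,
with the timestamp/watermark assignment placed before keyBy() (Tuple2 and the
sample values just stand in for the real event type):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkPropagationExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(2);

        env.fromElements(Tuple2.of("key1", 1000L), Tuple2.of("key1", 2000L), Tuple2.of("key1", 3000L))
            // Watermarks generated here are forwarded/broadcast across the
            // shuffle introduced by keyBy(), as described above.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1)
            .print();

        env.execute("watermark propagation example");
    }
}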

Best, Fabian

On Mon, Apr 29, 2019 at 8:06 PM an0  wrote:

> Thanks very much. It definitely explains the problem I'm seeing. However,
> something I need to confirm:
> You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in
> assingTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> flows through a specific key's stream, all key streams have the same
> watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> all?
>
> On 2019/04/26 06:34:10, Dawid Wysakowicz  wrote:
> > Hi,
> >
> > Watermarks are meta events that travel independently of data events.
> >
> > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > instances of trips have some data (this is my assumption) so Watermarks
> > can be generated. Afterwards even if some of the keyed partitions have
> > no data, Watermarks are broadcasted/forwarded anyway. In other words if
> > at some point Watermarks were generated for all partitions of a single
> > stage, they will be forwarded beyond this point.
> >
> > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > watermarks for an empty partition which produces no Watermarks at all
> > for this partition, therefore there is no progress beyond this point.
> >
> > I hope this clarifies it a bit.
> >
> > Best,
> >
> > Dawid
> >
> > On 25/04/2019 16:49, an0 wrote:
> > > If my understanding is correct, then why
> `assignTimestampsAndWatermarks` before `keyBy` works? The `timeWindowAll`
> stream's input streams are task 1 and task 2, with task 2 idling, no matter
> whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> whether task 2 receives elements only depends on the key distribution, has
> nothing to do with timestamp assignment, right?
> > >
> > > (A) trips --> assignTimestampsAndWatermarks --> keyBy --+-- key 1 trips ---------+--> timeWindowAll
> > >                                                         +-- key 2 trips (idle) --+
> > >
> > > (B) trips --> keyBy --+-- key 1 trips ---------> assignTimestampsAndWatermarks --+--> timeWindowAll
> > >                       +-- key 2 trips (idle) --> assignTimestampsAndWatermarks --+
> > >
> > > How things are different between A and B from `timeWindowAll`'s
> perspective?
> > >
> > > BTW, thanks for the webinar link, I'll check it later.
> > >
> > > On 2019/04/25 08:30:20, Dawid Wysakowicz 
> wrote:
> > >> Hi,
> > >>
> > >> Yes I think your explanation is correct. I can also recommend Seth's
> > >> webinar where he talks about debugging Watermarks[1]
> > >>
> > >> Best,
> > >>
> > >> Dawid
> > >>
> > >> [1]
> > >>
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > >>
> > >> On 22/04/2019 22:55, an0 wrote:
> > >>> Thanks, I feel I'm getting closer to the truth.
> > >>>
> > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean
> I get 2 tasks running after `keyBy` if even all elements have the same key
> so go to 1 down stream(say task 1)? And it is the other task(task 2) with
> no incoming data that caused the `timeWindowAll` stream unable to progress?
> Because both task 1 and task 2 are its input streams and one is idling so
> its event time cannot make progress?
> > >>>
> > >>> On 2019/04/22 01:57:39, Guowei Ma  wrote:
> >  HI,
> > 
> >  BoundedOutOfOrdernessTimestampExtractors can send a WM at least
> after it
> >  receives an element.
> > 
> >  For after Keyby:
> >  Flink uses the HashCode of key and the parallelism of down stream
> to decide
> >  which subtask would receive the element. This means if your key is
> always
> >  same, all the sources will only send the elements to the same down
> stream
> >  task, for example only no. 3
> BoundedOutOfOrdernessTimestampExtractor.
> > 
> >  For before Keyby:
> >  In your case, the Source and
> BoundedOutOfOrdernessTimestampExtractors would
> >  be chained together, which means every
> >  BoundedOutOfOrdernessTimestampExtractors will receive elements.
> > 
> >  Best,
> >  Guowei
> > 
> > 
> >  an0  wrote on Friday, April 19, 2019 at 10:41 PM:
> > 
> > > Hi,
> > >
> > 

Re: Working around lack of SQL triggers

2019-04-30 Thread Fabian Hueske
You could implement aggregation functions that just do AVG, COUNT, etc. and
a parameterizable aggregation function that can be configured to call the
avg, count, etc. functions.

When configuring, you would specify the input and output, for example like
this:
input: [int, int, double]
key: input.1
output: [key, count(*), sum(input.2), avg(input.3)]

This is pretty much what the SQL optimizer would do when translating the
relational aggregation operator into a Flink function.
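
A minimal sketch of such a parameterizable function in the DataStream API (the
enum, class name and field-index convention are made up for illustration, and
Row stands in for the generic input record):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class ConfigurableAgg implements AggregateFunction<Row, double[], Double> {

    public enum Kind { COUNT, SUM, AVG }

    private final Kind kind;
    private final int fieldIndex; // which field of the input Row to aggregate

    public ConfigurableAgg(Kind kind, int fieldIndex) {
        this.kind = kind;
        this.fieldIndex = fieldIndex;
    }

    @Override
    public double[] createAccumulator() {
        return new double[] {0.0, 0.0}; // [sum, count]
    }

    @Override
    public double[] add(Row value, double[] acc) {
        acc[1] += 1;
        if (kind != Kind.COUNT) {
            acc[0] += ((Number) value.getField(fieldIndex)).doubleValue();
        }
        return acc;
    }

    @Override
    public Double getResult(double[] acc) {
        switch (kind) {
            case COUNT: return acc[1];
            case SUM:   return acc[0];
            default:    return acc[1] == 0 ? 0.0 : acc[0] / acc[1];
        }
    }

    @Override
    public double[] merge(double[] a, double[] b) {
        return new double[] {a[0] + b[0], a[1] + b[1]};
    }
}

Configured instances would then be applied per window, e.g.
keyedStream.timeWindow(Time.minutes(5)).aggregate(new ConfigurableAgg(ConfigurableAgg.Kind.AVG, 2)).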

On Mon, Apr 29, 2019 at 6:43 PM deklanw  wrote:

> Hi,
>
> Thanks for the reply.
>
> I had already almost completely lost hope in using Flink SQL. You have
> confirmed that.
>
> But, like I said, I don't know how to reduce the large amount of
> boilerplate
> I foresee this requiring with the DataStream API. Can you help me with
> that?
> You mention "parameterizable aggregation functions", can you show an
> example? I don't know how to do this without reinventing AVG and COUNT over
> and over again.
>
> Thanks
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Data Locality in Flink

2019-04-30 Thread Fabian Hueske
Such a decision would require some distribution statistics, preferably
stats on the actual data that needs to be rebalanced or not.
This data would only be available while a job is executed and a component
that changes a running program is very difficult to implement.

Best, Fabian


On Mon, Apr 29, 2019 at 3:30 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:

> Thanks Fabian, that's clearer... many times you don't know whether to
> rebalance a dataset or not because it depends on the specific use case and
> dataset distribution.
> An automatic way of choosing whether a Dataset could benefit from a
> rebalance or not would be VERY nice (at least for batch), but I fear this
> would be very hard to implement... am I wrong?
>
> On Mon, Apr 29, 2019 at 3:10 PM Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> These types of race conditions are not failure cases, so no exception is
>> thrown.
>> It only means that a single source task reads all (or most of the)
>> splits and no splits are left for the other tasks.
>> This can be a problem if a record represents a large amount of IO or an
>> intensive computation as they might not be properly distributed. In that
>> case you'd need to manually rebalance the partitions of a DataSet.
>>
>> Fabian
>>
>> On Mon, Apr 29, 2019 at 2:42 PM Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi Fabian, I wasn't aware that  "race-conditions may happen if your
>>> splits are very small as the first data source task might rapidly request
>>> and process all splits before the other source tasks do their first
>>> request". What happens exactly when a race-condition arise? Is this
>>> exception internally handled by Flink or not?
>>>
>>> On Mon, Apr 29, 2019 at 11:51 AM Fabian Hueske 
>>> wrote:
>>>
 Hi,

 The method that I described in the SO answer is still implemented in
 Flink.
 Flink tries to assign splits to tasks that run on local TMs.
 However, files are not split per line (this would be horribly
 inefficient) but in larger chunks depending on the number of subtasks (and
 in case of HDFS the file block size).

 Best, Fabian

On Sun, Apr 28, 2019 at 6:48 PM Soheil Pourbafrani <
soheil.i...@gmail.com> wrote:

> Hi
>
> I want to know exactly how Flink reads data in both cases: a file in the
> local filesystem and a file on a distributed file system.
>
> In reading data from local file system I guess every line of the file
> will be read by a slot (according to the job parallelism) for applying the
> map logic.
>
> In reading from HDFS, I read this
>  answer by Fabian Hueske
>  and I want to
> know: is that still the Flink strategy for reading from a distributed file
> system?
>
> thanks
>

>>>
>>>
>


Re: kafka partitions, data locality

2019-04-30 Thread Fabian Hueske
Hi Sergey,

You are right, keys are managed in key groups. Each key belongs to a key
group and one or more key groups are assigned to each parallel task of an
operator.
Key groups are not exposed to users and the assignments of keys ->
key-groups and key-groups -> tasks cannot be changed without changing Flink
itself (i.e., a custom build).
If you don't want to change Flink, you'd need to change the partitioning in
Kafka (mapping key-groups to partitions) and ensure that all partitions
are read by the correct task.
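
For reference, the key-group assignment described above can be inspected with a
small utility from flink-runtime (just an illustration; the keys and the
parallelism values are made up):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupInspector {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default maximum parallelism
        int parallelism = 4;      // example operator parallelism

        for (String key : new String[] {"client-1", "client-2", "client-3"}) {
            // Key -> key group (fixed by Flink, based on the key's hash).
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            // Key group -> parallel subtask index of the operator.
            int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                    maxParallelism, parallelism, keyGroup);
            System.out.printf("key=%s -> keyGroup=%d -> subtask=%d%n", key, keyGroup, subtask);
        }
    }
}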

I don't think this is possible (with reasonable effort) and if you get it
to work it would be quite fragile with respect to changing parallelism
(Kafka, Flink) etc.
Right now there is no way around partitioning the events with keyBy() if
you want to use keyed state.
After the first keyBy() partitioning, reinterpretAsKeyedStream() can be
used to reuse an existing partitioning.

Best, Fabian

On Mon, Apr 29, 2019 at 3:23 PM Smirnov Sergey Vladimirovich
(39833)  wrote:

> Hi Stefan,
>
>
>
> Thanks for clarifying!
>
> But it still remains an open question for me, because we use the keyBy method
> and I did not find any public interface for key reassignment (something like
> partitionCustom for DataStream).
>
> As I heard, there is some internal mechanism with key groups and a mapping of
> keys to groups. Is it supposed to become public?
>
>
>
>
>
> Regards,
>
> Sergey
>
>
>
> *From:* Stefan Richter [mailto:s.rich...@ververica.com]
> *Sent:* Friday, April 26, 2019 11:15 AM
> *To:* Smirnov Sergey Vladimirovich (39833) 
> *Cc:* Dawid Wysakowicz ; Ken Krugler <
> kkrugler_li...@transpac.com>; user@flink.apache.org; d...@flink.apache.org
> *Subject:* Re: kafka partitions, data locality
>
>
>
> Hi Sergey,
>
>
>
> The reason why this is flagged as beta is actually less about stability but
> more about the fact that this is supposed to be more of a "power user"
> feature because bad things can happen if your data is not 100% correctly
> partitioned in the same way as Flink would partition it. This is why
> typically you should only use it if the data was partitioned by Flink and
> you are very sure what you are doing, because there is not really something
> we can do at the API level to protect you from mistakes in using this
> feature. Eventually some runtime exceptions might show you that something
> is going wrong, but that is not exactly a good user experience.
>
>
>
> On a different note, there actually is currently one open issue [1] to be
> aware of in connection with this feature and operator chaining, but at the
> same time this is something that should not hard to fix in for the next
> minor release.
>
>
>
> Best,
>
> Stefan
>
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-12296?focusedCommentId=16824945=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16824945
>
>
>
>
> On 26. Apr 2019, at 09:48, Smirnov Sergey Vladimirovich (39833) <
> s.smirn...@tinkoff.ru> wrote:
>
>
>
> Hi,
>
>
>
> Dawid, great, thanks!
>
> Any plans to make it stable? 1.9?
>
>
>
>
>
> Regards,
>
> Sergey
>
>
>
> *From:* Dawid Wysakowicz [mailto:dwysakow...@apache.org
> ]
> *Sent:* Thursday, April 25, 2019 10:54 AM
> *To:* Smirnov Sergey Vladimirovich (39833) ; Ken
> Krugler 
> *Cc:* user@flink.apache.org; d...@flink.apache.org
> *Subject:* Re: kafka partitions, data locality
>
>
>
> Hi Smirnov,
>
> Actually there is a way to tell Flink that data is already partitioned.
> You can try the reinterpretAsKeyedStream[1] method. I must warn you though
> this is an experimental feature.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
>
> On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
>
> Hi Ken,
>
>
>
> It’s a bad story for us: even for a small window we have dozens of
> thousands of events per job, with 10x that in peaks or even more. And the number of
> jobs was known to be high. So instead of N operations (our
> producer/consumer mechanism) with shuffle/resorting (current flink
> realization) it will be N*ln(N) - the tenfold loss of execution speed!
>
> 4 all, my next step? Contribute to apache flink? Issues backlog?
>
>
>
>
>
> With best regards,
>
> Sergey
>
> *From:* Ken Krugler [mailto:kkrugler_li...@transpac.com
> ]
> *Sent:* Wednesday, April 17, 2019 9:23 PM
> *To:* Smirnov Sergey Vladimirovich (39833) 
> 
> *Subject:* Re: kafka partitions, data locality
>
>
>
> Hi Sergey,
>
>
>
> As you surmised, once you do a keyBy/max on the Kafka topic, to group by
> clientId and find the max, then the topology will have a partition/shuffle
> to it.
>
>
>
> This is because Flink doesn’t know that client ids don’t span Kafka
> partitions.
>
>
>
> I don’t know of any way to tell Flink that the data doesn’t need to be
> shuffled. There was a discussion
> 
>  about
> adding a 

Re: Error after saving checkpoints to HDFS for a while

2019-04-30 Thread Yun Tang
Hi 志鹏

The root cause is an HDFS problem:
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff
 could only be replicated to 0 nodes instead of minReplication (=1).  There are 
3 datanode(s) running and no node(s) are excluded in this operation.

When the problem occurs, take a look at the state of the HDFS cluster and the relevant logs.
Perhaps this Stack Overflow answer [1] can help you.


[1] 
https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025

Best regards,
Yun Tang



From: 邵志鹏 
Sent: Tuesday, April 30, 2019 15:26
To: user-zh@flink.apache.org
Subject: Error after saving checkpoints to HDFS for a while

After saving checkpoints to HDFS for a while, the job reports an error; after an automatic restart it runs normally for a while and then the same error occurs again.

Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
hdfs://master:9000/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff
 in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:765)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff
 could only be replicated to 0 nodes instead of minReplication (=1).  There are 
3 datanode(s) running and no node(s) are excluded in this operation.
at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1726)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:265)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2567)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:829)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy17.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at 

Flink orc file write example

2019-04-30 Thread Hai
Hi,


I found that Flink now supports an ORC file writer in the module
flink-connectors/flink-orc. Could anyone show me a basic usage of this
module? I didn’t find that on the official site or on the internet.


Many thanks.

Re: Timestamp and key preservation over operators

2019-04-30 Thread Guowei Ma
Hi,
Most operators will preserve the input element's timestamp if it has one.
Window is a special case. The timestamp of elements emitted by a window is
the maxTimestamp of the Window which is triggered. Different Windows will
have different implementations (GlobalWindow/TimeWindow/CustomizedWindow).
KeyBy just shuffles data, so I think it does not affect the element's
timestamp.

Hope this could help.

Best,
Guowei


Averell  wrote on Tuesday, April 30, 2019 at 7:28 AM:

> Hello,
>
> I extracted timestamps using BoundedOutOfOrdernessTimestampExtractor from
> my
> sources, have a WindowFunction, and found that my timestamps have been lost.
> To do another Window operation, I need to extract timestamp again. I tried
> to find a document for that but haven't found one.
> Could you please help tell which type of operators would preserve records'
> timestamp?
>
> The same question for keyed stream. I have been using the same key
> throughout my flow, but with many transformations (using different
> operators,
> including coProcessFunction, and converting my data between different
> classes), and I have been trying to use
> DataStreamUtils.reinterpretAsKeyedStream. Is it safe to assume that as long
> as I don't do a transformation on the key, I could use that
> reinterpretAsKeyedStream function?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink heap memory

2019-04-30 Thread Konstantin Knauf
Hi Rad,

the heap memory can only be measured at the JVM (Taskmanager/Jobmanager)
level. If you have multiple jobs running in the same cluster, you cannot
separate their memory footprint easily unless you only run Taskmanagers
with a single task slot, so that one Taskmanager is always only executing a
single job.

For the per JVM metrics, please check out this blogpost or the
documentation [1,2] as a starting point.

Best,

Konstantin

[1] https://flink.apache.org/news/2019/02/25/monitoring-best-practices.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#memory

On Mon, Apr 29, 2019 at 8:17 PM Rad Rad  wrote:

> Hi,
>
> I would like to know the amount of heap memory currently used (in bytes) by
> a specific job which runs on a Flink cluster.
>
> Regards.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: -




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen