Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-05-09 Thread Vijay Balakrishnan
I solved the problem by following another person's recommendation on the
other post about using a wrapper POJO.
So, I used a wrapper MonitoringTuple to wrap the Tuple, and that solved my
problem with a varying number of fields in the Tuple interface.

public class MonitoringTuple {
    private Tuple tuple;
    ...
}
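Since MonitoringTuple is used as the key type of keyBy(), it needs deterministic equals()/hashCode() (and, as a Flink POJO, a default constructor plus getters/setters). The thread only shows the field, so the following is just a sketch of how such a wrapper might be completed, not necessarily the poster's exact class:

public class MonitoringTuple {
    private Tuple tuple;

    public MonitoringTuple() { }                 // default constructor required for a Flink POJO

    public MonitoringTuple(Tuple tuple) {
        this.tuple = tuple;
    }

    public Tuple getTuple() { return tuple; }

    public void setTuple(Tuple tuple) { this.tuple = tuple; }

    @Override
    public boolean equals(Object o) {
        return o instanceof MonitoringTuple
                && java.util.Objects.equals(tuple, ((MonitoringTuple) o).tuple);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(tuple);
    }

    @Override
    public String toString() {
        return "MonitoringTuple{" + tuple + "}";
    }
}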
Then, I used it like this:

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
    kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, MonitoringTuple> {

    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public MonitoringTuple getKey(Map<String, Object> inputMap) {
        int groupBySetSize = groupBySet.size();
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        int count = 0;
        for (String groupBy : groupBySet) {
            count = setTupleField(inputMap, tuple, count, groupBy);
        }
        return new MonitoringTuple(tuple);
    }
}

public static int setTupleField(Map<String, Object> inputMap, Tuple tuple, int count, String groupBy) {
    Object groupByValueObj = inputMap.get(groupBy);
    String tupleValue = Utils.convertToString(groupByValueObj);
    tuple.setField(tupleValue, count++);
    return count;
}





TIA,

On Wed, May 1, 2019 at 1:39 PM Vijay Balakrishnan 
wrote:

> Hi,
> Had asked this questions earlier as topic - "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple"
>
> Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2
> etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream<Map<String, Object>> kinesisStream = ...;
> KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
>     kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); // <= complains
>     // about the Tuple type of monitoringTupleKeyedStream
> ...
>
> public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
>     private final Set<String> groupBySet;
>
>     public MapTupleKeySelector(Set<String> groupBySet) {
>         this.groupBySet = groupBySet;
>     }
>
>     @Override
>     public Tuple getKey(Map<String, Object> inputMap) throws Exception {
>         int groupBySetSize = groupBySet.size();
>         Tuple tuple = Tuple.newInstance(groupBySetSize);
>         //Tuple1<String> tuple = new Tuple1<>();
>         int count = 0;
>         for (String groupBy : groupBySet) {
>             Object groupByValue = inputMap.get(groupBy); // assumed lookup of the group-by value
>             tuple.setField(groupByValue, count++);
>         }
>         return tuple;
>     }
> }
>
> Abhishek had replied back in the thread as follows (posting in that
> thread as well as creating a new thread):
> However, if you are trying to build some generic framework and, for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write an extra mapper class
> which will convert all the fields in the stream to a Map-based stream.
>
> Can I get an example of how to create this extra mapper class?
>
> Currently, I am using deserialization to convert the incoming byte[] by
> implementing KinesisDeserializationSchema<Map<String, Object>> to convert
> to a DataStream<Map<String, Object>> kinesisStream.
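The "extra mapper class" mentioned above is essentially a MapFunction that flattens a POJO into a Map. A rough sketch, assuming a hypothetical POJO MyEvent with two fields; the names are illustrative and not from this thread:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;

public static class MyEventToMapFunction implements MapFunction<MyEvent, Map<String, Object>> {
    @Override
    public Map<String, Object> map(MyEvent event) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("year", event.getYear());           // illustrative field
        fields.put("modelName", event.getModelName()); // illustrative field
        return fields;
    }
}

// usage sketch: the explicit TypeHint avoids type-erasure problems for Map<String, Object>
DataStream<Map<String, Object>> mapStream =
        eventStream.map(new MyEventToMapFunction())
                   .returns(new TypeHint<Map<String, Object>>() {});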
>
> TIA,
>
> On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma 
> wrote:
>
>> I agree with Timothy, POJO would be a much better approach.
>>
>> However, If you are trying to build some generic framework and for
>> different streams, there would be different fields, you can follow the Map
>> approach. For the latter approach, you need to write extra mapper class
>> which will convert all the fields in the stream to the Map based stream.
>>
>> Abhishek
>>
>> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor  wrote:
>>
>>> Could this just be solved by creating a POJO model class for your
>>> problem?
>>>
>>> That is, instead of using Tuple6 - create a class that encapsulates your
>>> data.   This, I think, would solve your problem.  But beyond that I think
>>> the code will be more understandable.  It's hard to have a Tuple6 of all
>>> Strings, and remember what each one means -- even if I wrote the code :-)
>>> Furthermore, if and when you need to add more elements to your data model,
>>> you will need to refactor your entire Flink graph.   Keeping a data model
>>> in POJO protects against those things.
>>>
>>> The latter is just unsolicited code review feedback.   And I know I gave
>>> it without much context to your problem.  So please take with a large grain
>>> of salt, and if it doesn't apply just ignore it.
>>>
>>> Tim
>>>
>>>
>>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler 
>>> wrote:
>>>
 > I tried using  [ keyBy(KeySelector, TypeInformation) ]

 What was 

Re: Question about cluster stability

2019-05-09 Thread Yun Gao
For a heartbeat timeout, first check the memory usage of the AM and the TMs, and check the GC log for any long GC pauses.
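(For reference, the heartbeat behaviour itself is governed by two settings in flink-conf.yaml; the values below are only an illustration, the defaults being heartbeat.interval: 10000 and heartbeat.timeout: 50000:)

# flink-conf.yaml (illustrative values, in milliseconds)
heartbeat.interval: 10000    # how often heartbeat requests are sent
heartbeat.timeout: 120000    # how long to wait before a TaskManager is considered lost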


--
From:naisili Yuan 
Send Time:2019 May 10 (Fri.) 09:34
To:user-zh 
Subject: Question about cluster stability

My cluster is configured with in-memory checkpoints and automatic restart, but it often restarts automatically after running overnight. The restart log looks like this:

org.apache.flink.util.FlinkException: The assigned slot
f6b9b4065386152879a01dfc7d396f42_1 was removed.
 at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
 at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
 at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
 at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
 at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
 at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
 at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Or:

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
id 7675b5849deb7da116ad946eed0f74b6 timed out.
 at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1631)
 at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I'd like to ask whether there is a recommended Flink configuration that can solve this kind of problem. My deployment is in standalone mode. Thanks in advance!


Re: Type Erasure - Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.

2019-05-09 Thread Vijay Balakrishnan
Hi Chesnay,
Sorry for causing the confusion. I solved the problem by following another
person's recommendation on the other post about using a wrapper POJO.
So, I used a wrapper MonitoringTuple to wrap the Tuple, and that solved my
problem with a varying number of fields in the Tuple interface.

public class MonitoringTuple {
    private Tuple tuple;
    ...
}
Then, I used it like this:

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
    kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, MonitoringTuple> {

    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public MonitoringTuple getKey(Map<String, Object> inputMap) {
        int groupBySetSize = groupBySet.size();
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        int count = 0;
        for (String groupBy : groupBySet) {
            count = setTupleField(inputMap, tuple, count, groupBy);
        }
        return new MonitoringTuple(tuple);
    }
}

public static int setTupleField(Map<String, Object> inputMap, Tuple tuple, int count, String groupBy) {
    Object groupByValueObj = inputMap.get(groupBy);
    String tupleValue = Utils.convertToString(groupByValueObj);
    tuple.setField(tupleValue, count++);
    return count;
}





TIA,

On Thu, May 2, 2019 at 4:54 AM Chesnay Schepler  wrote:

> I'm not sure what you're asking.
>
> If you have a deserialization schema that converts the data into a Map,
> you're done as I understand it; what do you believe to be missing?
>
> If, for a given job, the number/types of fields are fixed, you could look
> into using Row.
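To illustrate the Row route, here is a sketch that combines it with the keyBy(KeySelector, TypeInformation) overload referenced later in this thread; the field names and the field count are illustrative, not from this thread:

import java.util.Map;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.types.Row;

KeySelector<Map<String, Object>, Row> rowKeySelector = map -> {
    Row key = new Row(2);                                   // 2 group-by fields, illustrative
    key.setField(0, String.valueOf(map.get("metricName"))); // illustrative field names
    key.setField(1, String.valueOf(map.get("region")));
    return key;
};

// supplying the TypeInformation explicitly side-steps the type-erasure complaint
KeyedStream<Map<String, Object>, Row> keyedStream =
        kinesisStream.keyBy(rowKeySelector, Types.ROW(Types.STRING, Types.STRING));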
>
> On 01/05/2019 22:40, Vijay Balakrishnan wrote:
>
> Hi,
> Had asked this questions earlier as topic - "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple"
>
> Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2
> etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream<Map<String, Object>> kinesisStream = ...;
> KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
>     kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); // <= complains
>     // about the Tuple type of monitoringTupleKeyedStream
> ...
>
> public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
>     private final Set<String> groupBySet;
>
>     public MapTupleKeySelector(Set<String> groupBySet) {
>         this.groupBySet = groupBySet;
>     }
>
>     @Override
>     public Tuple getKey(Map<String, Object> inputMap) throws Exception {
>         int groupBySetSize = groupBySet.size();
>         Tuple tuple = Tuple.newInstance(groupBySetSize);
>         //Tuple1<String> tuple = new Tuple1<>();
>         int count = 0;
>         for (String groupBy : groupBySet) {
>             Object groupByValue = inputMap.get(groupBy); // assumed lookup of the group-by value
>             tuple.setField(groupByValue, count++);
>         }
>         return tuple;
>     }
> }
>
> Abhishek had replied back in the thread as follows (posting in that
> thread as well as creating a new thread):
> However, if you are trying to build some generic framework and, for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write an extra mapper class
> which will convert all the fields in the stream to a Map-based stream.
>
> Can I get an example of how to create this extra mapper class?
>
> Currently, I am using deserialization to convert the incoming byte[] by
> implementing KinesisDeserializationSchema<Map<String, Object>> to convert
> to a DataStream<Map<String, Object>> kinesisStream.
>
> TIA,
>
>
>


Question about cluster stability

2019-05-09 Thread naisili Yuan
My cluster is configured with in-memory checkpoints and automatic restart, but it often restarts automatically after running overnight. The restart log looks like this:

org.apache.flink.util.FlinkException: The assigned slot
f6b9b4065386152879a01dfc7d396f42_1 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Or:

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
id 7675b5849deb7da116ad946eed0f74b6 timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1631)
at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I'd like to ask whether there is a recommended Flink configuration that can solve this kind of problem. My deployment is in standalone mode. Thanks in advance!


Re: How to export all not-null keyed ValueState

2019-05-09 Thread Averell Huyen Levan
Hi Fabian,

Sorry, but I am still confused about your guidance. If I union the Toggle
stream with the StateReportTrigger stream, would that mean I need to make
my Toggles broadcast state? Or is there some way to modify the keyed
states from within the processBroadcastElement() method?

I tried to implement the other direction (which I outlined in my previous
email). It seems to work, but I am not confident in it, and I am not sure whether
it has any flaws. Could you please give your comments?
In my view, this implementation causes a lot of type casting for my main
data stream, which might have a high impact on performance (my toggle is
on for only about 1% of the keys, and the rate of input1.left is less than a
millionth of the rate of input1.right).

/**
  * This KeyedBroadcastProcessFunction has:
  *   input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
  *      input1.left: Toggles in the form of a tuple (Key, Boolean).
  *         When Toggle._2 == true, records from input1.right for the
  *         same key will be forwarded to the main output.
  *         If it is false, records from input1.right for that same
  *         key will be dropped.
  *      input1.right: the main data stream
  *
  *   input2: a broadcasted stream of StateReport triggers. When a
  *      record arrives on this stream, the current value of the Toggles
  *      will be sent out via the outputTag.
  */
class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
  extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {

  val toggleStateDescriptor =
    new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

  override def processElement(in1: Either[Toggle, MyEvent],
                              readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
                              collector: Collector[MyEvent]): Unit = {
    in1 match {
      case Left(toggle) =>
        getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
      case Right(event) =>
        if (getRuntimeContext.getState(toggleStateDescriptor).value())
          collector.collect(event)
    }
  }

  override def processBroadcastElement(in2: Any,
                                       context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
                                       collector: Collector[MyEvent]): Unit = {
    context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
      if (s != null) context.output(outputTag, (k, s.value())))
  }
}

Thanks for your help.
Regards,
Averell

On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:

> Hi,
>
> Passing a Context through a DataStream definitely does not work.
> You'd need to have the keyed state that you want to scan over in the
> KeyedBroadcastProcessFunction.
>
> For the toggle filter use case, you would need to have a unioned stream
> with Toggle and StateReport events.
> For the output, you can use side outputs to route the different outputs to
> different streams.
>
> Best, Fabian
>
> On Thu, May 9, 2019 at 10:34 AM, Averell  wrote:
>
>> Thank you Congxian and Fabian.
>>
>> @Fabian: could you please give a bit more details? My understanding is: to
>> pass the context itself and an OutputTag to the KeyedStateFunction
>> parameter
>> of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
>> within that KeyedStateFunction.process() send out the side output. Did I
>> understand your idea correctly?
>>
>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>> practice: I am having two streams: Data and Toggle. The stream Toggle is
>> just a keyed boolean stream, being used to filter data from the stream
>> Data.
>> And I am implementing that filter using a simple RichCoFlatMapFunction.
>>
>> Now that I want to export the list of keys which are currently toggled on.
>> Should I
>> (1) have one additional KeyedBroadcastProcessFunction operator (which has
>> Toggle and BroadCast as the input streams), or
>> (2) replace that RichCoFlatMapFunction with a new
>> KeyedBroadcastProcessFunction, which has both functionalities: filter and
>> export? Doing this would require unioning Toggle and Data into one single
>> keyed stream.
>>
>> Thanks and best regards,
>> Averell
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Reconstruct object through partial select query

2019-05-09 Thread Shahar Cizer Kobrinsky
Thanks Fabian,

I'm looking into a way to enrich it without having to know the internal
fields of the original event type.
Right now what I managed to do is to map Car into a TaggedEvent prior
to the SQL query, with tags being empty, and then run the SQL query selecting
*origin, enrich(..) as tags*.
Not sure there's a better way, but I guess that works.
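A rough sketch of that approach (assuming a StreamTableEnvironment `tableEnv` is already set up and `enrich` is a registered scalar UDF; the class and field names follow the example further down in this thread, and the nested-field access syntax assumes TaggedEvent/Car are recognized as POJOs):

// wrap each Car in a TaggedEvent with empty tags before registering the table
DataStream<TaggedEvent> tagged = cars.map(car -> new TaggedEvent(car, new String[0]));

tableEnv.registerDataStream("TaggedCars", tagged);

Table result = tableEnv.sqlQuery(
        "SELECT origin, enrich(origin.`year`, origin.modelName) AS tags FROM TaggedCars");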



On Thu, May 9, 2019 at 12:50 AM Fabian Hueske  wrote:

> Hi,
>
> you can use the value construction function ROW to create a nested row (or
> object).
> However, you have to explicitly reference all attributes that you will add.
>
> If you have a table Cars with (year, modelName) a query could look like
> this:
>
> SELECT
>   ROW(year, modelName) AS car,
>   enrich(year, modelName) AS tags
> FROM Cars;
>
> Handling many attributes is always a bit painful in SQL.
> There is an effort to make the Table API easier to use for these use cases
> (for example Column Operations [1]).
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-11967
>
>
>
> On Thu, May 9, 2019 at 1:44 AM, shkob1 <shahar.kobrin...@gmail.com> wrote:
>
>> Just to be more clear on my goal -
>> Im trying to enrich the incoming stream with some meaningful tags based on
>> conditions from the event itself.
>> So the input stream could be an event looks like:
>> Class Car {
>>   int year;
>>   String modelName;
>> }
>>
>> i will have a config that are defining tags as:
>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>
>> So ideally my output will be in the structure of
>>
>> Class TaggedEvent {
>>Car origin;
>>String[] tags;
>> }
>>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


[no subject]

2019-05-09 Thread an0
You are right, thanks. But something is still not totally clear to me. I'll 
reuse your diagram with a little modification:

DataStream a = ...
a.map(A).map(B).keyBy().timeWindow(C)

and execute this with parallelism 2. However, keyBy only generates one single
key value; assume all records go to C.1. Does the data flow look like this?

A.1 -- B.1 --\
              +-- C.1
A.2 -- B.2 --/
                  C.2

Will the lack of data into C.2 prevent C.1's windows from firing? Will the 
location of assignTimestampsAndWatermarks(after a, after map(A), after map(B), 
after keyBy) matter for the firing of C.1's windows?

By my understanding, the answers are "no" and "no". Correct?

Now comes the *silly* question: does C.2 exist at all? Since there is only one
key value, only one C instance is needed. I can see that C.2 may exist as a physical
instance, but as a logical instance it shouldn't appear in the diagram
because it is unused. I feel the answer to this silly question may be the most
important in understanding my (and perhaps many others') misunderstanding of
situations like this.

If C.2 exists just because parallelism is set to 2, even though it is not 
logically needed, and it also serves as an input to the next operator if there 
is one, then the mystery is completely solved for me.

Use a concrete example:

DataStream a = ...
a.map(A).map(B).keyBy().assignTimestampsAndWatermarks(C).timeWindowAll(D)

A.1 -- B.1 --\
              +-- C.1 --\
A.2 -- B.2 --/           +-- D
                 C.2 ----/

D's watermark cannot progress because C.2's watermark cannot progress, since
C.2 doesn't have any input data. Even though C.2 is not logically needed, it
does exist and it ruins everything :p. Is this understanding
correct?

On 2019/05/09 10:01:44, Fabian Hueske  wrote: 
> Hi,
> 
> Please find my response below.
> 
> On Fri, May 3, 2019 at 4:16 PM, an0  wrote:
> 
> > Thanks, but it doesn't seem to cover this rule:
> > --- Quote
> > Watermarks are generated at, or directly after, source functions. Each
> > parallel subtask of a source function usually generates its watermarks
> > independently. These watermarks define the event time at that particular
> > parallel source.
> >
> > As the watermarks flow through the streaming program, they advance the
> > event time at the operators where they arrive. Whenever an operator
> > advances its event time, it generates a new watermark downstream for its
> > successor operators.
> >
> > Some operators consume multiple input streams; a union, for example, or
> > operators following a keyBy(…) or partition(…) function. Such an operator’s
> > current event time is the minimum of its input streams’ event times. As its
> > input streams update their event times, so does the operator.
> > --- End Quote
> >
> > The most relevant part, I believe, is this:
> > "Some operators consume multiple input streams…operators following a
> > keyBy(…) function. Such an operator’s current event time is the minimum of
> > its input streams’ event times."
> >
> > But the wording of "current event time is the minimum of its input
> > streams’ event times" actually implies that the input streams(produced by
> > keyBy) have different watermarks, the exactly opposite of what you just
> > explained.
> >
> >
> IMO, the description in the documentation is correct, but looks at the
> issue from a different angle.
> An operator task typically has many inputs from which it receives records.
> Depending on the number of input operators (one or more) and the
> connection between the operator and its input operators (forward,
> partition, broadcast), an operator task might have a connection to one,
> some, or all tasks of its input operators. Each input task can send a
> different watermark, but each task will also send the same watermark to all
> its output tasks.
> 
> So, this is a matter of distinguishing receiving (different) watermarks and
> emitting (the same) watermarks.
> 
> Best, Fabian
> 
> 
> > On 2019/05/03 07:32:07, Fabian Hueske  wrote:
> > > Hi,
> > >
> > > this should be covered here:
> > >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> > >
> > > Best, Fabian
> > >
> > > On Thu, May 2, 2019 at 5:48 PM, an0  wrote:
> > >
> > > > This explanation is exactly what I'm looking for, thanks! Is such an
> > > > important rule documented anywhere in the official document?
> > > >
> > > > On 2019/04/30 08:47:29, Fabian Hueske  wrote:
> > > > > An operator task broadcasts its current watermark to all downstream
> > tasks
> > > > > that might receive its records.
> > > > > If you have an the following code:
> > > > >
> > > > > DataStream a = ...
> > > > > a.map(A).map(B).keyBy().window(C)
> > > > >
> > > > > and execute this with parallelism 2, your plan looks like this
> > > > >
> > > > > A.1 -- B.1 --\--/-- C.1
> > > > >   X
> > > > > A.2 -- B.2 --/--\-- C.2
> > > > >
> > > > > 

Re: I want to use MapState on an unkeyed stream

2019-05-09 Thread an0
Thanks, I didn't know that. But it is checkpointed to RocksDB, isn't it? BTW, is
this special treatment of operator state documented anywhere?

On 2019/05/09 07:39:34, Fabian Hueske  wrote: 
> Hi,
> 
> Yes, IMO it is more clear.
> However, you should be aware that operator state is maintained on heap only
> (not in RocksDB).
> 
> Best, Fabian
> 
> 
> On Wed, May 8, 2019 at 8:44 PM, an0  wrote:
> 
> > I switched to using operator list state. It is more clear. It is also
> > supported by RocksDBKeyedStateBackend, isn't it?
> >
> > On 2019/05/08 14:42:36, Till Rohrmann  wrote:
> > > Hi,
> > >
> > > if you want to increase the parallelism you could also pick a key
> > randomly
> > > from a set of keys. The price you would pay is a shuffle operation
> > (network
> > > I/O) which would not be needed if you were using the unkeyed stream and
> > > used the operator list state.
> > >
> > > However, with keyed state you could also use Flink's
> > > RocksDBKeyedStateBackend which allows to go out of core if your state
> > size
> > > should grow very large.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, May 7, 2019 at 5:57 PM an0  wrote:
> > >
> > > > But I only have one stream, nothing to connect it to.
> > > >
> > > > On 2019/05/07 00:15:59, Averell  wrote:
> > > > > From my understanding, having a fake keyBy (stream.keyBy(r =>
> > > > "dummyString"))
> > > > > means there would be only one slot handling the data.
> > > > > Would a broadcast function [1] work for your case?
> > > > >
> > > > > Regards,
> > > > > Averell
> > > > >
> > > > > [1]
> > > > >
> > > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sent from:
> > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> > > > >
> > > >
> > >
> >
> 


Reduce key state

2019-05-09 Thread Frank Wilson
Hi,

In an unwindowed keyed stream using event-time semantics, is state
stored indefinitely or does it eventually get expired? (I was wondering whether the
state inherits the event time of the element that updated it, and whether it
expires when the watermark passes it.)

Thanks,

Frank


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-05-09 Thread Fabian Hueske
Hi,

Please find my response below.

On Fri, May 3, 2019 at 4:16 PM, an0  wrote:

> Thanks, but it doesn't seem to cover this rule:
> --- Quote
> Watermarks are generated at, or directly after, source functions. Each
> parallel subtask of a source function usually generates its watermarks
> independently. These watermarks define the event time at that particular
> parallel source.
>
> As the watermarks flow through the streaming program, they advance the
> event time at the operators where they arrive. Whenever an operator
> advances its event time, it generates a new watermark downstream for its
> successor operators.
>
> Some operators consume multiple input streams; a union, for example, or
> operators following a keyBy(…) or partition(…) function. Such an operator’s
> current event time is the minimum of its input streams’ event times. As its
> input streams update their event times, so does the operator.
> --- End Quote
>
> The most relevant part, I believe, is this:
> "Some operators consume multiple input streams…operators following a
> keyBy(…) function. Such an operator’s current event time is the minimum of
> its input streams’ event times."
>
> But the wording of "current event time is the minimum of its input
> streams’ event times" actually implies that the input streams(produced by
> keyBy) have different watermarks, the exactly opposite of what you just
> explained.
>
>
IMO, the description in the documentation is correct, but looks at the
issue from a different angle.
An operator task typically has many inputs from which it receives records.
Depending on the number of input operators (one or more) and the
connection between the operator and its input operators (forward,
partition, broadcast), an operator task might have a connection to one,
some, or all tasks of its input operators. Each input task can send a
different watermark, but each task will also send the same watermark to all
its output tasks.

So, this is a matter of distinguishing receiving (different) watermarks and
emitting (the same) watermarks.

Best, Fabian
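As a concrete sketch of the rule above (not code from this thread): assigning timestamps and watermarks before keyBy() means every parallel instance sees data and emits watermarks, which are then broadcast to all keyed partitions downstream, even those that receive few or no records. The Trip type, TripSource, and the getters are made up for illustration.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Trip> trips = env.addSource(new TripSource());      // TripSource is illustrative

trips
    // assign timestamps/watermarks on the (non-keyed) source stream ...
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.getEventTimeMillis();
            }
        })
    // ... and only then key the stream; the watermarks are forwarded to every downstream task
    .keyBy(trip -> trip.getDriverId())
    .timeWindow(Time.minutes(5))
    .reduce((t1, t2) -> t2);   // placeholder aggregation: keep the latest trip per window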


> On 2019/05/03 07:32:07, Fabian Hueske  wrote:
> > Hi,
> >
> > this should be covered here:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> >
> > Best, Fabian
> >
> > On Thu, May 2, 2019 at 5:48 PM, an0  wrote:
> >
> > > This explanation is exactly what I'm looking for, thanks! Is such an
> > > important rule documented anywhere in the official document?
> > >
> > > On 2019/04/30 08:47:29, Fabian Hueske  wrote:
> > > > An operator task broadcasts its current watermark to all downstream
> tasks
> > > > that might receive its records.
> > > > If you have an the following code:
> > > >
> > > > DataStream a = ...
> > > > a.map(A).map(B).keyBy().window(C)
> > > >
> > > > and execute this with parallelism 2, your plan looks like this
> > > >
> > > > A.1 -- B.1 --\--/-- C.1
> > > >   X
> > > > A.2 -- B.2 --/--\-- C.2
> > > >
> > > > A.1 will propagate its watermarks to B.1 because only B.1 will
> receive
> > > its
> > > > output events.
> > > > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > > > output of B.1 is partitioned and all C tasks might receive output
> events
> > > > from B.1.
> > > >
> > > > Best, Fabian
> > > >
> > > > On Mon, Apr 29, 2019 at 8:06 PM, an0  wrote:
> > > >
> > > > > Thanks very much. It definitely explains the problem I'm seeing.
> > > However,
> > > > > something I need to confirm:
> > > > > You say "Watermarks are broadcasted/forwarded anyway." Do you
> mean, in
> > > > > assingTimestampsAndWatermarks.keyBy.window, it doesn't matter what
> data
> > > > > flows through a specific key's stream, all key streams have the
> same
> > > > > watermarks? So time-wise, `window` behaves as if `keyBy` is not
> there
> > > at
> > > > > all?
> > > > >
> > > > > On 2019/04/26 06:34:10, Dawid Wysakowicz 
> > > wrote:
> > > > > > Hi,
> > > > > >
> > > > > > Watermarks are meta events that travel independently of data
> events.
> > > > > >
> > > > > > 1) If you assingTimestampsAndWatermarks before keyBy, all
> parallel
> > > > > > instances of trips have some data(this is my assumption) so
> > > Watermarks
> > > > > > can be generated. Afterwards even if some of the keyed partitions
> > > have
> > > > > > no data, Watermarks are broadcasted/forwarded anyway. In other
> words
> > > if
> > > > > > at some point Watermarks were generated for all partitions of a
> > > single
> > > > > > stage, they will be forwarded beyond this point.
> > > > > >
> > > > > > 2) If you assingTimestampsAndWatermarks after keyBy, you try to
> > > assign
> > > > > > watermarks for an empty partition which produces no Watermarks
> at all
> > > > > > for this partition, therefore there is no progress beyond this
> point.
> > > > > >
> > > > > > I hope this clarifies it a bit.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Dawid
> > > > > >
> > > > > > On 

Re: How to export all not-null keyed ValueState

2019-05-09 Thread Fabian Hueske
Hi,

Passing a Context through a DataStream definitely does not work.
You'd need to have the keyed state that you want to scan over in the
KeyedBroadcastProcessFunction.

For the toggle filter use case, you would need to have a unioned stream
with Toggle and StateReport events.
For the output, you can use side outputs to route the different outputs to
different streams.

Best, Fabian

On Thu, May 9, 2019 at 10:34 AM, Averell  wrote:

> Thank you Congxian and Fabian.
>
> @Fabian: could you please give a bit more details? My understanding is: to
> pass the context itself and an OutputTag to the KeyedStateFunction
> parameter
> of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
> within that KeyedStateFunction.process() send out the side output. Did I
> understand your idea correctly?
>
> BTW, I have another question regarding KeyedBroadcastProcessFunction best
> practice: I am having two streams: Data and Toggle. The stream Toggle is
> just a keyed boolean stream, being used to filter data from the stream
> Data.
> And I am implementing that filter using a simple RichCoFlatMapFunction.
>
> Now that I want to export the list of keys which are currently toggled on.
> Should I
> (1) have one additional KeyedBroadcastProcessFunction operator (which has
> Toggle and BroadCast as the input streams), or
> (2) replace that RichCoFlatMapFunction with a new
> KeyedBroadcastProcessFunction, which has both functionalities: filter and
> export? Doing this would require unioning Toggle and Data into one single
> keyed stream.
>
> Thanks and best regards,
> Averell
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to export all not-null keyed ValueState

2019-05-09 Thread Averell
Thank you Congxian and Fabian.

@Fabian: could you please give a bit more details? My understanding is: to
pass the context itself and an OutputTag to the KeyedStateFunction parameter
of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
within that KeyedStateFunction.process() send out the side output. Did I
understand your idea correctly?

BTW, I have another question regarding KeyedBroadcastProcessFunction best
practice: I am having two streams: Data and Toggle. The stream Toggle is
just a keyed boolean stream, being used to filter data from the stream Data.
And I am implementing that filter using a simple RichCoFlatMapFunction.

Now that I want to export the list of keys which are currently toggled on.
Should I
(1) have one additional KeyedBroadcastProcessFunction operator (which has
Toggle and BroadCast as the input streams), or 
(2) replace that RichCoFlatMapFunction with a new
KeyedBroadcastProcessFunction, which has both functionalities: filter and
export? Doing this would require unioning Toggle and Data into one single
keyed stream.

Thanks and best regards,
Averell 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-05-09 Thread Flavio Pompermaier
Hi everybody,
any news on this? It would be VERY helpful for us to have such a feature
because we need to execute a call to a REST service once a job ends.
Right now we do this after env.execute(), but that works only if the job
is submitted via the CLI client; the REST client doesn't execute anything
after env.execute().

Best,
Flavio




On Thu, Apr 25, 2019 at 3:12 PM Jeff Zhang  wrote:

> Hi  Beckett,
>
> Thanks for your feedback, See my comments inline
>
> >>>  How do user specify the listener? *
> What I proposal is to register JobListener in ExecutionEnvironment. I
> don't think we should make ClusterClient as public api.
>
> >>> Where should the listener run? *
> I don't think it is proper to run listener in JobMaster. The listener is
> user code, and usually it is depends on user's other component. So running
> it in client side make more sense to me.
>
> >>> What should be reported to the Listener? *
> I am open to add other api in this JobListener. But for now, I am afraid
> the ExecutionEnvironment is not aware of failover, so it is not possible to
> report failover event.
>
> >>> What can the listeners do on notifications? *
> Do you mean to pass JobGraph to these methods ? like following ( I am
> afraid JobGraph is not a public and stable api, we should not expose it to
> users)
>
> public interface JobListener {
>
> void onJobSubmitted(JobGraph graph, JobID jobId);
>
> void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);
>
> void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
> }
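Purely for illustration, registration on the ExecutionEnvironment as proposed above might look like the following; the registerJobListener method is hypothetical and does not exist at this point:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// hypothetical registration API, following the proposal in this thread
env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobGraph graph, JobID jobId) {
        System.out.println("Submitted job " + jobId);
    }

    @Override
    public void onJobExecuted(JobGraph graph, JobExecutionResult jobResult) {
        // e.g. call an external REST service here once the job has finished
        System.out.println("Finished job " + jobResult.getJobID());
    }

    @Override
    public void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath) {
        System.out.println("Canceled job " + jobId + ", savepoint at " + savepointPath);
    }
});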
>
>
> On Thu, Apr 25, 2019 at 7:40 PM, Becket Qin  wrote:
>
>> Thanks for the proposal, Jeff. Adding a listener to allow users handle
>> events during the job lifecycle makes a lot of sense to me.
>>
>> Here are my two cents.
>>
>> * How do user specify the listener? *
>> It is not quite clear to me whether we consider ClusterClient as a public
>> interface? From what I understand ClusterClient is not a public interface
>> right now. In contrast, ExecutionEnvironment is the de facto interface for
>> administrative work. After job submission, it is essentially bound to a job
>> as an administrative handle. Given this current state, personally I feel
>> acceptable to have the listener registered to the ExecutionEnvironment.
>>
>> * Where should the listener run? *
>> If the listener runs on the client side, the client have to be always
>> connected to the Flink cluster. This does not quite work if the Job is a
>> streaming job. Should we provide the option to run the listener in
>> JobMaster as well?
>>
>> * What should be reported to the Listener? *
>> Besides the proposed APIs, does it make sense to also report events such
>> as failover?
>>
>> * What can the listeners do on notifications? *
>> If the listeners are expected to do anything on the job, should some
>> helper class to manipulate the jobs be passed to the listener method?
>> Otherwise users may not be able to easily take action.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>>
>> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:
>>
>>> Hi Till,
>>>
>>> IMHO, allow adding hooks involves 2 steps.
>>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>>> at the right place. This should be done by framework (flink)
>>> 2. Implement new hook implementation and add/register them into
>>> framework(flink)
>>>
>>> What I am doing is step 1 which should be done by flink, step 2 is done
>>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>>> implement a new custom ClusterClient, add new hooks and call them in the
>>> custom ClusterClient at the right place.
>>> This doesn't make sense to me. For a user who want to add hooks, he is
>>> not supposed to understand the mechanism of ClusterClient, and should not
>>> touch ClusterClient. What do you think ?
>>>
>>>
>>>
>>>
>>> On Tue, Apr 23, 2019 at 4:24 PM, Till Rohrmann  wrote:
>>>
 I think we should not expose the ClusterClient configuration via the
 ExecutionEnvironment (env.getClusterClient().addJobListener) because this
 is effectively the same as exposing the JobListener interface directly on
 the ExecutionEnvironment. Instead I think it could be possible to provide a
 ClusterClient factory which is picked up from the Configuration or some
 other mechanism for example. That way it would not need to be exposed via
 the ExecutionEnvironment at all.

 Cheers,
 Till

 On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:

> >>>  The ExecutionEnvironment is usually used by the user who writes
> the code and this person (I assume) would not be really interested in 
> these
> callbacks.
>
> Usually ExecutionEnvironment is used by the user who write the code,
> but it doesn't needs to be created and configured by this person. e.g. in
> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just

Re: How to export all not-null keyed ValueState

2019-05-09 Thread Fabian Hueske
Hi,

The KeyedBroadcastProcessFunction has a method to iterate over all keys of
a keyed state.
This function is available via the Context object of the processBroadcast()
method.
Hence you need a broadcasted message to trigger the operation.

Best, Fabian
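A sketch of what that looks like inside a KeyedBroadcastProcessFunction; the key/state types, the trigger type, the state descriptor, and the side-output tag are illustrative:

// inside a KeyedBroadcastProcessFunction<String, IN1, Trigger, OUT> subclass
@Override
public void processBroadcastElement(Trigger trigger, Context ctx, Collector<OUT> out) throws Exception {
    // iterate over the registered keyed state across all keys held by this task
    ctx.applyToKeyedState(
            toggleStateDescriptor,   // e.g. a ValueStateDescriptor<Boolean>
            new KeyedStateFunction<String, ValueState<Boolean>>() {
                @Override
                public void process(String key, ValueState<Boolean> state) throws Exception {
                    if (state.value() != null && state.value()) {
                        ctx.output(activeKeysOutputTag, key);   // route via a side output
                    }
                }
            });
}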

On Thu, May 9, 2019 at 8:46 AM, Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi, Averell
>
> AFAIK, we can't get all the key-values from value state, but MapState has
> a function called `entries` can do this, maybe can use MapState as a
> workaround.
> On May 7, 2019, 16:16 +0800, Averell , wrote:
>
> Hi,
>
> I have a keyed value state which is available for only about 1% the total
> number of keyed values that I have. Is there any way to get the values of
> all those state values?
> I looked at the queryable state option, but it looks like supporting
> querying by keyed value only.
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Reconstruct object through partial select query

2019-05-09 Thread Fabian Hueske
Hi,

you can use the value construction function ROW to create a nested row (or
object).
However, you have to explicitly reference all attributes that you will add.

If you have a table Cars with (year, modelName) a query could look like
this:

SELECT
  ROW(year, modelName) AS car,
  enrich(year, modelName) AS tags
FROM Cars;

Handling many attributes is always a bit painful in SQL.
There is an effort to make the Table API easier to use for these use cases
(for example Column Operations [1]).

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-11967



On Thu, May 9, 2019 at 1:44 AM, shkob1  wrote:

> Just to be more clear on my goal -
> Im trying to enrich the incoming stream with some meaningful tags based on
> conditions from the event itself.
> So the input stream could be an event looks like:
> Class Car {
>   int year;
>   String modelName;
> }
>
> i will have a config that are defining tags as:
> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>
> So ideally my output will be in the structure of
>
> Class TaggedEvent {
>Car origin;
>String[] tags;
> }
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: I want to use MapState on an unkeyed stream

2019-05-09 Thread Fabian Hueske
Hi,

Yes, IMO it is more clear.
However, you should be aware that operator state is maintained on heap only
(not in RocksDB).

Best, Fabian
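For reference, the "operator list state" pattern under discussion looks roughly like this; the working variable lives on the heap and is only written to the state backend when a checkpoint is taken. This is a generic sketch, not code from the thread.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingMapper extends RichMapFunction<String, String> implements CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;   // snapshot storage (operator list state)
    private long count;                                    // working state, kept on the heap

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long c : checkpointedCount.get()) {            // restore after a failure or rescale
            count += c;
        }
    }
}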


On Wed, May 8, 2019 at 8:44 PM, an0  wrote:

> I switched to using operator list state. It is more clear. It is also
> supported by RocksDBKeyedStateBackend, isn't it?
>
> On 2019/05/08 14:42:36, Till Rohrmann  wrote:
> > Hi,
> >
> > if you want to increase the parallelism you could also pick a key
> randomly
> > from a set of keys. The price you would pay is a shuffle operation
> (network
> > I/O) which would not be needed if you were using the unkeyed stream and
> > used the operator list state.
> >
> > However, with keyed state you could also use Flink's
> > RocksDBKeyedStateBackend which allows to go out of core if your state
> size
> > should grow very large.
> >
> > Cheers,
> > Till
> >
> > On Tue, May 7, 2019 at 5:57 PM an0  wrote:
> >
> > > But I only have one stream, nothing to connect it to.
> > >
> > > On 2019/05/07 00:15:59, Averell  wrote:
> > > > From my understanding, having a fake keyBy (stream.keyBy(r =>
> > > "dummyString"))
> > > > means there would be only one slot handling the data.
> > > > Would a broadcast function [1] work for your case?
> > > >
> > > > Regards,
> > > > Averell
> > > >
> > > > [1]
> > > >
> > >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> > > >
> > > >
> > > >
> > > > --
> > > > Sent from:
> > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> > > >
> > >
> >
>


Re: taskmanager.tmp.dirs vs io.tmp.dirs

2019-05-09 Thread Fabian Hueske
Hi,

I created FLINK-12460 to update the documentation.

Cheers, Fabian

On Wed, May 8, 2019 at 5:48 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Great, thanks Till!
>
> On Wed, May 8, 2019 at 4:20 PM Till Rohrmann  wrote:
>
>> Hi Flavio,
>>
>> taskmanager.tmp.dirs is the deprecated configuration key which has been
>> superseded by the io.tmp.dirs configuration option. In the future, you
>> should use io.tmp.dirs.
>>
>> Cheers,
>> Till
>>
>> On Wed, May 8, 2019 at 3:32 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> looking at
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html it's
>>> not very clear to me the difference between these two settings (actually I
>>> always used the same value for the two).
>>>
>>> My understanding is that taskmanager.tmp.dirs is used to spill memory
>>> when there's no more RAM available, while io.tmp.dirs is used for all other
>>> situations (but which are they, exactly?).
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>
>


Re: Inconsistent documentation of Table Conditional functions

2019-05-09 Thread Flavio Pompermaier
Ok, thanks a lot for the clarification! Adding an "Example" column to the right of
"Description" would be very helpful (IMHO).

Best,
Flavio

On Wed, May 8, 2019 at 6:19 PM Xingcan Cui  wrote:

> Hi Flavio,
>
> In the description, resultX is just an identifier for the result of the
> first meeting condition.
>
> Best,
> Xingcan
>
> On May 8, 2019, at 12:02 PM, Flavio Pompermaier 
> wrote:
>
> Hi to all,
> in the documentation of the Table Conditional functions [1] the example is
> inconsistent with the related description (there's no resultX for example).
> Or am I wrong?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#conditional-functions
>
>
>


Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

2019-05-09 Thread Till Rohrmann
Hi Steve,

afaik there is no such thing in Flink. I agree that Flink's testing
utilities should be improved. If you implement such a source, then you
might be able to contribute it back to the community. That would be super
helpful.

Cheers,
Till
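A rough sketch of such a test source, in case it helps as a starting point: it emits a fixed list of elements and then simply keeps the job alive for a configurable time so that processing-time windows downstream get a chance to fire. The class name and the lingering approach are my own, not an existing Flink utility.

import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LingeringElementsSource<T> implements SourceFunction<T> {

    private final List<T> elements;      // must be serializable
    private final long lingerMillis;     // how long to keep the job alive after the last element
    private volatile boolean running = true;

    public LingeringElementsSource(List<T> elements, long lingerMillis) {
        this.elements = elements;
        this.lingerMillis = lingerMillis;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        for (T element : elements) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
        // keep the source running so processing-time timers/windows can fire before shutdown
        long deadline = System.currentTimeMillis() + lingerMillis;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}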

On Wed, May 8, 2019 at 6:40 PM Steven Nelson 
wrote:

>
> That’s what I figured was happening :( Your explanation is a lot better
> than what I gave to my team, so that will help a lot, thank you!
>
> Is there a testing source already created that does this sort of thing?
> The Flink-testing library seems a bit sparse.
>
> -Steve
>
> Sent from my iPhone
>
> On May 8, 2019, at 9:33 AM, Till Rohrmann  wrote:
>
> Hi Steve,
>
> I think the reason for the different behaviour is due to the way event
> time and processing time are implemented.
>
> When you are using event time, watermarks need to travel through the
> topology denoting the current event time. When you source terminates, the
> system will send a watermark with Long.MAX_VALUE through the topology. This
> will effectively trigger the completion of all pending event time
> operations.
>
> In the case of processing time, Flink does not do this. Instead it simply
> relies on the processing time clocks on each machine. Hence, there is no
> way for Flink to tell the different machines that their respective
> processing time clocks should proceed to a certain time in case of a
> shutdown. Instead you should make sure that you don't terminate the job
> before a certain time (processing time) has passed. You could do this by
> adding a sleep to your source function after you've output all records and
> just before leaving the source loop.
>
> Cheers,
> Till
>
> On Tue, May 7, 2019 at 11:49 PM Steven Nelson 
> wrote:
>
>> Hello!
>>
>> I am trying to write a test that runs in the TestEnviroment. I create a
>> process that uses ProcessingTime, has a source constructed from a
>> FromElementsFunction and runs data through a Keyed Stream into
>> a ProcessingTimeSessionWindows.withGap().
>>
>> The problem is that it appears that the env.execute method returns
>> immediately after the session closes, not allowing the events to be
>> released from the window before shutdown occurs. This used to work when I
>> used EventTime.
>>
>> Thoughts?
>> -Steve
>>
>


Re: Checkpoint expired before completing with cleanupInRocksdbCompactFilter

2019-05-09 Thread Congxian Qiu
Hi, Mu
Is there anything that looks like `Received late message for now expired
checkpoint attempt ${checkpointID} from ${taskExecutionID} of job ${jobID}` in
the JM log?

If yes, that means this task took too long to complete the checkpoint (maybe it
received the barrier too late, or maybe it spent too much time doing the checkpoint;
you can investigate further from the TM log).


Best
Congxian
On May 9, 2019, 14:44 +0800, Mu Kong , wrote:
> Hi community,
>
> I'm glad that in Flink 1.8.0, it introduced cleanupInRocksdbCompactFilter to 
> support state clean up for rocksdb backend.
> We have an application that heavily relies on managed keyed store.
> As we are using rocksdb as the state backend, we were suffering the issue of 
> ever-growing state size. To be more specific, our checkpoint size grows into 
> 200GB in 2 weeks.
>
> After upgrade to 1.8.0 and utilize the cleanupInRocksdbCompactFilter ttl 
> config, the checkpoint size never grows over 10GB.
> However, two days after upgrade, checkpointing started to fail because of the 
> "Checkpoint expired before completing".
>
> From the log, I could not get anything useful.
> But in the Flink UI, the last successful checkpoint took 1m to finish, and 
> our checkpoint timeout is set to 15m.
> It seems that the checkpoint period became extremely long all of a sudden.
>
> Is there anyway that I can further look into this? Or is there any direction 
> that I can tune the ttl for the application?
>
> Thanks in advance!
>
> Best regards,
> Mu


Re: writeAsFormattedText sets only Unix/Linux line endings

2019-05-09 Thread Chesnay Schepler
The line-ending is hard-coded into the TextOutputFormat. You will have 
to either extend this class and override #writeRecord(), or convert your 
POJOs to a Tuple and use the CsvOutputFormat, which supports setting the 
line delimiter (called recordDelimiter).
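A sketch of the first option (extending TextOutputFormat); it relies on the protected `stream` field inherited from FileOutputFormat and hard-codes UTF-8, so treat it as a starting point rather than a drop-in replacement:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;

public class CrLfTextOutputFormat<T> extends TextOutputFormat<T> {

    private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8);

    public CrLfTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void writeRecord(T record) throws IOException {
        // write the record followed by a Windows-style line ending
        this.stream.write(record.toString().getBytes(StandardCharsets.UTF_8));
        this.stream.write(CRLF);
    }
}

It could then be handed to DataSet#write(FileOutputFormat, String) instead of writeAsFormattedText, with the TextFormatter logic applied in a preceding map.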


On 09/05/2019 08:32, Papadopoulos, Konstantinos wrote:


Kind reminder

From: Papadopoulos, Konstantinos
Sent: Monday, May 06, 2019 5:31 PM
To: user@flink.apache.org
Subject: writeAsFormattedText sets only Unix/Linux line endings

Hi all,

We are developing an application using Flink DataSet API focusing on 
generating a CSV file from a dataset of POJOs using 
writeAsFormattedText and a custom TextFormatter.


During the testing of our application, we observed that the files 
generated consist of Unix line endings (i.e., ‘\n’) even if the 
execution environment is Windows (i.e., expected line endings ‘\r\n’).


Since this is critical to our solution, is there any way to configure 
the line endings of the files generated?


Thanks in advance,

Konstantinos





Re: How to export all not-null keyed ValueState

2019-05-09 Thread Congxian Qiu
Hi, Averell

AFAIK, we can't get all the key-values from a value state, but MapState has a
function called `entries` that can do this, so maybe you can use MapState as a workaround.
On May 7, 2019, 16:16 +0800, Averell , wrote:
> Hi,
>
> I have a keyed value state which is available for only about 1% the total
> number of keyed values that I have. Is there any way to get the values of
> all those state values?
> I looked at the queryable state option, but it looks like supporting
> querying by keyed value only.
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Checkpoint expired before completing with cleanupInRocksdbCompactFilter

2019-05-09 Thread Mu Kong
Hi community,

I'm glad that Flink 1.8.0 introduced cleanupInRocksdbCompactFilter
to support state cleanup for the RocksDB backend.
We have an application that heavily relies on managed keyed state.
As we are using RocksDB as the state backend, we were suffering from the issue
of ever-growing state size. To be more specific, our checkpoint size grew to
200GB in 2 weeks.

After upgrading to 1.8.0 and using the cleanupInRocksdbCompactFilter TTL
config, the checkpoint size never grows over 10GB.
However, two days after the upgrade, checkpointing started to fail because of
"Checkpoint expired before completing".

From the log, I could not get anything useful.
But in the Flink UI, the last successful checkpoint took 1m to finish, and
our checkpoint timeout is set to 15m.
It seems that the checkpoint period became extremely long all of a sudden.

Is there any way that I can look into this further? Or is there any
direction in which I can tune the TTL for the application?

Thanks in advance!

Best regards,
Mu


RE: writeAsFormattedText sets only Unix/Linux line endings

2019-05-09 Thread Papadopoulos, Konstantinos
Kind reminder

From: Papadopoulos, Konstantinos 
Sent: Monday, May 06, 2019 5:31 PM
To: user@flink.apache.org
Subject: writeAsFormattedText sets only Unix/Linux line endings

Hi all,

We are developing an application using Flink DataSet API focusing on generating 
a CSV file from a dataset of POJOs using writeAsFormattedText and a custom 
TextFormatter.
During the testing of our application, we observed that the files generated 
consist of Unix line endings (i.e., '\n') even if the execution environment is 
Windows (i.e., expected line endings '\r\n').
Since this is critical to our solution, is there any way to configure the line 
endings of the files generated?

Thanks in advance,
Konstantinos