I'll have another look at the example code you sent me.
Thanks.

On Fri, Dec 11, 2015 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Niels!
>
> Quick question (I am probably overlooking something here): if you simply
> want to emit each element (Trigger onElement) in addition to a special data
> stream, can you not simply have two operators that consume the original
> data stream: the window operator and the additional source?
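>
> A rough sketch of what I mean (just an illustration, all MyXxx names are
> placeholders; 'env' is the StreamExecutionEnvironment):
>
> DataStream<String> events = env.addSource(new MyEventSource());
>
> // branch 1: the windowed pipeline with the custom trigger
> events.keyBy(new MyKeySelector())
>       .window(TumblingEventTimeWindows.of(Time.minutes(5)))
>       .trigger(new MyTrigger())
>       .apply(new MyWindowFunction());
>
> // branch 2: every element, forwarded as-is to a separate sink
> events.addSink(new MyPerElementSink());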
>
> If you need each element to pass through the window function anyway, I
> think it may almost be easier to use the custom state with timeout example
> I sent you a while back. There you have full flexibility and do not need to
> separate trigger state, window state, etc.
>
> Stephan
>
>
>
> On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Niels,
>> I’m afraid this will not work (if I understood correctly what you are
>> trying to do). Because the trigger is serialized/deserialized, each
>> parallel instance of the trigger has its own copy of the QueueSource
>> object. Plus, a separate instance of the QueueSource itself will be running
>> in each parallel instance of the source operator. And there is no way for
>> the trigger and the source to communicate, since they might not even run on
>> the same machine in the end.
>>
>> Cheers,
>> Aljoscha
>> > On 11 Dec 2015, at 13:11, Niels Basjes <ni...@basjes.nl> wrote:
>> >
>> > Hi,
>> >
>> > Just to let you know: I tried passing a SourceFunction but I haven't
>> been able to get that to work (yet).
>> >
>> > I passed an instance of this (see code below) into my Trigger and
>> stored it there as:
>> >     private QueueSource output;
>> > and then, from onElement, I called something like:
>> >    output.put("Foo",1234);
>> >
>> > When I run this from my IDE I get two distinct instances of the queue (the
>> > effect is that the stuff I put in doesn't come out at the other end).
>> >
>> > Any pointers on how (and whether) this can be fixed are welcome.
>> > Only if this works will I look into making this generic (I got some
>> > type-related exceptions when I tried that).
>> >
>> > Niels
>> >
>> >
>> > (The code below is under the Apache 2.0 License, so copy, adapt and improve it if you want to.)
>> > package nl.basjes.flink.experiments;
>> >
>> > import org.apache.flink.configuration.Configuration;
>> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
>> >
>> > import java.util.concurrent.ConcurrentLinkedQueue;
>> >
>> > public class QueueSource extends RichEventTimeSourceFunction<String> {
>> >     private static final long serialVersionUID = 1L;
>> >
>> >     private volatile boolean isRunning = true;
>> >
>> >     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
>> >
>> >     @Override
>> >     public void open(Configuration parameters) throws Exception {
>> >         super.open(parameters);
>> >     }
>> >
>> >     @Override
>> >     public void close() throws Exception {
>> >         super.close();
>> >     }
>> >
>> >     @Override
>> >     public void run(SourceContext<String> ctx) throws Exception {
>> >         this.isRunning = true;
>> >
>> >         while (this.isRunning) {
>> >             if (queue.isEmpty()) {
>> >                 Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
>> >                 continue;
>> >             }
>> >             QueueElement queueElement = queue.poll();
>> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void cancel() {
>> >         this.isRunning = false;
>> >     }
>> >
>> >     public void put(String element, long timestamp) {
>> >         QueueElement queueElement = new QueueElement();
>> >         queueElement.element = element;
>> >         queueElement.timestamp = timestamp;
>> >         queue.add(queueElement);
>> >     }
>> > }
>> >
>> > class QueueElement {
>> >     String element;
>> >     long timestamp;
>> > }
>> >
>> >
>> >
>> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Thanks.
>> >
>> > The way I have solved it for now is by creating a class that persists data
>> > into something external (right now HBase and/or Kafka) and using that from
>> > the trigger to output the data.
>> >
>> > I have two follow-up questions:
>> > 1) Is it possible to pass an instance of 'SourceFunction' as such a
>> > parameter (without breaking Flink)?
>> > 2) I want to save resources, so I'm using a single instance of my 'Extra
>> > data output class' in the instance of the Trigger, thus reusing the
>> > connections to the outside across multiple Window instances. Can I assume
>> > that a single instance of Trigger will only be used by a single thread,
>> > i.e. that I do not need locking and synchronization?
>> >
>> > Niels
>> >
>> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
>> > Hi Niels!
>> >
>> > I think there is no clean way to emit data from a trigger right now; you
>> > can only emit data from the window functions.
>> >
>> > You can emit two different kinds of data using an "Either" type. This is
>> > built into Scala; in Java we added it in 1.0-SNAPSHOT:
>> >
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>> >
>> > Maybe being able to emit different types of elements helps your use
>> > case...
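>> >
>> > A quick sketch of how that could look (untested, and 'MyStats' is just a
>> > placeholder for whatever summary type you need):
>> >
>> > import org.apache.flink.api.java.typeutils.Either;
>> > import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
>> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> > import org.apache.flink.util.Collector;
>> >
>> > public class MixedOutputWindowFunction
>> >         implements WindowFunction<String, Either<String, MyStats>, String, TimeWindow> {
>> >
>> >     @Override
>> >     public void apply(String key, TimeWindow window, Iterable<String> elements,
>> >                       Collector<Either<String, MyStats>> out) {
>> >         MyStats stats = new MyStats();
>> >         for (String element : elements) {
>> >             out.collect(Either.Left(element));  // the normal per-element output
>> >             stats.count++;
>> >         }
>> >         out.collect(Either.Right(stats));       // the summary / purge event
>> >     }
>> > }
>> >
>> > Downstream you can then split the stream again based on isLeft()/isRight().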
>> >
>> >
>> > These types of questions have been coming up quite a bit, with people
>> > looking to perform different actions inside the windows on different
>> > triggers (on element, on event time).
>> >
>> > As per discussion with Aljoscha, one way to make this more flexible is
>> to enhance what you can do with custom state:
>> >   - State has timeouts (for cleanup)
>> >   - Functions allow you to schedule event-time progress notifications
>> >
>> > Stephan
>> >
>> >
>> >
>> > On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> > I'm working on something that uses the Flink Window feature.
>> > I have written a custom Trigger to build the Window I need.
>> >
>> > I am using the Window feature because I need state and I need to expire
>> > (and clean) this state after a timeout (I use onEventTime to do that).
>> > Because I need the data streamed in real time (augmented with the
>> > mentioned state), I 'FIRE' after every event. Just before I 'PURGE' the
>> > window, I need the fact of this purge (and some of the stats of this
>> > window) as a separate event in a separate 'DataStream'.
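>> >
>> > A stripped-down sketch of that trigger logic (simplified, not my actual
>> > code; written against the current Trigger API) looks something like this:
>> >
>> > import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>> > import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> >
>> > public class FireAlwaysPurgeOnTimeoutTrigger extends Trigger<Object, TimeWindow> {
>> >
>> >     @Override
>> >     public TriggerResult onElement(Object element, long timestamp,
>> >                                    TimeWindow window, TriggerContext ctx) {
>> >         // register a callback for when the window's event time expires
>> >         ctx.registerEventTimeTimer(window.maxTimestamp());
>> >         return TriggerResult.FIRE;  // emit the updated state for every element
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>> >         // timeout reached: purge (and thereby clean up) the window state
>> >         return time == window.maxTimestamp() ? TriggerResult.PURGE : TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>> >         return TriggerResult.CONTINUE;
>> >     }
>> > }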
>> >
>> > Now the interfaces of the various classes only support output as a single
>> > Java type (a very sane choice).
>> > So what I do right now is put my events on something 'external'
>> > (HBase/Kafka) and read them in via a different Source implementation.
>> >
>> > My question: Is there a better way to do this?
>> > Can I (for example) create a special 'Source' that I can pass as a
>> > parameter to my Trigger and then, in onEventTime, just output a 'new event'?
>> >
>> > What do you recommend?
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>>
>>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
