CEP join across events

2017-04-25 Thread Elias Levy
There doesn't appear to be a way to join events across conditions using the
CEP library.

Consider events of the form (type, value_a, value_b) on a stream keyed by
the value_a field.

Under 1.2 you can create a pattern such that, for a given value_a (as specified
by the stream key), there is a match if an event of type 1 is followed by an
event of type 2, e.g.
begin("foo").where(_.type == 1).followedBy("bar").where(_.type == 2). But this
will return a match regardless of whether value_b in the first event
matches value_b in the second event.

The 1.3 snapshot introduces iterative conditions, but this is insufficient. In
1.3 you can do:

begin("foo").where(_.type==1).followedBy("bar").where(
(v, ctx) => {
   v.type == 2 &&
   ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b
== v.value_b)
})
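
For concreteness, a minimal Java sketch of the same 1.3-style iterative condition
(the Event POJO and its field names are hypothetical, not from the original post);
it shares the limitation described next:

    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;

    public class CepJoinSketch {

        // Hypothetical event type: (type, value_a, value_b)
        public static class Event {
            public int type;
            public String valueA;
            public String valueB;
        }

        public static Pattern<Event, ?> pattern() {
            return Pattern.<Event>begin("foo")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.type == 1;
                    }
                })
                .followedBy("bar")
                .where(new IterativeCondition<Event>() {
                    @Override
                    public boolean filter(Event e, Context<Event> ctx) throws Exception {
                        if (e.type != 2) {
                            return false;
                        }
                        // Accept the "bar" event only if some previously accepted
                        // "foo" event carries the same value_b.
                        for (Event prev : ctx.getEventsForPattern("foo")) {
                            if (prev.valueB.equals(e.valueB)) {
                                return true;
                            }
                        }
                        return false;
                    }
                });
        }
    }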

This will accept the current event if any previous event had a value_b that
matches the current event's. But the resulting matches will include all previous
events, even those whose value_b did not match the current event, instead
of only the previous event where value_b equals the current event's.

Is there a way to only output matches where the previous event's value_b equals
the current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
== (type=2, value_a=K, value_b=X))?


Re: Multiple consumers on a subpartition

2017-04-25 Thread Zhijiang(wangzhijiang999)
Hi albert,
As far as I know, if the upstream data will be consumed by multiple consumers, it
will generate multiple subpartitions, and each subpartition will correspond to
one input channel consumer. So there is a one-to-one correspondence among
subpartition -> subpartition view -> input channel.
Cheers,
Zhijiang
------------------------------------------------------------------
From: albertjonathan
Sent: Wednesday, April 26, 2017 02:37
To: user
Subject: Multiple consumers on a subpartition
Hello,

Is there a way for Flink to allow a (pipelined) subpartition to be consumed by
multiple consumers? If not, would it make more sense to implement it as
multiple input channels for a single subpartition or as multiple subpartition
views for each input channel?

Any suggestion is appreciated.

Thanks





Flink docs in regards to State

2017-04-25 Thread Sand Stone
Hi, Flink newbie here.

I played with the API (built from GitHub master) and encountered some
issues, but I am not sure if they are limitations or actually by
design:
1. The data stream reduce method does not take a
RichReduceFunction. The code compiles but throws a runtime exception
when submitted. [My intent is to maintain a MapState, more below.]

 2. Flink seems to be picky about where MapState can be used at
runtime. MapState is restricted to keyed streams and cannot be used
with certain operators. However, I might need to maintain a MapState
as certain (persistent) keyed state for processing contexts. [I could
use an external KV store via the async I/O API, but I am hoping Flink could
maintain the (RocksDB) instances so I can avoid another layer of external storage.]
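
For reference, a minimal sketch (not from the original post) of the keyed-stream
setting in which MapState is supported, using a rich function on a keyed stream;
the tuple layout and counter logic are hypothetical:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts occurrences per sub-key, scoped to the current stream key.
    // With the RocksDB state backend this map lives in RocksDB and is checkpointed by Flink.
    public class CountPerSubKey extends RichFlatMapFunction<Tuple2<String, String>, Long> {

        private transient MapState<String, Long> counts;

        @Override
        public void open(Configuration parameters) {
            counts = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("counts", String.class, Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, String> value, Collector<Long> out) throws Exception {
            Long current = counts.get(value.f1);           // f1 is the sub-key
            long updated = (current == null ? 0L : current) + 1L;
            counts.put(value.f1, updated);
            out.collect(updated);
        }
    }

    // Usage sketch: stream.keyBy(0).flatMap(new CountPerSubKey());

Since flatMap on a keyed stream does accept rich functions, a function like this
also sidesteps the RichReduceFunction issue from point 1.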

Any pointer to blog/doc/video is greatly appreciated.

Thanks!


Re: put record to kinesis and then trying consume using flink connector

2017-04-25 Thread Alex Reid
Hi Sathi,

I believe the issue is that you pushed the event into the stream and only then
started up a consumer app to read it. If you push an event into the Kinesis
stream prior to starting up a reader that sets its initial stream position to
LATEST, it will not read that record, because you told it to start reading from
the time the consumer app started. "LATEST" does not mean "read the last event
that was pushed into the stream"; it basically means "start reading from now",
i.e. from consumer app start.
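
For concreteness, a hedged sketch of where that choice is made, reusing the
consumer property constants from the code quoted later in this thread (exact
constant names should be checked against the connector version in use):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisConsumerConfigSketch {

        public static Properties consumerConfig(String region) {
            Properties consumerConfig = new Properties();
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

            // LATEST starts reading from "now": records put into the stream before the
            // consumer starts are skipped. TRIM_HORIZON starts from the oldest record
            // still retained; AT_TIMESTAMP (used later in this thread) starts from a
            // chosen point in time.
            consumerConfig.setProperty(
                    ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
                    ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.toString());
            return consumerConfig;
        }
    }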

- alex

On Tue, Apr 25, 2017 at 4:00 PM, Sathi Chowdhury <
sathi.chowdh...@elliemae.com> wrote:

> Hi ,
>
> I also had a question around how long data that you broadcast in a stream that
> is not changing stays available in the operator's JVM... will it be available
> as long as the operator is alive?
>
> What happens when a slot dies? Does the new slot automatically become aware
> of the broadcast data?
>
> Thanks
>
> Sathi
>
>
>
> From: Sathi Chowdhury
> Date: Tuesday, April 25, 2017 at 3:56 PM
> To: "Tzu-Li (Gordon) Tai", "user@flink.apache.org"
>
>
> Subject: Re: put record to kinesis and then trying consume using flink
> connector
>
>
>
> Hi Gordon,
>
> That was a typo, as I was trying to mask off the stream name. I still had
> issues with using LATEST as the initial stream position, so I moved to
> AT_TIMESTAMP to solve it; it works fine now.
>
> Thanks so much for your response.
>
> Sathi
>
>
>
> *From: *"Tzu-Li (Gordon) Tai" 
> *Date: *Sunday, April 23, 2017 at 3:32 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Re: put record to kinesis and then trying consume using flink
> connector
>
>
>
> Hi Sathi,
>
>
>
> Here, in the producer-side log, it says:
>
> 2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published
> record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228
> ShardId: shardId-Sequence number49572539577762637793132019619873654976833479400677703682
> Stream Name:mystream
>
> The stream the record was inserted into is “mystream”.
>
> However,
>
> DataStream outputStream = see.addSource(new FlinkKinesisConsumer<>("myStream",
>         new MyDeserializerSchema(), consumerConfig));
>
> you seem to be consuming from “myStream”.
>
> Could the capital “S” be the issue?
>
>
>
> Cheers,
>
> Gordon
>
>
>
>
>
> On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury (
> sathi.chowdh...@elliemae.com) wrote:
>
> Hi Flink Dev,
>
> I thought something would work easily with Flink and it is simple enough,
> yet I am struggling to make it work.
>
> I am using the flink kinesis connector, version 1.3-SNAPSHOT.
>
>
>
> Basically I am trying to bootstrap a stream with one event pushed into it
> as a warmup inside the flink job's main method, and I use the aws kinesis client to
> simply putrecord into a given stream.
>
> My expectation is that now if I addSource to a kinesis stream the data
> stream will consume the event I pushed.
>
>
>
>
>
>
>
> // This is the method that pushes to the Kinesis stream "mystream"
>
> publishToKinesis("mystream", regionName, data);
>
>
>
>
>
> Properties consumerConfig = new Properties();
> consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
> consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
>         ConsumerConfigConstants.InitialPosition.LATEST.toString());
> consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
>
> final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
> cluster.start();
> ObjectMapper mapper = new ObjectMapper();
> final StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment(
>         "localhost", cluster.getLeaderRPCPort());
>
> DataStream outputStream = see.addSource(new FlinkKinesisConsumer<>("myStream",
>         new MyDeserializerSchema(), consumerConfig));
>
> for (Iterator it = DataStreamUtils.collect(outputStream); it.hasNext(); ) {
>     String actualOut = it.next();
>     ObjectNode actualOutNode = (ObjectNode) mapper.readTree(actualOut);
>     // then I do want to either print it or do some further validation etc.
> }
>
>
>
>
>
> ……..
>
>
>
> Not sure why FlinkKinesisConsumer is not able to react to the record that I
> published; it keeps waiting for it at the step it.next();
>
>
>
>
>
> I print out the SequenceNumber at which I put the record:
>
> 2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published
> record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228
> ShardId: 

Re: put record to kinesis and then trying consume using flink connector

2017-04-25 Thread Sathi Chowdhury
Hi,
I also had a question around how long data that you broadcast in a stream that is
not changing stays available in the operator's JVM... will it be available as long as
the operator is alive?
What happens when a slot dies? Does the new slot automatically become aware of the
broadcast data?
Thanks
Sathi

From: Sathi Chowdhury 
Date: Tuesday, April 25, 2017 at 3:56 PM
To: "Tzu-Li (Gordon) Tai" , "user@flink.apache.org" 

Subject: Re: put record to kinesis and then trying consume using flink connector

Hi Gordon,
That was a typo, as I was trying to mask off the stream name. I still had
issues with using LATEST as the initial stream position, so I moved to
AT_TIMESTAMP to solve it; it works fine now.
Thanks so much for your response.
Sathi

From: "Tzu-Li (Gordon) Tai" 
Date: Sunday, April 23, 2017 at 3:32 PM
To: "user@flink.apache.org" 
Subject: Re: put record to kinesis and then trying consume using flink connector

Hi Sathi,

Here, in the producer-side log, it says:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published
record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228
ShardId: shardId-Sequence number49572539577762637793132019619873654976833479400677703682
Stream Name:mystream

The stream the record was inserted into is “mystream”.

However,

DataStream outputStream = see.addSource(new 
FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

you seem to be consuming from “myStream”.
Could the capital “S” be the issue?

Cheers,
Gordon



On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury 
(sathi.chowdh...@elliemae.com) wrote:
Hi Flink Dev,
I thought something would work easily with Flink and it is simple enough, yet I
am struggling to make it work.
I am using the flink kinesis connector, version 1.3-SNAPSHOT.

Basically I am trying to bootstrap a stream with one event pushed into it as a
warmup inside the flink job's main method, and I use the aws kinesis client to simply
putrecord into a given stream.
My expectation is that now if I addSource to a kinesis stream the data stream 
will consume the event I pushed.



// This is the method that pushes to the Kinesis stream "mystream"
publishToKinesis("mystream", regionName, data);



Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);

consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
 ConsumerConfigConstants.InitialPosition.LATEST.toString());

consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"AUTO");

final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new 
Configuration(), false);
cluster.start();
ObjectMapper mapper = new ObjectMapper();
final StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getLeaderRPCPort());

DataStream outputStream = see.addSource(new 
FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));


for (Iterator it = DataStreamUtils.collect(outputStream); 
it.hasNext(); ) {
String actualOut = it.next();
ObjectNode actualOutNode = (ObjectNode) 
mapper.readTree(actualOut);

//then I do want to  either print it or do some further 
validation etc.
   }


……..

Not sure why FlinkKinesisConsumer is not able to react to the record that I
published; it keeps waiting for it at the step it.next();


I print out the SequenceNumber at which I put the record:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published
record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228
ShardId: shardId-Sequence number49572539577762637793132019619873654976833479400677703682
Stream Name:mystream


And the Flink job is logging this at the end, where it is waiting:
2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will be seeded with initial shard 
KinesisStreamShard{streamName='mystream', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'}, starting state 
set as sequence number LATEST_SEQUENCE_NUM
2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will start consuming seeded shard 
KinesisStreamShard{streamName='mystream', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 

Re: put record to kinesis and then trying consume using flink connector

2017-04-25 Thread Sathi Chowdhury
Hi Gordon,
That was a typo, as I was trying to mask off the stream name. I still had
issues with using LATEST as the initial stream position, so I moved to
AT_TIMESTAMP to solve it; it works fine now.
Thanks so much for your response.
Sathi

From: "Tzu-Li (Gordon) Tai" 
Date: Sunday, April 23, 2017 at 3:32 PM
To: "user@flink.apache.org" 
Subject: Re: put record to kinesis and then trying consume using flink connector

Hi Sathi,

Here, in the producer-side log, it says:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published
record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228
ShardId: shardId-Sequence number49572539577762637793132019619873654976833479400677703682
Stream Name:mystream

The stream the record was inserted into is “mystream”.

However,

DataStream outputStream = see.addSource(new 
FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

you seem to be consuming from “myStream”.
Could the capital “S” be the issue?

Cheers,
Gordon



On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury 
(sathi.chowdh...@elliemae.com) wrote:
Hi Flink Dev,
I thought something would work easily with Flink and it is simple enough, yet I
am struggling to make it work.
I am using the flink kinesis connector, version 1.3-SNAPSHOT.

Basically I am trying to bootstrap a stream with one event pushed into it as a
warmup inside the flink job's main method, and I use the aws kinesis client to simply
putrecord into a given stream.
My expectation is that now if I addSource to a kinesis stream the data stream 
will consume the event I pushed.



// This is the method that pushes to the Kinesis stream "mystream"
publishToKinesis("mystream", regionName, data);



Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);

consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
 ConsumerConfigConstants.InitialPosition.LATEST.toString());

consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"AUTO");

final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new 
Configuration(), false);
cluster.start();
ObjectMapper mapper = new ObjectMapper();
final StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getLeaderRPCPort());

DataStream outputStream = see.addSource(new 
FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));


for (Iterator it = DataStreamUtils.collect(outputStream); 
it.hasNext(); ) {
String actualOut = it.next();
ObjectNode actualOutNode = (ObjectNode) 
mapper.readTree(actualOut);

//then I do want to  either print it or do some further 
validation etc.
   }


……..

Not sure why FlinkKinesisConsumer is not able to react to the record that I
published; it keeps waiting for it at the step it.next();


I print out the SequenceNumber at which I put the record:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published
record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228
ShardId: shardId-Sequence number49572539577762637793132019619873654976833479400677703682
Stream Name:mystream


And the Flink job is logging this at the end, where it is waiting:
2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will be seeded with initial shard 
KinesisStreamShard{streamName='mystream', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'}, starting state 
set as sequence number LATEST_SEQUENCE_NUM
2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will start consuming seeded shard 
KinesisStreamShard{streamName='mystream', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'} from sequence 
number LATEST_SEQUENCE_NUM with ShardConsumer 0

Any clue will be awesome to clear my confusion.
Thanks
Sathi

Re: Re-keying / sub-keying a stream without repartitioning

2017-04-25 Thread Elias Levy
Anyone?

On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy 
wrote:

> This is something that has come up before on the list, but in a different
> context.  I have a need to rekey a stream but would prefer the stream to
> not be repartitioned.  There is no gain to repartitioning, as the new
> partition key is a composite of the stream key, going from a key of A to a
> key of (A, B), so all values for the resulting streams are already being
> rerouted to the same node and repartitioning them to other nodes would
> simply generate unnecessary network traffic and serde overhead.
>
> Unlike previous use cases, I am not trying to perform aggregate
> operations.  Instead I am executing CEP patterns.  Some patterns apply the
> the stream keyed by A and some on the stream keyed by (A,B).
>
> The API does not appear to have an obvious solution to this situation.
> keyBy() will repartition, and there isn't something like subKey() to
> subpartition a stream without repartitioning (e.g. keyBy(A).subKey(B)).
>
> I suppose I could accomplish it by using partitionCustom(), ignoring the
> second element in the key, and delegating to the default partitioner
> passing it only the first element, thus resulting in no change of task
> assignment.
>
> Thoughts?
>
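
To make the partitionCustom() idea above concrete, a rough sketch with hypothetical
types; note that Flink's keyed operators assign keys to subtasks via key groups, so
a plain hash like the one below is only illustrative and is not guaranteed to
reproduce keyBy(A)'s placement:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class SubKeySketch {

        // Events are (A, B, payload); the shape is hypothetical.
        public static DataStream<Tuple3<String, String, Long>> partitionOnAOnly(
                DataStream<Tuple3<String, String, Long>> stream) {
            return stream.partitionCustom(
                    new Partitioner<String>() {
                        @Override
                        public int partition(String keyA, int numPartitions) {
                            // Hash-only assignment on A; B is ignored entirely.
                            return (keyA.hashCode() & Integer.MAX_VALUE) % numPartitions;
                        }
                    },
                    new KeySelector<Tuple3<String, String, Long>, String>() {
                        @Override
                        public String getKey(Tuple3<String, String, Long> value) {
                            return value.f0; // extract A only
                        }
                    });
        }
    }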


Re: [BUG?] Cannot Load User Class on Local Environment

2017-04-25 Thread Matt
I updated the code a little bit for clarity, now the line #56 mentioned in
my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through
URLClassLoader, Flink should try loading it with its parent ClassLoader,
which should be the same ClassLoader that executed the environment, and it
does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt  wrote:

> Hi Stefan,
>
> Check the code here: https://gist.github.com/
> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the
> page.
>
> Here are the results of the additional tests you mentioned:
>
> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
> closure, no problem with that
> 2. I tried implementing SourceFunction and SinkFunction in Test itself, I
> was able to instantiate the class inside the Ignite closure
> 3. I'm not sure what you meant in this point, is it something like what I
> tried in line #56?
>
> Additionally, I tried implementing the SourceFunction and SinkFunction in
> Test$Foo with the same result: it says "Cannot load user class:
> com.test.Test$Foo"
>
> Looks like Flink is not using the correct ClassLoader. Any idea?
>
> Regards,
> Matt
>
> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I would expect that the local environment picks up the class path from
>> the code that launched it. So I think the question is what happens behind
>> the scenes when you call ignite.compute().broadcast(runnable); . Which
>> classes are shipped and how the classpath is built in the environment that
>> runs the code. Your example is also not fully conclusive, because
>> com.myproj.Test (which you can successfully instantiate) and
>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>> outer class is shipped with the broadcast call. My theory is that not all
>> classes are shipped (e.g. inner classes), but only Test. You could try
>> three things to analyze the problem a little more:
>>
>> 1) Create another inner class inside Test and try if you are still able
>> to instantiate also this class via reflection.
>> 2) Let Test class itself implement the map function (avoiding the usage
>> of other/inner classes) and see if this works.
>> 3) Check and set the thread’s context classloader inside the runnable to
>> something that contains all required classes and see if this gets picked up
>> by Flink.
>>
>> Best,
>> Stefan
>>
>> Am 25.04.2017 um 07:27 schrieb Matt :
>>
>> Hi all,
>>
>> I'm trying to run Flink using a local environment, but on an Ignite node
>> to achieve collocation (as mentioned in my previous message on this list).
>>
>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>> "cannot load user class" error as shown in [2].
>>
>> If you check line #29 on the code, I'm able to create an instance of
>> class Test, and it's the same context from which I'm creating the Flink
>> job. Shouldn't it work provided I'm using a local environment?
>>
>> It would be really nice to be able to inject a ClassLoader into the chunk
>> of code that creates the job. Is this currently possible?
>>
>> Any fix or workaround is appreciated!
>>
>> Best,
>> Matt
>>
>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>
>>
>>
>


Re: [BUG?] Cannot Load User Class on Local Environment

2017-04-25 Thread Matt
Hi Stefan,

Check the code here:
https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at
the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I
was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I
tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in
Test$Foo with the same result: it says "Cannot load user class:
com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter  wrote:

> Hi,
>
> I would expect that the local environment picks up the class path from the
> code that launched it. So I think the question is what happens behind the
> scenes when you call ignite.compute().broadcast(runnable); . Which
> classes are shipped and how the classpath is built in the environment that
> runs the code. Your example is also not fully conclusive, because
> com.myproj.Test (which you can successfully instantiate) and
> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
> outer class is shipped with the broadcast call. My theory is that not all
> classes are shipped (e.g. inner classes), but only Test. You could try
> three things to analyze the problem a little more:
>
> 1) Create another inner class inside Test and try if you are still able to
> instantiate also this class via reflection.
> 2) Let Test class itself implement the map function (avoiding the usage of
> other/inner classes) and see if this works.
> 3) Check and set the thread’s context classloader inside the runnable to
> something that contains all required classes and see if this gets picked up
> by Flink.
>
> Best,
> Stefan
>
> Am 25.04.2017 um 07:27 schrieb Matt :
>
> Hi all,
>
> I'm trying to run Flink using a local environment, but on an Ignite node
> to achieve collocation (as mentioned in my previous message on this list).
>
> Have a look at the code in [1]. It's pretty simple, but I'm getting a
> "cannot load user class" error as shown in [2].
>
> If you check line #29 on the code, I'm able to create an instance of class
> Test, and it's the same context from which I'm creating the Flink job.
> Shouldn't it work provided I'm using a local environment?
>
> It would be really nice to be able to inject a ClassLoader into the chunk
> of code that creates the job. Is this currently possible?
>
> Any fix or workaround is appreciated!
>
> Best,
> Matt
>
> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>
>
>
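
A minimal sketch of Stefan's suggestion (3) above, assuming (unverified) that
Test's class loader is the one that can see the job classes on the node executing
the broadcast runnable:

    // Hypothetical sketch of suggestion (3): inside the runnable that is broadcast
    // via ignite.compute().broadcast(runnable), point the context class loader at
    // the loader that ships the job classes before building the Flink job.
    ClassLoader jobClassLoader = Test.class.getClassLoader();
    Thread.currentThread().setContextClassLoader(jobClassLoader);
    // ... then create the local environment and execute the job as before ...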


Multiple consumers on a subpartition

2017-04-25 Thread albertjonathan
Hello,

Is there a way for Flink to allow a (pipelined) subpartition to be consumed by
multiple consumers? If not, would it make more sense to implement it as
multiple input channels for a single subpartition or as multiple subpartition
views for each input channel?

Any suggestion is appreciated.

Thanks





Fault tolerance & idempotency on window functions

2017-04-25 Thread Kamil Dziublinski
Hi guys,

I have a flink streaming job that reads from kafka, creates some statistics
increments, and stores them in hbase (using normal puts).
I'm using a fold function here with a window of a few seconds.

My tests showed me that restoring state with window functions does not work
exactly how I expected.
I thought that if my window function emits an aggregated object to a sink,
and that object fails in the sink, this write to hbase would be replayed. So
even if it actually got written to HBase but flink thought it didn't (for
instance during a network problem), I could be sure of idempotent writes. I
wanted to enforce that by using the timestamp of the first event used in
that window for aggregation.

Now correct me if I'm wrong, but it seems that in the case of a failure (even
if it is in the sink) the whole flow is replayed from the last checkpoint, which
means that my window function might emit the aggregated object in a different
form, for instance containing not only the tuples that failed but also other ones.
That would break my idempotency here, and I might end up with higher
counters than I should have.

Do you have any suggestions on how to solve or work around such a problem in Flink?

Thanks,
Kamil.
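
For reference, a hedged sketch of the kind of pipeline described above; the tuple
layout, window size, and aggregation are hypothetical placeholders, with the HBase
sink omitted:

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowedIncrementsSketch {

        // Source elements are (key, increment); hypothetical shape of the Kafka records.
        public static DataStream<Tuple2<String, Long>> aggregate(
                DataStream<Tuple2<String, Long>> source) {
            return source
                    .keyBy(0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .fold(Tuple2.of("", 0L),
                            new FoldFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                                @Override
                                public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
                                                                 Tuple2<String, Long> in) {
                                    // Sum the increments seen in this window per key.
                                    return Tuple2.of(in.f0, acc.f1 + in.f1);
                                }
                            });
            // The aggregated tuples would then go to a sink performing the HBase puts.
        }
    }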


Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-25 Thread Kostas Kloudas
Perfect! 

Thanks a lot for testing it Luis!
And keep us posted if you find anything else.
As you may have seen the CEP library is undergoing heavy refactoring for the 
upcoming release.

Kostas

> On Apr 25, 2017, at 12:30 PM, Luis Lázaro  wrote:
> 
> Hi Aljoscha and Kostas, thanks in advance.
> 
> Kostas, I followed your recommendation and it seems to be working fine.
> 
> I did:
> - upgraded to 1.3-SNAPSHOT from the master branch.
> - tried assigning timestamps and emitting watermarks using AscendingTimestampExtractor:
> alerts are correct (late events are not processed as normal ones) and I get a
> lot of warnings about violated ascending monotonicity (that's OK, my events are not
> ordered in time).
> - tried assigning timestamps and emitting watermarks using
> BoundedOutOfOrdernessTimestampExtractor: alerts are correct.
> 
> 
> Thanks a lot, 
> best regards, Luis.
> 
> 
> 



Re: Problems reading Parquet input from HDFS

2017-04-25 Thread Lukas Kircher
Thanks for your suggestions.

@Flavio
This is very similar to the code I use and yields basically the same problems.
The examples are based on flink-1.0-SNAPSHOT and avro-1.7.6, which is more than
three years old. Do you have a working setup with newer versions of Avro and
Flink?

@Jörn
I tried to do that but I can't see how to get around the AvroParquetInputFormat 
(see below). I can pass a schema for projection as a string but then I get a 
NullPointerException as there is no ReadSupport class available in 
ParquetInputFormat. There is a constructor to instantiate ParquetInputFormat 
with a class that extends ReadSupport but I haven't found a suitable one to 
pass to the constructor. Do you know of a way around this?


  public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      Job job = Job.getInstance();
      HadoopInputFormat hif = new HadoopInputFormat<>(new ParquetInputFormat(), Void.class,
              Customer.class, job);
      FileInputFormat.addInputPath((JobConf) job.getConfiguration(),
              new org.apache.hadoop.fs.Path("/tmp/tpchinput/01/customer_parquet"));
      job.getConfiguration().set("parquet.avro.projection",
              "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":[{\"name\":\"c_custkey\",\"type\":\"int\"}]}");
      env.createInput(hif).print();
  }


I am pretty sure that I am missing something very basic. Let me know if you need any
additional information.

Thanks ...



> On 24 Apr 2017, at 20:51, Flavio Pompermaier  wrote:
> 
> I started from this guide:
> 
> https://github.com/FelixNeutatz/parquet-flinktacular 
> 
> 
> Best,
> Flavio 
> 
> On 24 Apr 2017 6:36 pm, "Jörn Franke"  > wrote:
> Why not use a Parquet-only format? Not sure why you need an AvroParquetInputFormat.
> 
> On 24. Apr 2017, at 18:19, Lukas Kircher  > wrote:
> 
>> Hello,
>> 
>> I am trying to read Parquet files from HDFS and having problems. I use Avro 
>> for schema. Here is a basic example:
>> 
>> public static void main(String[] args) throws Exception {
>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>     Job job = Job.getInstance();
>>     HadoopInputFormat hif = new HadoopInputFormat<>(new AvroParquetInputFormat(), Void.class,
>>             Customer.class, job);
>>     FileInputFormat.addInputPath((JobConf) job.getConfiguration(),
>>             new org.apache.hadoop.fs.Path("/tmp/tpchinput/01/customer_parquet"));
>>     Schema projection = Schema.createRecord(Customer.class.getSimpleName(), null, null, false);
>>     List fields = Arrays.asList(
>>             new Schema.Field("c_custkey", Schema.create(Schema.Type.INT), null, (Object) null)
>>     );
>>     projection.setFields(fields);
>>     AvroParquetInputFormat.setRequestedProjection(job, projection);
>>
>>     DataSet> dataset = env.createInput(hif);
>>     dataset.print();
>> }
>> If I submit this to the job manager I get the following stack trace:
>> 
>> java.lang.NoSuchMethodError: 
>> org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
>>  at misc.Misc.main(Misc.java:29)
>> 
>> The problem is that I use the parquet-avro dependency (which provides 
>> AvroParquetInputFormat) in version 1.9.0 which relies on the avro dependency 
>> 1.8.0. The flink-core itself relies on the avro dependency in version 1.7.7. 
>> Jfyi the dependency tree looks like this:
>> 
>> [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ 
>> flink-experiments ---
>> [INFO] ...:1.0-SNAPSHOT
>> [INFO] +- org.apache.flink:flink-java:jar:1.2.0:compile
>> [INFO] |  +- org.apache.flink:flink-core:jar:1.2.0:compile
>> [INFO] |  |  \- (org.apache.avro:avro:jar:1.7.7:compile - omitted for 
>> conflict with 1.8.0)
>> [INFO] |  \- org.apache.flink:flink-shaded-hadoop2:jar:1.2.0:compile
>> [INFO] | \- (org.apache.avro:avro:jar:1.7.7:compile - omitted for 
>> duplicate)
>> [INFO] \- org.apache.parquet:parquet-avro:jar:1.9.0:compile
>> [INFO]\- org.apache.avro:avro:jar:1.8.0:compile
>> 
>> Fixing the above NoSuchMethodError just leads to further problems. 
>> Downgrading parquet-avro to an older version creates other conflicts as 
>> there is no version that uses avro 1.7.7 like Flink does.
>> 
>> Is there a way around this or can you point me to another approach to read 
>> Parquet data from HDFS? How do you normally go about this?
>> 
>> Thanks for your help,
>> Lukas
>> 
>> 
>> 



Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-25 Thread Luis Lázaro
Hi Aljoscha and Kostas, thanks in advance.

Kostas, I followed your recommendation and it seems to be working fine.

I did:
- upgraded to 1.3-SNAPSHOT from the master branch.
- tried assigning timestamps and emitting watermarks using AscendingTimestampExtractor:
alerts are correct (late events are not processed as normal ones) and I get a lot
of warnings about violated ascending monotonicity (that's OK, my events are not ordered
in time).
- tried assigning timestamps and emitting watermarks using
BoundedOutOfOrdernessTimestampExtractor: alerts are correct (sketch below).
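
A minimal sketch of the BoundedOutOfOrdernessTimestampExtractor variant mentioned in
the last bullet, assuming a hypothetical MyEvent type and an allowed out-of-orderness
of 10 seconds:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class TimestampAssignmentSketch {

        // Hypothetical event type with an event-time accessor.
        public interface MyEvent {
            long getEventTimeMillis();
        }

        public static DataStream<MyEvent> withTimestampsAndWatermarks(DataStream<MyEvent> kafkaStream) {
            // Events may arrive up to 10 seconds out of order before the watermark
            // passes them; anything later than that is considered late.
            return kafkaStream.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
                        @Override
                        public long extractTimestamp(MyEvent event) {
                            return event.getEventTimeMillis();
                        }
                    });
        }
    }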


Thanks a lot, 
best regards, Luis.