Re: [Discussion] Query regarding Join

2016-06-13 Thread Vinay Patil
Hi Matthias,

I did not get you; even if we use CoGroup, we still have to apply it on a key:

sourceStream.coGroup(destStream)
.where(new ElementSelector())
.equalTo(new ElementSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.apply(new CoGroupFunction<Integer, Integer, Integer>() {
private static final long serialVersionUID = 6408179761497497475L;

@Override
public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
Collector<Integer> paramCollector) throws Exception {
// consume the elements of the first input for this key and window
Iterator<Integer> iterator = paramIterable.iterator();
while (iterator.hasNext()) {
paramCollector.collect(iterator.next());
}
}
});

When I debug this, only the matched elements from both streams come into
the coGroup function.

What I want to know is how to detect the unmatched elements from both streams
and write them to the sink.

Regards,
Vinay Patil

*+91-800-728-4749*

On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax wrote:

> You need to do an outer join. However, there is no built-in support for
> outer joins yet.
>
> You can use Window-CoGroup to implement the outer join as a custom operator.
>
>
> -Matthias
>
> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > Hi,
> >
> > I have a question regarding the join operation; consider the following
> > dummy example:
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > DataStreamSource<Integer> sourceStream =
> > env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);
> >
> > sourceStream.join(destStream)
> > .where(new ElementSelector())
> > .equalTo(new ElementSelector())
> > .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > .apply(new JoinFunction<Integer, Integer, Integer>() {
> >
> > private static final long serialVersionUID = 1L;
> >
> > @Override
> > public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > return paramIN1;
> > }
> > }).print();
> >
> > I correctly get the elements that match in both streams; however, my
> > requirement is to write both the matched and the unmatched elements to a
> > sink (S3).
> >
> > How do I get the unmatched elements from each stream?
> >
> > Regards,
> > Vinay Patil
> >
>
>


Re: [Discussion] Query regarding Join

2016-06-13 Thread Matthias J. Sax
You need to do an outer join. However, there is no built-in support for
outer joins yet.

You can use Window-CoGroup to implement the outer join as a custom operator.
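For example, a rough sketch (untested; the element types and the tagged String
output are only illustrative): the coGroup method is called once per key and
window with the elements from both inputs, so an empty iterable on one side
means the other side's elements are unmatched in that window.

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, String>() {
        @Override
        public void coGroup(Iterable<Integer> left, Iterable<Integer> right,
                Collector<String> out) {
            boolean leftEmpty = !left.iterator().hasNext();
            boolean rightEmpty = !right.iterator().hasNext();
            if (rightEmpty) {
                for (Integer l : left) {
                    out.collect("unmatched-left: " + l);   // only in sourceStream
                }
            } else if (leftEmpty) {
                for (Integer r : right) {
                    out.collect("unmatched-right: " + r);  // only in destStream
                }
            } else {
                for (Integer l : left) {
                    out.collect("matched: " + l);          // key on both sides
                }
            }
        }
    });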


-Matthias

On 06/13/2016 06:53 PM, Vinay Patil wrote:
> Hi,
> 
> I have a question regarding the join operation; consider the following
> dummy example:
> 
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> DataStreamSource<Integer> sourceStream =
> env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);
> 
> sourceStream.join(destStream)
> .where(new ElementSelector())
> .equalTo(new ElementSelector())
> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> .apply(new JoinFunction<Integer, Integer, Integer>() {
> 
> private static final long serialVersionUID = 1L;
> 
> @Override
> public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> return paramIN1;
> }
> }).print();
> 
> I correctly get the elements that match in both streams; however, my
> requirement is to write both the matched and the unmatched elements to a
> sink (S3).
> 
> How do I get the unmatched elements from each stream?
> 
> Regards,
> Vinay Patil
> 





Re: How to contribute to Streaming Table API and StreamSQL

2016-06-13 Thread Fabian Hueske
Hi Jark,

wow, that's good news!
You are right, the streaming Table API is currently very limited. In the
last months, we changed the internal architecture and put everything on top
of Apache Calcite.
For the upcoming 1.1 release, we won't add new features to the Table API /
SQL. However, for the 1.2 release, we plan to focus on the streaming
Table API and Stream SQL, adding support for windowed aggregates and joins,
which corresponds to Tasks 7 and 9 in the design document. You are
completely right that we should start adding tickets to JIRA for this. I
will do that tomorrow.

It is great that you already have working code for windowed aggregates and
joins! Here is a link to our current API draft for windows in the Table API
[1]. It would be great if you could share what your API looks like. As you
said, the internals have changed a lot by now, but we might want to reuse
your API for Table API windows and maybe the code of the runtime. However,
we need to go through Calcite for optimization and SQL support, so some
parts definitely need to be changed. Stream SQL is also on the roadmap of
the Calcite community, but it might be that some features we will need are
not completed yet. So maybe we can help the Calcite community with that as
well.

If you want to contribute, you should first read the how to contribute
guide [2] and guide for code contributions [3].
The general rule is to first open a JIRA and later a pull request. Changes
that are extensive or modify current behavior (except bugs) should be
discussed before starting to work on them.

Looking forward to working with you on Flink,
Fabian

[1]
https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E/edit#heading=h.3iw7frfjdcb2
[2] http://flink.apache.org/how-to-contribute.html
[3] http://flink.apache.org/contribute-code.html


2016-06-13 11:31 GMT+02:00 Jark Wu:

> Hi,
>
> We have a great interest in the new Table API & SQL. At Alibaba, we have
> added a lot of features (groupBy, agg, window, join, UDF …) to the Streaming
> Table API (based on Flink 1.0). Now, many jobs run on the Table API in our
> production environment. But we want to keep pace with the community, and we
> have noticed that the Flink community reworked the Table API and also added
> SQL support. That is really cool. However, the Streaming Table API is still
> quite limited, so we want to contribute to accelerate the growth of the
> Streaming Table API and StreamSQL.
>
> It seems that we have completed Task-5 and Task-6 mentioned in the Work
> Plan <
> https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#>.
> So can we start Task-7 and Task-9 now? Are there any more specific plans? I
> think it's better to create an umbrella JIRA like FLINK-3221 to make the
> development plan clearer.
>
> If I want to contribute code for the groupBy and agg functions, what should
> I do? As I didn't find related JIRAs, can I create a JIRA and open a pull
> request directly?
>
> Sorry for so many questions at a time.
>
>
>
> - Jark Wu (wuchong)
>
>


Re: Adding a Histogram Metric

2016-06-13 Thread Stephan Ewen
I think it is totally fine to add a "ThreadsafeCounter" that uses an
AtomicLong internally.
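A minimal sketch of what such a counter could look like (illustrative only,
not actual Flink code; the class and method names are assumptions):

import java.util.concurrent.atomic.AtomicLong;

// A counter that is safe to update concurrently, e.g. from Finagle's
// managed threads, backed by a single AtomicLong.
public class ThreadsafeCounter {

    private final AtomicLong count = new AtomicLong();

    public void inc() { count.incrementAndGet(); }

    public void inc(long n) { count.addAndGet(n); }

    public void dec() { count.decrementAndGet(); }

    public long getCount() { return count.get(); }
}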

On Sat, Jun 11, 2016 at 7:25 PM, Steve Cosenza wrote:

> Also, we may be able to avoid the need for concurrent metrics by
> configuring each Finagle source to use a single thread. We'll investigate
> the performance implications this week and update you with the results.
>
> Thanks,
> Steve
>
>
> On Friday, June 10, 2016, Steve Cosenza wrote:
>
>> +Chris Hogue who is also working on operationalizing Flink with me
>>
>> Hi Stephan,
>>
>> Thanks for the background on your current implementations!
>>
>> While we don't require a specific implementation for histogram, counter,
>> or gauge, it just became clear that we'll need threadsafe versions of all
>> three of these metrics. This is because our messaging source is implemented
>> using Finagle, and Finagle expects to be able to emit stats concurrently
>> from its managed threads.
>>
>> That being said, if adding threadsafe versions of the Flink counters is
>> not an option, we'd also be fine with directly reading and writing from the
>> singleton Codahale MetricsRegistry that you start up in each TaskManager.
>>
>> Thanks,
>> Steve
>>
>> On Fri, Jun 10, 2016 at 7:10 AM, Stephan Ewen wrote:
>>
>>> A recent discussion brought up the point of adding a "histogram" metric
>>> type to Flink. This open thread is to gather some more of the requirements
>>> for that metric.
>>>
>>> The most important question is whether users need Flink to offer
>>> specific implementations of "Histogram", like for example
>>> "com.codahale.metrics.Histogram", or if an
>>> "org.apache.flink.metrics.Histogram" interface would work as well.
>>> The histogram could still be reported, for example, via dropwizard
>>> reporters.
>>>
>>> *Option (1):* If a Flink Histogram works as well, it would be simple to
>>> add one. The dropwizard reporter would need to wrap the Flink Histogram for
>>> reporting.
>>>
>>> *Option (2)*: If the code needs the specific Dropwizard Histogram type,
>>> then one would need a wrapper class that makes a Flink Histogram look like
>>> a dropwizard histogram.
>>>
>>> --
>>>
>>> As a bit of background for the discussion, here are some thoughts behind
>>> the way that Metrics are currently implemented in Flink.
>>>
>>>   - The metric types in Flink are independent of libraries like
>>> "dropwizard", to reduce dependencies and retain the freedom to swap
>>> implementations.
>>>
>>>   - Metric reporting allows reusing reporters from dropwizard.
>>>
>>>   - Some Flink metric implementations are also more lightweight than, for
>>> example, dropwizard's. Counters, for instance, are not thread-safe but do
>>> not impose memory barriers. That is important for metrics deep in the
>>> streaming runtime.
>>>
>>>
>>>
>>
>
> --
> -Steve
>
> Sent from Gmail Mobile
>


[Discussion] Query regarding Join

2016-06-13 Thread Vinay Patil
Hi,

I have a question regarding the join operation; consider the following
dummy example:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStreamSource<Integer> sourceStream =
env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

sourceStream.join(destStream)
.where(new ElementSelector())
.equalTo(new ElementSelector())
.window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
.apply(new JoinFunction<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
return paramIN1;
}
}).print();

I correctly get the elements that match in both streams; however, my
requirement is to write both the matched and the unmatched elements to a
sink (S3).

How do I get the unmatched elements from each stream?

Regards,
Vinay Patil


[jira] [Created] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-06-13 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4068:


 Summary: Move constant computations out of code-generated 
`flatMap` functions.
 Key: FLINK-4068
 URL: https://issues.apache.org/jira/browse/FLINK-4068
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Affects Versions: 1.1.0
Reporter: Fabian Hueske


The generated functions for expressions of the Table API or SQL include 
constant computations.

For instance, the code generated for a predicate like:

{code}
myInt < (10 + 20)
{code}

looks roughly like:

{code}
public void flatMap(Row in, Collector<Row> out) {

  Integer in1 = in.productElement(1);
  int temp = 10 + 20;
  if (in1 < temp) {
    out.collect(in);
  }
}
{code}

In this example the computation of {{temp}} is constant and could be moved out 
of the {{flatMap()}} method.
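For illustration, the hoisted variant could evaluate the constant once when
the generated function is instantiated (sketch only):

{code}
private final int temp = 10 + 20; // evaluated once, not per record

public void flatMap(Row in, Collector<Row> out) {

  Integer in1 = in.productElement(1);
  if (in1 < temp) {
    out.collect(in);
  }
}
{code}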

The same might apply to generated functions other than {{FlatMap}} as well.





[jira] [Created] (FLINK-4067) Add version header to savepoints

2016-06-13 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4067:

 Summary: Add version header to savepoints
 Key: FLINK-4067
 URL: https://issues.apache.org/jira/browse/FLINK-4067
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.0.3
Reporter: Ufuk Celebi
 Fix For: 1.1.0


Adding a header with version information to savepoints ensures that we can 
migrate savepoints between Flink versions in the future (for example when 
changing internal serialization formats between versions).

After talking with Till, we propose to add the following metadata:

- Magic number (int): identifies the data as a savepoint
- Version (int): savepoint version (independent of the Flink version)
- Data offset (int): specifies at which point the actual savepoint data starts.
With this, we can allow future Flink versions to add fields to the header
without breaking anything, e.g. Flink 1.1 could read savepoints of Flink 2.0.

For Flink 1.0 savepoint support, if we don't find the magic number, we have
to fall back to reading the savepoint without a header before failing.
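A sketch of how this could look (illustrative only; the magic value, version
number, and stream handling are assumptions):

{code}
static final int MAGIC_NUMBER = 0xCAFE50A7; // hypothetical constant

void writeHeader(DataOutputStream out) throws IOException {
  out.writeInt(MAGIC_NUMBER); // identifies the data as a savepoint
  out.writeInt(1);            // savepoint version, independent of Flink version
  out.writeInt(12);           // data offset: three header ints in this sketch
}

boolean tryReadHeader(DataInputStream in) throws IOException {
  in.mark(4);                 // requires a mark-supporting stream
  if (in.readInt() == MAGIC_NUMBER) {
    return true;              // versioned savepoint, header follows
  }
  in.reset();                 // no magic number: try the Flink 1.0 legacy path
  return false;
}
{code}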






[jira] [Created] (FLINK-4066) RabbitMQ source, customize queue arguments

2016-06-13 Thread Kanstantsin Kamkou (JIRA)
Kanstantsin Kamkou created FLINK-4066:

 Summary: RabbitMQ source, customize queue arguments
 Key: FLINK-4066
 URL: https://issues.apache.org/jira/browse/FLINK-4066
 Project: Flink
  Issue Type: Improvement
Reporter: Kanstantsin Kamkou
Priority: Minor


Please add functionality to customize this line (custom arguments for the
queue):
{code}channel.queueDeclare(queueName, false, false, false, null);{code}
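For example, the last parameter could then carry custom queue arguments
(sketch; the TTL argument is only an illustration):
{code}
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // e.g. a per-queue message TTL
channel.queueDeclare(queueName, false, false, false, args);
{code}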
Thank you.





Re: Scheduling task slots in round-robin

2016-06-13 Thread Gyula Fóra
Thanks! I had already found this PR, but it seemed to be completely outdated :)

Maybe it's worth restarting this discussion.

Gyula

Chesnay Schepler wrote (on Mon, Jun 13, 2016, at 14:58):

> FLINK-1003 may be related.
>
> On 13.06.2016 12:46, Gyula Fóra wrote:
> > Hey,
> >
> > The Flink scheduling mechanism has become quite a bit of a pain lately for
> > us when trying to schedule IO-heavy streaming jobs. And by IO-heavy I mean
> > they have fairly large state that is continuously updated/read.
> >
> > The main problem is that the scheduled task slots are not evenly
> > distributed among the different task managers; usually the first TM
> > takes as many slots as possible and the other TMs get far fewer. And
> > since the job is RocksDB IO bound, the uneven load causes a significant
> > performance penalty.
> >
> > This is further accentuated during historical runs when we are trying to
> > "fast-forward" the application. The difference can be quite substantial in
> > a 3-4 node cluster: with even task distribution the history might run 3
> > times faster compared to an uneven one.
> >
> > I was wondering if there was a simple way to modify the scheduler so it
> > allocates resources in a round-robin fashion. Probably someone has a lot of
> > experience with this already :) (I'm running 1.0.3 for this job btw)
> >
> > Cheers,
> > Gyula
> >
>
>


Re: Scheduling task slots in round-robin

2016-06-13 Thread Chesnay Schepler

FLINK-1003 may be related.

On 13.06.2016 12:46, Gyula Fóra wrote:

Hey,

The Flink scheduling mechanism has become quite a bit of a pain lately for
us when trying to schedule IO-heavy streaming jobs. And by IO-heavy I mean
they have fairly large state that is continuously updated/read.

The main problem is that the scheduled task slots are not evenly
distributed among the different task managers; usually the first TM
takes as many slots as possible and the other TMs get far fewer. And
since the job is RocksDB IO bound, the uneven load causes a significant
performance penalty.

This is further accentuated during historical runs when we are trying to
"fast-forward" the application. The difference can be quite substantial in
a 3-4 node cluster: with even task distribution the history might run 3
times faster compared to an uneven one.

I was wondering if there was a simple way to modify the scheduler so it
allocates resources in a round-robin fashion. Probably someone has a lot of
experience with this already :) (I'm running 1.0.3 for this job btw)

Cheers,
Gyula





[jira] [Created] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test

2016-06-13 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-4065:

 Summary: Unstable Kafka09ITCase.testMultipleSourcesOnePartition 
test
 Key: FLINK-4065
 URL: https://issues.apache.org/jira/browse/FLINK-4065
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: Kostas Kloudas
Priority: Critical


Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on
Travis. Here is the log of such a failure:
https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt





[jira] [Created] (FLINK-4063) Add Metrics Support for Triggers

2016-06-13 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4063:

 Summary: Add Metrics Support for Triggers
 Key: FLINK-4063
 URL: https://issues.apache.org/jira/browse/FLINK-4063
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.0
Reporter: Aljoscha Krettek


Now that we have proper support for metrics, we should also add a hook that
allows triggers to report metrics.

This supersedes FLINK-3758 which was about using accumulators for metrics in 
triggers.





[FLINK-3867] Flink-VM: Vagrant/Ansible based VMs to easily setup Flink

2016-06-13 Thread Julius Neuffer
Hi,

I put together some Ansible scripts to automatically set up VMs running
Flink [1]. Might that be useful for the Flink project?

I already created a pull request [2]. Two of the CI builds failed. The
reasons for the failure, however, seem to be unrelated to the flink-vm
module.

It would be awesome to get some feedback regarding flink-vm. And, if there
is interest among the developers in adding flink-vm to the Flink project,
could you point out how to proceed with the pull request?

Best regards,
Julius Neuffer

[1] https://github.com/jneuff/flink/tree/master/flink-contrib/flink-vm
[2] https://github.com/apache/flink/pull/2075


Scheduling task slots in round-robin

2016-06-13 Thread Gyula Fóra
Hey,

The Flink scheduling mechanism has become quite a bit of a pain lately for
us when trying to schedule IO-heavy streaming jobs. And by IO-heavy I mean
they have fairly large state that is continuously updated/read.

The main problem is that the scheduled task slots are not evenly
distributed among the different task managers; usually the first TM
takes as many slots as possible and the other TMs get far fewer. And
since the job is RocksDB IO bound, the uneven load causes a significant
performance penalty.

This is further accentuated during historical runs when we are trying to
"fast-forward" the application. The difference can be quite substantial in
a 3-4 node cluster: with even task distribution the history might run 3
times faster compared to an uneven one.

I was wondering if there was a simple way to modify the scheduler so it
allocates resources in a round-robin fashion. Probably someone has a lot of
experience with this already :) (I'm running 1.0.3 for this job btw)
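
For illustration (not Flink's actual scheduler code; all names here are made
up), what I have in mind is essentially:

class RoundRobinPlacement {
    private int next = 0;

    // Cycle through the available task managers so slots spread evenly
    // instead of filling up the first TM before moving on to the next.
    String pickTaskManager(java.util.List<String> taskManagers) {
        String chosen = taskManagers.get(next % taskManagers.size());
        next = (next + 1) % taskManagers.size();
        return chosen;
    }
}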

Cheers,
Gyula


[jira] [Created] (FLINK-4062) Update Window Documentation

2016-06-13 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4062:
---

 Summary: Update Window Documentation
 Key: FLINK-4062
 URL: https://issues.apache.org/jira/browse/FLINK-4062
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


The window documentation could be a bit more principled and also needs updating 
with the new allowed lateness setting.

There is also essentially no documentation about how to write a custom trigger.





Re: About Memory usage

2016-06-13 Thread Maximilian Michels
Hi Davood,

The streaming part of Flink currently does not support managed memory;
all memory is allocated directly on the heap (apart from the network
buffers). There are plans to change that in the future.

The batch side uses managed memory, e.g. see ReduceCombineDriver.
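
Roughly, I would expect a batch driver to obtain and return managed memory
along these lines (a sketch from memory, not verified against the current
code; releaseAll is the method mentioned in the quoted mail below, and the
allocatePages signature may differ):

MemoryManager memoryManager = getEnvironment().getMemoryManager();
List<MemorySegment> segments = memoryManager.allocatePages(owner, numPages);
try {
    // the driver uses the pages, e.g. as sort or combine buffers
} finally {
    memoryManager.releaseAll(owner); // return all pages when the task finishes
}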

Cheers,
Max


On Sat, Jun 11, 2016 at 1:11 PM, Davood Rafiei wrote:
> Hi community,
>
>
> I am using the WordCount example in the DataStream API. I want to know the
> memory usage of each task and operator and possibly modify it.
>
> As far as I know, memory (off-heap or heap) is retrieved through the
> MemoryManager.
> There is a releaseAll(Object owner) method in MemoryManager that is called
> from the Task class when releasing resources. However, I could not find how
> an operator gets the required memory (heap or off-heap) from the
> MemoryManager.
>
>
> Cheers
> Davood


How to contribute to Streaming Table API and StreamSQL

2016-06-13 Thread Jark Wu
Hi,

We have a great interest in the new Table API & SQL. At Alibaba, we have added
a lot of features (groupBy, agg, window, join, UDF …) to the Streaming Table
API (based on Flink 1.0). Now, many jobs run on the Table API in our
production environment. But we want to keep pace with the community, and we
have noticed that the Flink community reworked the Table API and also added
SQL support. That is really cool. However, the Streaming Table API is still
quite limited, so we want to contribute to accelerate the growth of the
Streaming Table API and StreamSQL.

It seems that we have completed Task-5 and Task-6 mentioned in the Work Plan
<https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#>.
So can we start Task-7 and Task-9 now? Are there any more specific plans? I
think it's better to create an umbrella JIRA like FLINK-3221 to make the
development plan clearer.

If I want to contribute code for the groupBy and agg functions, what should I
do? As I didn't find related JIRAs, can I create a JIRA and open a pull
request directly?

Sorry for so many questions at a time.



- Jark Wu (wuchong)



[jira] [Created] (FLINK-4059) Requirement of Streaming layer for Complex Event Processing

2016-06-13 Thread Akshay Shingote (JIRA)
Akshay Shingote created FLINK-4059:
--

 Summary: Requirement of Streaming layer for Complex Event 
Processing
 Key: FLINK-4059
 URL: https://issues.apache.org/jira/browse/FLINK-4059
 Project: Flink
  Issue Type: Test
  Components: CEP
Affects Versions: 1.0.1
Reporter: Akshay Shingote


I am trying out this Flink CEP example:
https://github.com/tillrohrmann/cep-monitoring
One question which arises here is that they have built one single application
that does the work of pattern matching or CEP (i.e., ingest and process are
done by one application). They didn't use any streaming layer in between,
such as Kafka. I just want to know why no streaming layer is used between the
producer and the consumer. Also, I want to know what the advantages and
disadvantages would be if I put a streaming layer in between. Thank you.


