Re: Confusion about multiple use of one ValueState

2016-05-12 Thread Balaji Rajagopalan
I don't think the ValueState defined in one map function is accessible in
another map function; that is my understanding. You also need to be aware
that separate instances of the map function are created for the parallel
subtasks of your stream. I had a similar use case where I had to pass some
state from one map function to another, and I used Redis for that.
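A minimal sketch of an alternative that avoids an external store, assuming hypothetical Tuple2<Long, Long> input records: the first mapper keeps its keyed ValueState private and forwards the derived value as an extra field, so the next operator receives it together with each record.

myStream
    .keyBy(0)
    .map(new RichMapFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>() {

        private transient ValueState<Long> runningSum;

        @Override
        public void open(Configuration config) {
            // keyed state, private to this operator
            runningSum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("runningSum", Long.class, 0L));
        }

        @Override
        public Tuple3<Long, Long, Long> map(Tuple2<Long, Long> value) throws Exception {
            long sum = runningSum.value() + value.f1;
            runningSum.update(sum);
            // forward the derived state as part of the record
            return Tuple3.of(value.f0, value.f1, sum);
        }
    });
// a subsequent mapper can now read the derived value from field f2 of its input
// instead of trying to access the other operator's state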

On Fri, May 13, 2016 at 8:58 AM, Nirmalya Sengupta <
sengupta.nirma...@gmail.com> wrote:

> Hello all,
>
> Let's say I want to hold some state value derived during one
> transformation, and then use that same state value in a subsequent
> transformation. For example:
>
> myStream
> .keyBy(fieldID) // Some field ID, may be 0
> .map(new MyStatefulMapper())
> .map(new MySubsequentMapper())
> 
>
> Now, I define MyStatefulMapper in the usual fashion:
>
> public class MyStatefulMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
> /** The ValueState handle. The first field is the count, the second field a running sum. */
> private transient ValueState<Tuple2<Long, Long>> sum;
>
> @Override
> public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>
>    // logic of accessing and updating the ValueState 'sum' above
> }
>
> @Override
> public void open(Configuration config) {
> ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
> new ValueStateDescriptor<>(
> "mySum", // the state name
> TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
> Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
> sum = getRuntimeContext().getState(descriptor);
> }}
>
>
> So, by now, RuntimeContext has registered a State holder named 'mySum'.
>
> In the implementation of 'MySubsequentMapper', I need to access this State
> holder named 'mySum', perhaps thus (my thinking, I may be wrong):
>
> public class MySubsequentMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
> /** The ValueState handle. The first field is the count, the second field a running sum. */
> private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;
>
> private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier
>
>
> @Override
> public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>
>    // logic of accessing and updating the ValueState 'aSubsequentSum' above
>
>    // but this logic depends on the current contents of ValueState 'sum' created earlier
> }
>
> @Override
> public void open(Configuration config) {
> // Logic to create the ValueStateDescriptor for 'aSubsequentSum', which is owned by this operator
>
> // ...
>
> // Question: now, how do I prepare for accessing 'sum', which is a State holder created inside an earlier operator?
> sum = getRuntimeContext().getState(descriptor); // how can I pass the name 'mySum' (used in the StateDescriptor)?
> }}
>
> I have two questions:
>
> 1) Is what I am trying to achieve possible, and even advisable? If
> not, then what is the alternative?
> 2) Is there a guarantee that Flink will execute MyStatefulOperator.open()
> always before MySubsequentOperator.open() because of the lexical order of
> appearance in the source code?
>
> -- Nirmalya
>
>
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>


Confusion about multiple use of one ValueState

2016-05-12 Thread Nirmalya Sengupta
Hello all,

Let's say I want to hold some state value derived during one
transformation, and then use that same state value in a subsequent
transformation. For example:

myStream
.keyBy(fieldID) // Some field ID, may be 0
.map(new MyStatefulMapper())
.map(new MySubsequentMapper())


Now, I define MyStatefulMapper in the usual fashion:

public class MyStatefulMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /** The ValueState handle. The first field is the count, the second field a running sum. */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // logic of accessing and updating the ValueState 'sum' above
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>(
                "mySum", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}


So, by now, RuntimeContext has registered a State holder named 'mySum'.

In the implementation of 'MySubsequentMapper', I need to access this State
holder named 'mySum', perhaps thus (my thinking, I may be wrong):

public class MySubsequentMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /** The ValueState handle. The first field is the count, the second field a running sum. */
    private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;

    private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // logic of accessing and updating the ValueState 'aSubsequentSum' above

        // but this logic depends on the current contents of ValueState 'sum' created earlier
    }

    @Override
    public void open(Configuration config) {
        // Logic to create the ValueStateDescriptor for 'aSubsequentSum', which is owned by this operator

        // ...

        // Question: now, how do I prepare for accessing 'sum', which is a State holder created inside an earlier operator?
        sum = getRuntimeContext().getState(descriptor); // how can I pass the name 'mySum' (used in the StateDescriptor)?
    }
}

I have two questions:

1) Is what I am trying to achieve possible, and even advisable? If
not, then what is the alternative?
2) Is there a guarantee that Flink will execute MyStatefulOperator.open()
always before MySubsequentOperator.open() because of the lexical order of
appearance in the source code?

-- Nirmalya




-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Local Cluster have problem with connect to elasticsearch

2016-05-12 Thread rafal green
...btw I found this (in the folder
"flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"):

<elasticsearch.version>2.2.1</elasticsearch.version>

I changed it to version 2.3.2 and of course rebuilt with the command "mvn
clean install -DskipTests"

...but nothing changed.


2016-05-12 22:39 GMT+02:00 rafal green :

> Sorry, not the jar from the elasticsearch connector but from the twitter connector:
> *".m2/org/apache/flink/flink-connector-twitter_2.11/1.1-SNAPSHOT"*
> - it works fine
>
> 2016-05-12 22:35 GMT+02:00 rafal green :
>
>> This is the working jar that I downloaded from
>> *.m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT*
>>
>> 2016-05-12 22:26 GMT+02:00 rafal green :
>>
>>> Hi Gordon,
>>>
>>> Thanks for the advice - it works perfectly, but only in the elasticsearch case.
>>>
>>> This pom version works for elasticsearch 2.2.1.
>>>
>>> <artifactItem>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
>>>    <version>1.1-SNAPSHOT</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/flink/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>org.elasticsearch</groupId>
>>>    <artifactId>elasticsearch</artifactId>
>>>    <version>2.2.1</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/elasticsearch/**</includes>
>>> </artifactItem>
>>>
>>>
>>>
>>> Why 2.2.1? Because if you check
>>> *"flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"*
>>> you will see this line:
>>> *<elasticsearch.version>2.2.1</elasticsearch.version>*
>>>
>>>
>>> But Gordon, your idea is *not working with the twitter-connector*. I tried
>>> adding this to the pom, and it's not working:
>>>
>>> <artifactItem>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>>>    <version>1.1-SNAPSHOT</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/flink/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.twitter</groupId>
>>>    <artifactId>hbc-core</artifactId>
>>>    <version>2.2.0</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/twitter/**</includes>
>>> </artifactItem>
>>>
>>>
>>>
>>> or that
>>>
>>> <artifactItem>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>>>    <version>1.1-SNAPSHOT</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/flink/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.twitter</groupId>
>>>    <artifactId>hbc-core</artifactId>
>>>    <version>2.2.0</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/twitter/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>org.apache.httpcomponents</groupId>
>>>    <artifactId>httpclient</artifactId>
>>>    <version>4.2.5</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/httpcomponents/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.twitter</groupId>
>>>    <artifactId>joauth</artifactId>
>>>    <version>6.0.2</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/twitter/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>org.apache.httpcomponents</groupId>
>>>    <artifactId>httpcore</artifactId>
>>>    <version>4.2.4</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/httpcomponents/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.google.guava</groupId>
>>>    <artifactId>guava</artifactId>
>>>    <version>14.0.1</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/google/guava/**</includes>
>>> </artifactItem>
>>>
>>>
>>>
>>> And if I run job I see this error:
>>>
>>> 2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins 
>>> - [node-1] modules [], plugins [], sites []
>>> 2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache   
>>> - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from 
>>> localhost/127.0.0.1:47639
>>> 2016-05-12 21:49:38,109 INFO  
>>> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
>>> Created Elasticsearch TransportClient 
>>> org.elasticsearch.client.transport.TransportClient@66cdf89
>>> 2016-05-12 21:49:38,114 INFO  
>>> org.apache.flink.streaming.connectors.twitter.TwitterSource   - 
>>> Initializing Twitter Streaming API connection
>>> 2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient
>>> - New connection executed: flink-twitter-source, endpoint: 
>>> /1.1/statuses/sample.json
>>> 2016-05-12 21:49:38,357 INFO  
>>> org.apache.flink.streaming.connectors.twitter.TwitterSource   - Twitter 
>>> Streaming API connection established successfully
>>> 2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase 
>>> - flink-twitter-source Uncaught exception
>>> java.lang.NoSuchMethodError: 
>>> org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
>>> at 
>>> org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
>>> at 
>>> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
>>> at 
>>> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
>>> at 
>>> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
>>> at 
>>> com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
>>> at 

Re: Interesting window behavior with savepoints

2016-05-12 Thread Ufuk Celebi
On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
 wrote:
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?

Hey Andrew! Thanks for reporting this.

Flink generates operator IDs and uses these to map the state back to
the same operator when restoring from a savepoint. We want these IDs
to stay the same as long as the program does not change.

The ID can either be generated automatically by Flink or manually by the user.

The automatically generated ID is based on certain topology attributes
like parallelism, operator placement, etc. If the attribute changes,
the operator ID changes and you can't map the savepoint state back. If
it stays the same, we assume that the program has not changed.

The problem in your example is that to Flink both programs look the
same with respect to how the IDs are generated: the topology didn't
change and both the time and count window are executed by the
WindowOperator with an InternalWindowFunction.

The recommended way to work with savepoints is to skip the automatic
IDs altogether and assign the IDs manually instead. You can do this
via the "uid(String)" method of each operator, which gives you
fine-grained control over the "versioning" of state:

env.addSource(..).uid("my-source")

vs.

env.addSource(..).uid("my-source-2")

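A small sketch of how this could look in a job, assuming hypothetical Event, MyEventSource and MyEnrichmentMapper classes (the uid strings are what Flink uses to map savepoint state back to operators):

DataStream<Event> events = env
        .addSource(new MyEventSource())
        .uid("my-source");            // source state (e.g. offsets) is keyed to this id

DataStream<Event> enriched = events
        .map(new MyEnrichmentMapper())
        .uid("my-enrichment-map");    // this operator's state gets its own stable id

Changing an operator's logic while keeping its uid lets the savepoint state be
restored into it; deliberately changing the uid "versions" the state away.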
The problem I've just noticed is that you can't specify this on
WindowedStreams, but only on DataStreams, which is clearly a bug.
Furthermore, it might be a good idea to special case windows when
automatically generating the IDs.

I hope this helps a little with understanding the core problem. If you
have further questions, feel free to ask. I will make sure to fix this
soon.

– Ufuk


Re: Interesting window behavior with savepoints

2016-05-12 Thread Andrew Whitaker
"Flink can't successfully restore a checkpoint" should be "Flink can't
successfully restore a savepoint".

On Thu, May 12, 2016 at 3:44 PM, Andrew Whitaker <
andrew.whita...@braintreepayments.com> wrote:

> Hi,
>
> I was recently experimenting with savepoints and various situations in
> which they succeed or fail. I expected this example to fail:
>
> https://gist.github.com/AndrewWhitaker/fa46db04066ea673fe0eda232f0a5ce1
>
> Basically, the first program runs with a count window. The second program
> is identical except that it uses a time window instead of a count window.
>
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?
>
> Thanks,
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
>



-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com
--
Note: this information is confidential. It is prohibited to share, post
online or otherwise publicize without Braintree's prior written consent.


Interesting window behavior with savepoints

2016-05-12 Thread Andrew Whitaker
Hi,

I was recently experimenting with savepoints and various situations in
which they succeed or fail. I expected this example to fail:

https://gist.github.com/AndrewWhitaker/fa46db04066ea673fe0eda232f0a5ce1

Basically, the first program runs with a count window. The second program
is identical except that it uses a time window instead of a count window.

From what I've observed, most of the time when Flink can't successfully
restore a checkpoint it throws an exception saying as much. I was expecting
to see that behavior here. Could someone explain why this "works" (as in,
flink accepts the program with the savepoint from the first version of the
program), and if this is a bug?

Thanks,

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Does Kafka connector leverage Kafka message keys?

2016-05-12 Thread Krzysztof Zarzycki
If I can throw in my 2 cents, I agree with what Elias says. Without that
feature (not re-partitioning already partitioned Kafka data), Flink is in a bad
position for common, simpler processing that doesn't involve shuffling at
all, for example a simple readKafka-enrich-writeKafka pipeline. Systems like the
new Kafka Streams processing system, which leverage Kafka's partitioning, will
probably beat Flink in performance (of course, it's just an intuition).

Are you planning to provide such a feature? Is it simple to do with Flink's
current engine and API?
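For reference, a minimal sketch of the "readKafka-enrich-writeKafka" shape
mentioned above, using the Kafka 0.8 connector classes; broker addresses, topic
names and the enrichment step are placeholders. Note that no keyBy()/shuffle is
involved, which is exactly the case being discussed:

Properties props = new Properties();
props.setProperty("zookeeper.connect", "zk:2181");
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "enricher");

DataStream<String> raw = env.addSource(
        new FlinkKafkaConsumer08<>("raw-events", new SimpleStringSchema(), props));

DataStream<String> enriched = raw.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.concat("|enriched");   // placeholder for the real enrichment
    }
});

enriched.addSink(new FlinkKafkaProducer08<>(
        "broker:9092", "enriched-events", new SimpleStringSchema()));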




On Thu, 14 Apr 2016 at 03:11, Elias Levy
wrote:

> On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen  wrote:
>
>> If you want to use Flink's internal key/value state, however, you need to
>> let Flink re-partition the data by using "keyBy()". That is because Flink's
>> internal sharding of state (including the re-sharding to adjust parallelism
>> we are currently working on) follows a dedicated hashing scheme which is
>> with all likelihood different from the partition function that writes the
>> key/value pairs to the Kafka Topics.
>>
>
> That is interesting, if somewhat disappointing.  I was hoping that
> performing a keyBy from a Kafka source would perform no reshuffling if you
> used the same value as you used for the Kafka message key.  But it makes
> sense if you are using different hash functions.
>
> It may be useful to have a variant of keyBy() that converts the stream to
> a KeyedStream but performs no shuffling if the caller is certain that the
> DataStream is already partitioned by the given key.
>
>
>


Re: Local Cluster have problem with connect to elasticsearch

2016-05-12 Thread rafal green
Hi Gordon,

Thanks for the advice - it works perfectly, but only in the elasticsearch case.

This pom version works for elasticsearch 2.2.1.


<artifactItem>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
   <version>1.1-SNAPSHOT</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch</artifactId>
   <version>2.2.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/elasticsearch/**</includes>
</artifactItem>




Why 2.2.1? Because if you check
*"flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"*
you will see this line:
*<elasticsearch.version>2.2.1</elasticsearch.version>*


But Gordon, your idea is *not working with the twitter-connector*. I tried
adding this to the pom, and it's not working:


<artifactItem>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-twitter_${scala.version}</artifactId>
   <version>1.1-SNAPSHOT</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.twitter</groupId>
   <artifactId>hbc-core</artifactId>
   <version>2.2.0</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/twitter/**</includes>
</artifactItem>




or that


<artifactItem>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-twitter_${scala.version}</artifactId>
   <version>1.1-SNAPSHOT</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.twitter</groupId>
   <artifactId>hbc-core</artifactId>
   <version>2.2.0</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.2.5</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.twitter</groupId>
   <artifactId>joauth</artifactId>
   <version>6.0.2</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpcore</artifactId>
   <version>4.2.4</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>14.0.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/google/guava/**</includes>
</artifactItem>




And if I run job I see this error:

2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins
 - [node-1] modules [], plugins [], sites []
2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
5ff307efcde8deebfb2886733e40994c01fbba7d from
localhost/127.0.0.1:47639
2016-05-12 21:49:38,109 INFO
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink
 - Created Elasticsearch TransportClient
org.elasticsearch.client.transport.TransportClient@66cdf89
2016-05-12 21:49:38,114 INFO
org.apache.flink.streaming.connectors.twitter.TwitterSource   -
Initializing Twitter Streaming API connection
2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient
 - New connection executed: flink-twitter-source,
endpoint: /1.1/statuses/sample.json
2016-05-12 21:49:38,357 INFO
org.apache.flink.streaming.connectors.twitter.TwitterSource   -
Twitter Streaming API connection established successfully
2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase
 - flink-twitter-source Uncaught exception
java.lang.NoSuchMethodError:
org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
at 
com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase
 - flink-twitter-source exit event -
java.lang.NoSuchMethodError:
org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase
 - flink-twitter-source Shutting down httpclient
connection manager




... and finally, "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" - if I
add the jar to this location: flink/build-target/lib/ - it's working. No idea
why :P


2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai :

> Hi Rafal,
>
> From your description, it seems like Flink is complaining because it cannot
> access the Elasticsearch API related dependencies as well. You'd also have
> to include the following into your Maven build, under <artifactItems>:
>
> <artifactItem>
> <groupId>org.elasticsearch</groupId>
> <artifactId>elasticsearch</artifactId>
> <version>2.3.2</version>
> <type>jar</type>
> <overWrite>false</overWrite>
> <outputDirectory>${project.build.directory}/classes</outputDirectory>
> <includes>org/elasticsearch/**</includes>
> </artifactItem>
>
> Now your built 

Re: How to measure Flink performance

2016-05-12 Thread Konstantin Knauf
Hi Prateek,

regarding throughput, what about simply filling the input Kafka topic
with some (a lot of) messages and monitoring (e.g. with
http://quantifind.github.io/KafkaOffsetMonitor/) how quickly Flink can
work the lag off? The messages should be representative of your use
case, of course.

Latency is harder, I think, and I would also be interested in the
approaches of others to measure latency in Flink.

To some extent, you could do it by adding some logging inside Flink, but
this affects latency and only measures latency within Flink (excluding
reading from the source and writing to the sink).
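A rough sketch of that logging approach, measuring only the time spent inside
Flink; MyEvent is a placeholder, and with multiple machines the clocks need to
be reasonably in sync:

DataStream<Tuple2<MyEvent, Long>> stamped = input
        .map(new MapFunction<MyEvent, Tuple2<MyEvent, Long>>() {
            @Override
            public Tuple2<MyEvent, Long> map(MyEvent e) {
                // stamp each record with its ingestion time
                return Tuple2.of(e, System.currentTimeMillis());
            }
        });

// ... the actual pipeline operates on the stamped records ...

DataStream<MyEvent> measured = stamped
        .map(new MapFunction<Tuple2<MyEvent, Long>, MyEvent>() {
            @Override
            public MyEvent map(Tuple2<MyEvent, Long> v) {
                long latencyMs = System.currentTimeMillis() - v.f1;
                System.out.println("in-Flink latency: " + latencyMs + " ms"); // or feed a histogram
                return v.f0;
            }
        });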

Cheers,

Konstantin

On 12.05.2016 18:57, prateekarora wrote:
> Hi
> 
> How can I measure the throughput and latency of my application in Flink
> 1.0.2?
> 
> Regards
> Prateek
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-measure-Flink-performance-tp6741p6863.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: How to measure Flink performance

2016-05-12 Thread prateekarora
Hi

How can I measure the throughput and latency of my application in Flink
1.0.2?

Regards
Prateek



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-measure-Flink-performance-tp6741p6863.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Java heap space error

2016-05-12 Thread Flavio Pompermaier
Hi to all,
running a job that writes parquet-thrift files I had this exception (in a
Task Manager):

io.netty.channel.nio.NioEventLoop - Unexpected
exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2016-05-12 18:49:11,302 WARN
org.jboss.netty.channel.socket.nio.AbstractNioSelector- Unexpected
exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2016-05-12 18:49:11,302 ERROR
org.apache.flink.runtime.io.disk.iomanager.IOManager  - The handler
of the request-complete-callback threw an exception: Java heap space
java.lang.OutOfMemoryError: Java heap space
2016-05-12 18:49:11,303 ERROR
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O reading
thread encountered an error: segment has been freed
java.lang.IllegalStateException: segment has been freed
at
org.apache.flink.core.memory.HeapMemorySegment.wrap(HeapMemorySegment.java:85)
at
org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest.read(AsynchronousFileIOChannel.java:310)
at
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:396)
2016-05-12 18:49:11,303 ERROR
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O reading
thread encountered an error: segment has been freed


Any idea of what could be the cause?

Best,
Flavio


Re: normalize vertex values

2016-05-12 Thread Vasiliki Kalavri
Hi Lydia,

there is no dedicated Gelly API method that performs normalization. If you
know the max value, then a mapVertices() would suffice. Otherwise, you can
get the DataSet of vertices with getVertices() and apply any kind of
operation supported by the DataSet API on it.
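A minimal sketch of the second option, assuming a Graph<Long, Double, Double> g:
compute the absolute max over the vertex values with the DataSet API, then
rescale via mapVertices(). (collect() is used for brevity; a broadcast variable
would avoid pulling the value to the client.)

DataSet<Double> absValues = g.getVertices()
        .map(new MapFunction<Vertex<Long, Double>, Double>() {
            @Override
            public Double map(Vertex<Long, Double> v) {
                return Math.abs(v.getValue());
            }
        });

final double absMax = absValues
        .reduce(new ReduceFunction<Double>() {
            @Override
            public Double reduce(Double a, Double b) {
                return Math.max(a, b);
            }
        })
        .collect().get(0);

Graph<Long, Double, Double> normalized = g.mapVertices(
        new MapFunction<Vertex<Long, Double>, Double>() {
            @Override
            public Double map(Vertex<Long, Double> v) {
                return v.getValue() / absMax;
            }
        });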

Best,
-Vasia.
On May 12, 2016 10:31 AM, "Lydia Ickler"  wrote:

> Hi all,
>
> If I have a Graph g
> and I would like to normalize all vertex values by the absolute max of all
> vertex values -> what API function would I choose?
>
> Thanks in advance!
> Lydia


Re: synchronizing two streams

2016-05-12 Thread Matthias J. Sax
I see. But even if you had an operator (A,B)->(A,B), it would not
be possible to block A if B does not deliver any data, because of
Flink's internal design.

You will need a custom solution: something like a map (one
for each stream) that uses a side-communication channel (i.e., external to
Flink). The maps could send heartbeats to each other as long as there
is input data available. As long as heartbeats are received, data is
forwarded. If there are no heartbeats from the other map, it indicates
that the other stream lacks data, and thus forwarding can block to
throttle its own stream.
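A very rough sketch of that idea (not a drop-in operator): each stream's map
publishes a heartbeat through some external channel and blocks while the partner
stream has gone silent; Event and the HeartbeatChannel interface are placeholders
for whatever side channel (e.g. Redis) is used.

public class ThrottlingMap implements MapFunction<Event, Event> {

    /** Placeholder for the side-communication channel, external to Flink. */
    public interface HeartbeatChannel extends java.io.Serializable {
        void touch(String key, long timestampMillis);
        long lastSeen(String key);
    }

    private final HeartbeatChannel channel;
    private final String ownKey;      // e.g. "stream-A"
    private final String partnerKey;  // e.g. "stream-B"

    public ThrottlingMap(HeartbeatChannel channel, String ownKey, String partnerKey) {
        this.channel = channel;
        this.ownKey = ownKey;
        this.partnerKey = partnerKey;
    }

    @Override
    public Event map(Event e) throws Exception {
        channel.touch(ownKey, System.currentTimeMillis());   // "I still have data"
        // block (and thereby back-pressure this stream) while the partner is silent
        while (System.currentTimeMillis() - channel.lastSeen(partnerKey) > 10_000L) {
            Thread.sleep(500);
        }
        return e;
    }
}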

-Matthias


On 05/12/2016 03:36 PM, Alexander Gryzlov wrote:
> Yes, this is generally a viable design, and is actually something we
> started off with.
> 
> The problem in our case is, however, that either of the streams can
> occasionally (due to external producer's issues) get stuck for an
> arbitrary period of time, up to several hours. Buffering the other one
> during all this time would just blow the memory - streams' rates are
> dozens or even hundreds of Mb/sec. 
> 
> Alex
> 
> On Thu, May 12, 2016 at 4:00 PM, Matthias J. Sax  > wrote:
> 
> That is correct. But there is no reason to throttle an input stream.
> 
> If you implement an Outer-Join you will have two in-memory buffers
> holding the records of each stream for your "time window". Each time you
> receive a watermark, you can remove all "expired" records from the
> buffer of the other stream. Furthermore, you need to track whether a record
> got joined or not. For all records that did not get joined, before removing
> them emit a "record-null" (or "null-record") result tuple.
> 
> No need to block/sleep.
> 
> Does this make sense?
> 
> 
> -Matthias
> 
> 
> On 05/12/2016 02:51 PM, Alexander Gryzlov wrote:
> > Hmm, probably I don't really get how Flink's execution model works. As
> > far as I understand, the preferred way to throttle down stream
> > consumption is to simply have an operator with a conditional
> > Thread.sleep() inside. Wouldn't calling sleep() in either
> > of TwoInputStreamOperator's processWatermarkN() methods just freeze the
> > entire operator, stopping the consumption of both streams (as opposed to
> > just one)?
> >
> > Alex
> >
> > On Thu, May 12, 2016 at 2:31 PM, Matthias J. Sax  
> > >> wrote:
> >
> > I cannot follow completely. TwoInputStreamOperators defines
> two methods
> > to process watermarks for each stream.
> >
> > So you can sync both stream within your outer join operator
> you plan to
> > implement.
> >
> > -Matthias
> >
> > On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:
> > > Hello,
> > >
> > > We're implementing a streaming outer join operator based on a
> > > TwoInputStreamOperator with an internal buffer. In our use-case
> > only the
> > > items whose timestamps are within a several-second interval
> of each
> > > other can join, so we need to synchronize the two input
> streams to
> > > ensure maximal yield. Our plan is to utilize the watermark
> > mechanism to
> > > implement some sort of a "throttling" operator, which would
> take two
> > > streams and stop passing through one of them based on the
> > watermarks in
> > > another. However, there doesn't seem to exist an operator of
> the shape
> > > (A,B)->(A,B) in Flink, where A and B can be received and emitted
> > > independently. What would be a resource-saving way to
> implement such
> > > (e.g., without spawning two more parallel
> TwoInputStreamOperators)?
> > >
> > > Alex
> >
> >
> 
> 





Re: synchronizing two streams

2016-05-12 Thread Alexander Gryzlov
Yes, this is generally a viable design, and is actually something we
started off with.

The problem in our case is, however, that either of the streams can
occasionally (due to external producer's issues) get stuck for an arbitrary
period of time, up to several hours. Buffering the other one during all
this time would just blow the memory - streams' rates are dozens or even
hundreds of Mb/sec.

Alex

On Thu, May 12, 2016 at 4:00 PM, Matthias J. Sax  wrote:

> That is correct. But there is no reason to throttle an input stream.
>
> If you implement an Outer-Join you will have two in-memory buffers
> holding the records of each stream for your "time window". Each time you
> receive a watermark, you can remove all "expired" records from the
> buffer of the other stream. Furthermore, you need to track whether a record
> got joined or not. For all records that did not get joined, before removing
> them emit a "record-null" (or "null-record") result tuple.
>
> No need to block/sleep.
>
> Does this make sense?
>
>
> -Matthias
>
>
> On 05/12/2016 02:51 PM, Alexander Gryzlov wrote:
> > Hmm, probably I don't really get how Flink's execution model works. As
> > far as I understand, the preferred way to throttle down stream
> > consumption is to simply have an operator with a conditional
> > Thread.sleep() inside. Wouldn't calling sleep() in either
> > of TwoInputStreamOperator's processWatermarkN() methods just freeze the
> > entire operator, stopping the consumption of both streams (as opposed to
> > just one)?
> >
> > Alex
> >
> > On Thu, May 12, 2016 at 2:31 PM, Matthias J. Sax  > > wrote:
> >
> > I cannot follow completely. TwoInputStreamOperators defines two
> methods
> > to process watermarks for each stream.
> >
> > So you can sync both stream within your outer join operator you plan
> to
> > implement.
> >
> > -Matthias
> >
> > On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:
> > > Hello,
> > >
> > > We're implementing a streaming outer join operator based on a
> > > TwoInputStreamOperator with an internal buffer. In our use-case
> > only the
> > > items whose timestamps are within a several-second interval of each
> > > other can join, so we need to synchronize the two input streams to
> > > ensure maximal yield. Our plan is to utilize the watermark
> > mechanism to
> > > implement some sort of a "throttling" operator, which would take
> two
> > > streams and stop passing through one of them based on the
> > watermarks in
> > > another. However, there doesn't seem to exist an operator of the
> shape
> > > (A,B)->(A,B) in Flink, where A and B can be received and emitted
> > > independently. What would be a resource-saving way to implement
> such
> > > (e.g., without spawning two more parallel TwoInputStreamOperators)?
> > >
> > > Alex
> >
> >
>
>


Re: synchronizing two streams

2016-05-12 Thread Matthias J. Sax
That is correct. But there is no reason to throttle an input stream.

If you implement an Outer-Join you will have two in-memory buffers
holding the records of each stream for your "time window". Each time you
receive a watermark, you can remove all "expired" records from the
buffer of the other stream. Furthermore, you need to track whether a record
got joined or not. For all records that did not get joined, before removing
them emit a "record-null" (or "null-record") result tuple.

No need to block/sleep.

Does this make sense?
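A condensed sketch of that buffer bookkeeping, kept independent of the concrete
operator API; Event (with a timestamp field), the join condition and the output
type are placeholders:

class OuterJoinBuffers {

    static class Buffered {
        final Event event;
        boolean joined = false;
        Buffered(Event event) { this.event = event; }
    }

    private final List<Buffered> leftBuffer = new ArrayList<>();
    private final List<Buffered> rightBuffer = new ArrayList<>();
    private final long joinIntervalMillis = 5_000L;   // the "several-second interval"

    /** Called per element of the left stream; the right side is symmetric. */
    void onLeft(Event e, Collector<Tuple2<Event, Event>> out) {
        Buffered left = new Buffered(e);
        leftBuffer.add(left);
        for (Buffered right : rightBuffer) {
            if (Math.abs(e.timestamp - right.event.timestamp) <= joinIntervalMillis) {
                out.collect(Tuple2.of(e, right.event));
                left.joined = true;
                right.joined = true;
            }
        }
    }

    /** A watermark on the right stream lets us expire old left-buffer entries. */
    void onRightWatermark(long watermark, Collector<Tuple2<Event, Event>> out) {
        Iterator<Buffered> it = leftBuffer.iterator();
        while (it.hasNext()) {
            Buffered b = it.next();
            if (b.event.timestamp + joinIntervalMillis < watermark) {
                if (!b.joined) {
                    out.collect(Tuple2.of(b.event, null));   // the "record-null" result
                }
                it.remove();
            }
        }
    }
}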


-Matthias


On 05/12/2016 02:51 PM, Alexander Gryzlov wrote:
> Hmm, probably I don't really get how Flink's execution model works. As
> far as I understand, the preferred way to throttle down stream
> consumption is to simply have an operator with a conditional
> Thread.sleep() inside. Wouldn't calling sleep() in either
> of TwoInputStreamOperator's processWatermarkN() methods just freeze the
> entire operator, stopping the consumption of both streams (as opposed to
> just one)?
> 
> Alex
> 
> On Thu, May 12, 2016 at 2:31 PM, Matthias J. Sax  > wrote:
> 
> I cannot follow completely. TwoInputStreamOperators defines two methods
> to process watermarks for each stream.
> 
> So you can sync both stream within your outer join operator you plan to
> implement.
> 
> -Matthias
> 
> On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:
> > Hello,
> >
> > We're implementing a streaming outer join operator based on a
> > TwoInputStreamOperator with an internal buffer. In our use-case
> only the
> > items whose timestamps are within a several-second interval of each
> > other can join, so we need to synchronize the two input streams to
> > ensure maximal yield. Our plan is to utilize the watermark
> mechanism to
> > implement some sort of a "throttling" operator, which would take two
> > streams and stop passing through one of them based on the
> watermarks in
> > another. However, there doesn't seem to exist an operator of the shape
> > (A,B)->(A,B) in Flink, where A and B can be received and emitted
> > independently. What would be a resource-saving way to implement such
> > (e.g., without spawning two more parallel TwoInputStreamOperators)?
> >
> > Alex
> 
> 





Re: synchronizing two streams

2016-05-12 Thread Alexander Gryzlov
Hmm, probably I don't really get how Flink's execution model works. As far
as I understand, the preferred way to throttle down stream consumption is
to simply have an operator with a conditional Thread.sleep() inside.
Wouldn't calling sleep() in either of TwoInputStreamOperator's
processWatermarkN()
methods just freeze the entire operator, stopping the consumption of both
streams (as opposed to just one)?

Alex

On Thu, May 12, 2016 at 2:31 PM, Matthias J. Sax  wrote:

> I cannot follow completely. TwoInputStreamOperators defines two methods
> to process watermarks for each stream.
>
> So you can sync both stream within your outer join operator you plan to
> implement.
>
> -Matthias
>
> On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:
> > Hello,
> >
> > We're implementing a streaming outer join operator based on a
> > TwoInputStreamOperator with an internal buffer. In our use-case only the
> > items whose timestamps are within a several-second interval of each
> > other can join, so we need to synchronize the two input streams to
> > ensure maximal yield. Our plan is to utilize the watermark mechanism to
> > implement some sort of a "throttling" operator, which would take two
> > streams and stop passing through one of them based on the watermarks in
> > another. However, there doesn't seem to exist an operator of the shape
> > (A,B)->(A,B) in Flink, where A and B can be received and emitted
> > independently. What would be a resource-saving way to implement such
> > (e.g., without spawning two more parallel TwoInputStreamOperators)?
> >
> > Alex
>
>


Re: synchronizing two streams

2016-05-12 Thread Matthias J. Sax
I cannot follow completely. TwoInputStreamOperators defines two methods
to process watermarks for each stream.

So you can sync both stream within your outer join operator you plan to
implement.

-Matthias

On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:
> Hello,
> 
> We're implementing a streaming outer join operator based on a
> TwoInputStreamOperator with an internal buffer. In our use-case only the
> items whose timestamps are within a several-second interval of each
> other can join, so we need to synchronize the two input streams to
> ensure maximal yield. Our plan is to utilize the watermark mechanism to
> implement some sort of a "throttling" operator, which would take two
> streams and stop passing through one of them based on the watermarks in
> another. However, there doesn't seem to exist an operator of the shape
> (A,B)->(A,B) in Flink, where A and B can be received and emitted
> independently. What would be a resource-saving way to implement such
> (e.g., without spawning two more parallel TwoInputStreamOperators)?
> 
> Alex





Re: Bug while using Table API

2016-05-12 Thread Vasiliki Kalavri
Good to know :)

On 12 May 2016 at 11:16, Simone Robutti 
wrote:

> Ok, I tested it and it works on the same example. :)
>
> 2016-05-11 12:25 GMT+02:00 Vasiliki Kalavri :
>
>> Hi Simone,
>>
>> Fabian has pushed a fix for the streaming TableSources that removed the
>> Calcite Stream rules [1].
>> The reported error does not appear anymore with the current master. Could
>> you please also give it a try and verify that it works for you?
>>
>> Thanks,
>> -Vasia.
>>
>> [1]:
>> https://github.com/apache/flink/commit/7ed07933d2dd3cf41948287dc8fd79dbef902311
>>
>> On 4 May 2016 at 17:33, Vasiliki Kalavri 
>> wrote:
>>
>>> Thanks Simone! I've managed to reproduce the error. I'll try to figure
>>> out what's wrong and I'll keep you updated.
>>>
>>> -Vasia.
>>> On May 4, 2016 3:25 PM, "Simone Robutti" 
>>> wrote:
>>>
 Here is the code:

 package org.example

 import org.apache.flink.api.scala._
 import org.apache.flink.api.table.TableEnvironment

 object Job {
   def main(args: Array[String]) {
 // set up the execution environment
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env)


 val input = env.fromElements(WC("hello", 1), WC("hello", 1),
 WC("ciao", 1))
 val expr = tEnv.fromDataSet(input)
 val result = expr
   .groupBy("word")
   .select("word , count.sum as count")
 tEnv.toDataSet[WC](result).print()

 env.execute("Flink Scala API Skeleton")
   }
 }

 case class WC(word:String,count:Int)



>>
>


Re: Force triggering events on watermark

2016-05-12 Thread Aljoscha Krettek
Yes, this should work.

On Tue, 10 May 2016 at 19:01 Srikanth  wrote:

> Yes, will work.
> I was trying another route of having a "finalize & purge trigger" that will
>i) onElement - Register for event time watermark but not alter nested
> trigger's TriggerResult
>   ii) OnEventTime - Always purge after fire
>
> That will work with CountTrigger and other custom triggers too, right?
>
> public class FinalizePurgingTrigger<T, W extends Window> extends
> Trigger<T, W> {
>
> private final Trigger<T, W> nestedTrigger; // wrapped trigger, set in the constructor
>
> @Override
> public TriggerResult onElement(T element, long timestamp, W window,
> TriggerContext ctx) throws Exception {
> ctx.registerEventTimeTimer(window.getEnd());
> return nestedTrigger.onElement(element, timestamp, window, ctx);
> }
>
> @Override
> public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
> throws Exception {
> TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
> switch (triggerResult) {
> case FIRE:
> return TriggerResult.FIRE_AND_PURGE;
> case FIRE_AND_PURGE:
> return TriggerResult.FIRE_AND_PURGE;
> default:
> return TriggerResult.CONTINUE;
> }
> }
> }
>
> Srikanth
>
> On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske  wrote:
>
>> Maybe the last example of this blog post is helpful [1].
>>
>> Best, Fabian
>>
>> [1]
>> https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
>>
>> 2016-05-10 17:24 GMT+02:00 Srikanth :
>>
>>> Hi,
>>>
>>> I read the following in Flink doc "We can explicitly specify a Trigger
>>> to overwrite the default Trigger provided by the WindowAssigner. Note that
>>> specifying a triggers does not add an additional trigger condition but
>>> replaces the current trigger."
>>> So, I tested out the below code with count trigger. As per my
>>> understanding this will override the default watermark based trigger.
>>>
>>> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59",
>>> 157428, 4),
>>>  ("2016-04-07 13:11:59", 157428, 4),
>>>  ("2016-04-07 13:11:59", 111283, 23),
>>>  ("2016-04-07 13:11:57", 108042, 23),
>>>  ("2016-04-07 13:12:00", 161374, 9),
>>>  ("2016-04-07 13:12:00", 161374, 9),
>>>  ("2016-04-07 13:11:59", 136505, 4)
>>> )
>>> )
>>>.assignAscendingTimestamps(b => f.parse(b._1).getTime())
>>>.map(b => (b._3, b._2))
>>>
>>> testStream.print
>>>
>>> val countStream = testStream
>>> .keyBy(_._1)
>>> .timeWindow(Time.seconds(20))
>>> .trigger(CountTrigger.of(3))
>>> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>>>
>>> countStream.print
>>>
>>> Output I saw confirms the documented behavior. Processing is triggered
>>> only when we have 3 elements for a key.
>>> How do I force trigger the left over records when watermark is past the
>>> window? I.e, I want to use triggers to start early processing but finalize
>>> the window based on watermark.
>>>
>>> Output shows that records for keys 23 & 9 weren't processed.
>>>   (4,157428)
>>>   (4,157428)
>>>   (23,111283)
>>>   (23,108042)
>>>   (9,161374)
>>>   (9,161374)
>>>   (4,136505)
>>>
>>>   (4,List(157428, 157428, 136505))
>>>
>>> Thanks,
>>> Srikanth
>>>
>>
>>
>


Re: Flink + Avro GenericRecord - first field value overwrites all other fields

2016-05-12 Thread Fabian Hueske
Hi Tarandeep,

the AvroInputFormat was recently extended to support GenericRecords. [1]
You could also try to run the latest SNAPSHOT version and see if it works
for you.

Cheers, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3691

2016-05-12 10:05 GMT+02:00 Tarandeep Singh :

> I think I found a workaround. Instead of reading Avro files as
> GenericRecords, if I read them as specific records and then use a map to
> convert (typecast) them as GenericRecord, the problem goes away.
>
> I ran some tests and so far this workaround seems to be working in my
> local setup.
>
> -Tarandeep
>
> On Wed, May 11, 2016 at 10:24 PM, Tarandeep Singh 
> wrote:
>
>> Hi,
>>
>> I am using the DataSet API and reading Avro files as DataSet<GenericRecord>.
>> I am seeing this weird behavior that a record is read correctly from the file
>> (verified by printing all values) but when this record is passed to the
>> Flink chain/DAG (e.g. a KeySelector), every field in this record has the same
>> value as the first field value. Even weirder, the values are of
>> different types, e.g. I have a record Query with two fields key (integer)
>> and query (String). When the record was read from file, correct values were
>> read (e.g. 100, "apache flink"). But when I print/check values in
>> KeySelector, I get (100, 100).
>>
>> I saw similar post on stackoverflow-
>>
>> http://stackoverflow.com/questions/37115618/apache-flink-union-operator-giving-wrong-response
>>
>> Any idea what might be happening?
>> Any workaround will be greatly appreciated.
>>
>> Thank you,
>> Tarandeep
>>
>>
>


Re: get start and end time stamp from time window

2016-05-12 Thread Fabian Hueske
Hi Martin,

You can use a FoldFunction and a WindowFunction to process the same
window. The FoldFunction is eagerly applied, so the window state is only
one element. When the window is closed, the aggregated element is given to
the WindowFunction where you can add start and end time. The iterator of
the WindowFunction will provide only one (the aggregated) element.

See the apply method on WindowedStream with the following signature:
apply(initialValue: R, foldFunction: FoldFunction[T, R], function:
WindowFunction[R, R, K, W]): DataStream[R]
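A rough Java sketch of that combination; MyEvent, MyKey and MyKeySelector are
placeholders, and the fold here simply counts elements. Because the signature
ties the WindowFunction's input and output to the fold type, the accumulator
tuple reserves two fields that the WindowFunction fills with the window bounds:

DataStream<Tuple3<Long, Long, Long>> result = stream
        .keyBy(new MyKeySelector())
        .timeWindow(Time.minutes(1))
        .apply(
            Tuple3.of(0L, 0L, 0L),   // (windowStart, windowEnd, count), bounds filled later
            new FoldFunction<MyEvent, Tuple3<Long, Long, Long>>() {
                @Override
                public Tuple3<Long, Long, Long> fold(Tuple3<Long, Long, Long> acc, MyEvent value) {
                    return Tuple3.of(acc.f0, acc.f1, acc.f2 + 1);   // eager per-element aggregation
                }
            },
            new WindowFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, MyKey, TimeWindow>() {
                @Override
                public void apply(MyKey key, TimeWindow window,
                                  Iterable<Tuple3<Long, Long, Long>> folded,
                                  Collector<Tuple3<Long, Long, Long>> out) {
                    // exactly one element arrives here: the fold result
                    Tuple3<Long, Long, Long> agg = folded.iterator().next();
                    out.collect(Tuple3.of(window.getStart(), window.getEnd(), agg.f2));
                }
            });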

Cheers, Fabian

2016-05-11 20:16 GMT+02:00 Martin Neumann :

> Hej,
>
> I have a windowed stream and I want to run a (generic) fold function on
> it. The result should have the start and the end time stamp of the window
> as fields (so I can relate it to the original data). *Is there a simple
> way to get the timestamps from within the fold function?*
>
> I could find the lowest and the highest ts as part of the fold function
> but that would not be very accurate especially when I the number of events
> in the window is low. Also, I want to write in a generic way so I can use
> it even if the data itself does not contain a time stamp field (running on
> processing time).
>
> I have looked into using a WindowFunction where I would have access to the
> start and end timestamp. I have not quite figured out how I would implement
> a fold function using this. Also, from my understanding this approach would
> require holding the whole window in memory which is not a good option since
> the window data can get very large.
>
> Is there a better way of doing this
>
>
> cheers Martin
>


Unexpected behaviour in datastream.broadcast()

2016-05-12 Thread Biplob Biswas
Hi,

I am running this following sample code to understand how iteration and
broadcast works in streaming context.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
long i = 5;
DataStream<Long> mainInput = env.generateSequence(2, 8);
DataStream<Long> initialIterateInput = env.fromElements(i);

IterativeStream.ConnectedIterativeStreams<Long, Long> iteration =
        mainInput.iterate().withFeedbackType(BasicTypeInfo.LONG_TYPE_INFO);

DataStream<Long> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
            long globalVal = 1;

            @Override
            public void flatMap1(Long value, Collector<Long> out) throws Exception {
                Thread.sleep(3000);
                System.out.println("SEEING FROM INPUT 1: " + value + ", " + globalVal);
                //globalVal = globalVal + value;
                out.collect(globalVal + value);
            }

            @Override
            public void flatMap2(Long value, Collector<Long> out) throws Exception {
                Thread.sleep(1000);
                globalVal = value;
                System.out.println("SEEING FROM INPUT 2: " + value + ", " + globalVal);
                //out.collect(value);
            }
        });

iteration.closeWith(iterateHead.broadcast());

iterateHead.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
        return value;
    }
});

I was expecting that after out.collect(globalVal + value); is called, the
value would be broadcast to every partition, as specified by the closeWith
statement. Also, I was expecting the broadcast value to arrive at the
flatMap2 function and then update globalVal in every partition.
But rather than that, the values are not broadcast and iterated properly
as I was expecting, and I am getting the following output:

SEEING FROM INPUT 1: 2, 1
SEEING OUTPUT FROM ITERATION: 3
SEEING FROM INPUT 1: 3, 1
SEEING OUTPUT FROM ITERATION: 4
SEEING FROM INPUT 1: 4, 1
SEEING FROM INPUT 1: 5, 1
SEEING OUTPUT FROM ITERATION: 5
SEEING OUTPUT FROM ITERATION: 6
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 1: 6, 1
SEEING OUTPUT FROM ITERATION: 7
SEEING FROM INPUT 1: 7, 1
SEEING OUTPUT FROM ITERATION: 8
SEEING FROM INPUT 1: 8, 1
SEEING OUTPUT FROM ITERATION: 9
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7


Can anyone please explain this behaviour? Why is the iteration happening
after reading all the elements of the first input stream? What if it is an
infinite stream - would the iteration wait for it to finish?

Thanks and Regards



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-tp6848.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Bug while using Table API

2016-05-12 Thread Simone Robutti
Ok, I tested it and it works on the same example. :)

2016-05-11 12:25 GMT+02:00 Vasiliki Kalavri :

> Hi Simone,
>
> Fabian has pushed a fix for the streaming TableSources that removed the
> Calcite Stream rules [1].
> The reported error does not appear anymore with the current master. Could
> you please also give it a try and verify that it works for you?
>
> Thanks,
> -Vasia.
>
> [1]:
> https://github.com/apache/flink/commit/7ed07933d2dd3cf41948287dc8fd79dbef902311
>
> On 4 May 2016 at 17:33, Vasiliki Kalavri 
> wrote:
>
>> Thanks Simone! I've managed to reproduce the error. I'll try to figure
>> out what's wrong and I'll keep you updated.
>>
>> -Vasia.
>> On May 4, 2016 3:25 PM, "Simone Robutti" 
>> wrote:
>>
>>> Here is the code:
>>>
>>> package org.example
>>>
>>> import org.apache.flink.api.scala._
>>> import org.apache.flink.api.table.TableEnvironment
>>>
>>> object Job {
>>>   def main(args: Array[String]) {
>>> // set up the execution environment
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>
>>> val input = env.fromElements(WC("hello", 1), WC("hello", 1),
>>> WC("ciao", 1))
>>> val expr = tEnv.fromDataSet(input)
>>> val result = expr
>>>   .groupBy("word")
>>>   .select("word , count.sum as count")
>>> tEnv.toDataSet[WC](result).print()
>>>
>>> env.execute("Flink Scala API Skeleton")
>>>   }
>>> }
>>>
>>> case class WC(word:String,count:Int)
>>>
>>>
>>>
>


Re: reading from latest kafka offset when flink starts

2016-05-12 Thread Balaji Rajagopalan
No, I am using Kafka 0.8.0.2. I did some experiments with changing the
parallelism from 4 to 16: the lag has now been reduced from 2 hours to 20 minutes,
and the CPU utilization (load avg) has gone up from 20-30% to 50-60%, so
parallelism does seem to play a role in reducing the processing lag in
Flink, as I expected.

On Wed, May 11, 2016 at 11:42 PM, Aljoscha Krettek 
wrote:

> Hi,
> are you per chance using Kafka 0.9?
>
> Cheers,
> Aljoscha
>
> On Tue, 10 May 2016 at 08:37 Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> Robert,
>>   Regarding the event rate, 4500 events/sec may not be a large number, but I am
>> seeing some issues in processing the events due to the processing power that I
>> am using. I have deployed the Flink app on a 3-node YARN cluster: one node is the
>> master, and 2 slave nodes run the taskmanagers. Each machine is a
>> 2-core machine with 4 GB RAM, with default.parallelism set to 4. I find
>> there is a delay of 2 hours from the time an event enters the system to the
>> time it gets into the sink; it looks like the events are checkpointed but
>> processed with a huge delay inside Flink. Is there any recommendation (wiki
>> write-up) for the number of taskmanager slots required for processing a
>> certain load of incoming data?
>>
>> balaji
>>
>> On Mon, May 9, 2016 at 1:59 PM, Ufuk Celebi  wrote:
>>
>>> Robert, what do you think about adding a note about this to the Kafka
>>> consumer docs? This has come up a couple of times on the mailing list
>>> already.
>>>
>>> – Ufuk
>>>
>>> On Fri, May 6, 2016 at 12:07 PM, Balaji Rajagopalan
>>>  wrote:
>>> > Thanks Robert appreciate your help.
>>> >
>>> > On Fri, May 6, 2016 at 3:07 PM, Robert Metzger 
>>> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> yes, you can use Kafka's configuration setting for that. Its called
>>> >> "auto.offset.reset". Setting it to "latest" will change the restart
>>> behavior
>>> >> to the current offset ("earliest" is the opposite).
>>> >>
>>> >> How heavy is the processing you are doing? 4500 events/second sounds
>>> not
>>> >> like a lot of throughput.
>>> >>
>>> >> On Wed, May 4, 2016 at 8:24 AM, Balaji Rajagopalan
>>> >>  wrote:
>>> >>>
>>> >>> I am using the flink connector to read from a kafka stream, I ran
>>> into
>>> >>> the problem where the flink job went down due to some application
>>> error, it
>>> >>> was down for sometime, meanwhile the kafka queue was growing as
>>> expected no
>>> >>> consumer to consume from the given group , and when I started the
>>> flink it
>>> >>> started consuming the messages no problem so far, but consumer lag
>>> was huge
>>> >>> since producer is a fast producer about 4500 events/sec. My question
>>> is
>>> >>> there any flink connector configuration which can force it read from
>>> the
>>> >>> latest offset when the flink application starts since in my
>>> application
>>> >>> logic I do not care about older events.
>>> >>>
>>> >>> balaji
>>> >>
>>> >>
>>> >
>>>
>>
>>


Re: HBase write problem

2016-05-12 Thread Flavio Pompermaier
Great :)

On Thu, May 12, 2016 at 10:01 AM, Palle  wrote:

> Hi guys.
>
> Thanks for helping out.
>
> We downgraded to HBase 0.98 and resolved some classpath issues and then it
> worked.
>
> /Palle
>
> - Original meddelelse -
>
> *Fra:* Stephan Ewen 
> *Til:* user@flink.apache.org
> *Dato:* Ons, 11. maj 2016 17:19
>
> *Emne:* Re: HBase write problem
>
> Just to narrow down the problem:
> The insertion into HBase actually works, but the job does not finish after
> that?
> And the same job (same source of data) that writes to a file, or prints,
> finishes?
> If that is the case, can you check what status each task is in, via the
> web dashboard? Are all tasks still in "running"?
>
>
> On Wed, May 11, 2016 at 4:53 PM, Flavio Pompermaier 
> wrote:
>
>> I can't help you with the choice of the DB storage; as always, the answer
>> is "it depends" on a lot of factors :)
>>
>> From what I can tell, the problem could be that Flink supports HBase
>> 0.98, so it could be worth updating the Flink connectors to a more recent version
>> (which should hopefully be backward compatible..) or maybe creating two
>> separate HBase connectors (one for HBase 0.9x and one for 1.x). Let me know
>> about your attempts :)
>>
>>
>> On Wed, May 11, 2016 at 4:47 PM, Palle  wrote:
>>
>>> Hadoop 2.7.2
>>> HBase 1.2.1
>>>
>>> I have this running from a Hadoop job, but just not from Flink.
>>>
>>> I will look into your suggestions, but would I be better off choosing
>>> another DB for storage? I can see that Cassandra gets some attention on
>>> this mailing list. I need to store approx. 2 billion key-value pairs of
>>> about 100 bytes each.
>>>
>>> - Original meddelelse -
>>>
>>> *Fra:* Flavio Pompermaier 
>>> *Til:* user 
>>> *Dato:* Ons, 11. maj 2016 16:29
>>>
>>> *Emne:* Re: HBase write problem
>>>
>>> And which version of HBase and Hadoop are you running?
>>> Did you try to put the hbase-site.xml in the jar?
>>> Moreover, I don't know how much reliable is at the moment the web client
>>> UI..my experience is that the command line client is much more reliable.
>>> You just need to run from the flink dir something like:
>>>bin/flink  run -c  xxx.yyy.MyMainClass /path/to/shadedJar.jar
>>>
>>> On Wed, May 11, 2016 at 4:19 PM, Palle  wrote:
>>>
 I run the job from the cluster. I run it through the web UI.
 The jar file submitted does not contain the hbase-site.xml file.

 - Original meddelelse -

 *Fra:* Flavio Pompermaier 
 *Til:* user 
 *Dato:* Ons, 11. maj 2016 09:36

 *Emne:* Re: HBase write problem

 Do you run the job from your IDE or from the cluster?

 On Wed, May 11, 2016 at 9:22 AM, Palle  wrote:

> Thanks for the response, but I don't think the problem is the
> classpath - hbase-site.xml should be added. This is what it looks like
> (hbase conf is added at the end):
>
> 2016-05-11 09:16:45,831 INFO  org.apache.zookeeper.ZooKeeper
>  - Client
> environment:java.class.path=C:\systems\packages\flink-1.0.2\lib\flink-dist_2.11-1.0.2.jar;C:\systems\packages\flink-1.0.2\lib\flink-python_2.11-1.0.2.jar;C:\systems\packages\flink-1.0.2\lib\guava-11.0.2.jar;C:\systems\packages\flink-1.0.2\lib\hbase-annotations-1.2.1-tests.jar;C:\systems\packages\flink-1.0.2\lib\hbase-annotations-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-client-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-common-1.2.1-tests.jar;C:\systems\packages\flink-1.0.2\lib\hbase-common-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-examples-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-external-blockcache-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-hadoop-compat-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-hadoop2-compat-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-it-1.2.1-tests.jar;C:\systems\packages\flink-1.0.2\lib\hbase-it-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-prefix-tree-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-procedure-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-protocol-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-resource-bundle-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-rest-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-server-1.2.1-tests.jar;C:\systems\packages\flink-1.0.2\lib\hbase-server-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-shell-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\hbase-thrift-1.2.1.jar;C:\systems\packages\flink-1.0.2\lib\log4j-1.2.17.jar;C:\systems\packages\flink-1.0.2\lib\slf4j-log4j12-1.7.7.jar;C:\systems\master_flink\bin;C:\systems\packages\flink-1.0.2\lib;;C:\systems\packages\hbase-1.2.1\lib;C:\systems\hbase\conf;C:\systems\hbase\conf\hbase-site.xml;
>
> 2016-05-11 09:16:45,831 INFO  

normalize vertex values

2016-05-12 Thread Lydia Ickler
Hi all,

If I have a Graph g and I would like to normalize all vertex values by the
absolute max of all vertex values, which API function should I choose?

Thanks in advance!
Lydia
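
There is no single Gelly call for this as far as I know; one way to do it is to
compute the absolute max over the vertex DataSet and broadcast it into a map
that rescales every vertex. Below is a minimal sketch, assuming a
Graph<Long, Double, Double> (the concrete type parameters are only an
assumption) and the Flink 1.0 DataSet/Gelly APIs:

import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;

public class NormalizeVertexValuesSketch {

    public static Graph<Long, Double, Double> normalize(
            Graph<Long, Double, Double> graph, ExecutionEnvironment env) {

        DataSet<Vertex<Long, Double>> vertices = graph.getVertices();

        // 1) absolute max over all vertex values
        DataSet<Double> maxAbs = vertices
                .map(new MapFunction<Vertex<Long, Double>, Double>() {
                    @Override
                    public Double map(Vertex<Long, Double> v) {
                        return Math.abs(v.getValue());
                    }
                })
                .reduce(new ReduceFunction<Double>() {
                    @Override
                    public Double reduce(Double a, Double b) {
                        return Math.max(a, b);
                    }
                });

        // 2) divide every vertex value by that max (broadcast the single value;
        //    note this assumes the max is non-zero)
        DataSet<Vertex<Long, Double>> normalized = vertices
                .map(new RichMapFunction<Vertex<Long, Double>, Vertex<Long, Double>>() {
                    private double max;

                    @Override
                    public void open(Configuration parameters) {
                        List<Double> bc = getRuntimeContext().getBroadcastVariable("maxAbs");
                        max = bc.get(0);
                    }

                    @Override
                    public Vertex<Long, Double> map(Vertex<Long, Double> v) {
                        return new Vertex<>(v.getId(), v.getValue() / max);
                    }
                })
                .withBroadcastSet(maxAbs, "maxAbs");

        // 3) rebuild the graph with the normalized vertex values
        return Graph.fromDataSet(normalized, graph.getEdges(), env);
    }
}

As far as I can tell, Graph.mapVertices cannot take a broadcast set directly,
which is why the sketch works on the vertex DataSet and rebuilds the graph with
Graph.fromDataSet at the end.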

checkpoints not being removed from HDFS

2016-05-12 Thread Maciek Próchniak

Hi,

we have a streaming job with quite a large state (a few GB); we're using 
the FsStateBackend and we're storing checkpoints in HDFS.
What we observe is that very often old checkpoints are not discarded 
properly. In the hadoop logs I can see:


2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server 
handler 9 on 8020, call 
org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
10.10.113.9:49233 Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
empty': Directory is not empty
at 
org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)


While on the flink side (in the jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 64 @ 1462875682636


I see in the code that delete operations in flink are done with the 
recursive flag set to false - but I'm not sure why the directory contents 
are not deleted beforehand.
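
For illustration, here is a minimal stand-alone sketch (plain Hadoop
FileSystem API, not Flink code) of the difference between the two delete
modes; the checkpoint path is the one from the namenode log above:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Stand-alone illustration of recursive vs. non-recursive deletes on HDFS.
public class CheckpointDeleteSketch {

    public static void main(String[] args) throws Exception {
        // Picks up the default file system from the Hadoop config on the classpath.
        FileSystem fs = FileSystem.get(new Configuration());
        Path chk = new Path("/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62");

        try {
            // recursive = false only succeeds on an already-empty directory; on HDFS a
            // non-empty directory surfaces as PathIsNotEmptyDirectoryException, which is
            // exactly what the namenode logged above.
            fs.delete(chk, false);
        } catch (IOException e) {
            // some state files were still inside chk-62
        }

        // recursive = true removes the directory together with whatever is left in it.
        fs.delete(chk, true);
    }
}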

When we were using the RocksDB backend we didn't encounter this situation.
We're using flink 1.0.1 and hdfs 2.7.2.

Does anybody have any idea why this could be happening?

thanks,
maciek





[ANNOUNCE] Flink 1.0.3 Released

2016-05-12 Thread Ufuk Celebi
The Flink PMC is pleased to announce the availability of Flink 1.0.3.

The official release announcement:
http://flink.apache.org/news/2016/05/11/release-1.0.3.html

Release binaries:
http://apache.openmirror.de/flink/flink-1.0.3/

Please update your Maven dependencies to the new 1.0.3 version and
update your binaries.

On behalf of the Flink PMC, I would like to thank everybody who
contributed to the release.


Re: Flink + Avro GenericRecord - first field value overwrites all other fields

2016-05-12 Thread Tarandeep Singh
I think I found a workaround. Instead of reading the Avro files as
GenericRecords, if I read them as specific records and then use a map to
convert (cast) them to GenericRecord, the problem goes away.
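
For anyone who wants to try the same thing, here is a minimal sketch of that
workaround. The input path is a placeholder, Query is the Avro-generated
specific record class mentioned below, and AvroInputFormat is the one shipped
with the flink-avro module in the 1.0.x line (package as I remember it):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;

public class SpecificToGenericSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the files as Avro *specific* records (generated classes), not GenericRecords.
        DataSet<Query> specific = env.createInput(
                new AvroInputFormat<>(new Path("hdfs:///path/to/queries"), Query.class));

        // Convert each specific record into a fresh GenericRecord. Copying field by
        // field avoids relying on the generated class implementing GenericRecord,
        // which depends on the Avro version (a plain cast may also work). Nested
        // record fields would need the same treatment.
        DataSet<GenericRecord> generic = specific
                .map(new MapFunction<Query, GenericRecord>() {
                    @Override
                    public GenericRecord map(Query value) {
                        GenericRecord out = new GenericData.Record(value.getSchema());
                        for (Schema.Field f : value.getSchema().getFields()) {
                            out.put(f.name(), value.get(f.pos()));
                        }
                        return out;
                    }
                });

        generic.first(5).print();
    }
}

Whether this avoids the object-reuse issue in every setup may depend on the
Avro and Flink versions in play; it is only a sketch of the workaround
described above.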

I ran some tests and so far this workaround seems to be working in my local
setup.

-Tarandeep

On Wed, May 11, 2016 at 10:24 PM, Tarandeep Singh 
wrote:

> Hi,
>
> I am using the DataSet API and reading Avro files as DataSet<GenericRecord>.
> I am seeing this weird behavior: a record is read correctly from the file
> (verified by printing all values), but when this record is passed to the
> Flink chain/DAG (e.g. a KeySelector), every field in the record has the same
> value as the first field. Even weirder, the fields are of different types;
> e.g. I have a record Query with two fields, key (integer) and query (String).
> When the record was read from the file, correct values were read (e.g. 100,
> "apache flink"). But when I print/check values in the KeySelector, I get
> (100, 100).
>
> I saw a similar post on stackoverflow:
>
> http://stackoverflow.com/questions/37115618/apache-flink-union-operator-giving-wrong-response
>
> Any idea what might be happening?
> Any workaround will be greatly appreciated.
>
> Thank you,
> Tarandeep
>
>


Re: HBase write problem

2016-05-12 Thread Palle
Hi guys.

Thanks for helping out.

We downgraded to HBase 0.98 and resolved some classpath issues and then
it worked.

/Palle
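
In case it helps others hitting the same classpath issues, here is a minimal
sketch of building the HBase configuration explicitly instead of relying on
hbase-site.xml being picked up from every task manager's classpath. The table
name, ZooKeeper hosts and row data are placeholders; the config path simply
mirrors the one in the classpath dump earlier in the thread:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class ExplicitHBaseConfSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Either point at the config file directly ...
        conf.addResource(new Path("file:///C:/systems/hbase/conf/hbase-site.xml"));

        // ... or set the essential properties in code (placeholder values).
        conf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // Plain 0.98-style client API (still available, though deprecated, in 1.x).
        HTable table = new HTable(conf, "my_table");
        try {
            Put put = new Put(Bytes.toBytes("row-1"));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("some value"));
            table.put(put);
        } finally {
            table.close();
        }
    }
}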

- Original message -

> From: Stephan Ewen 
> To: user@flink.apache.org
> Date: Wed, 11 May 2016 17:19
> Subject: Re: HBase write problem
> 
> Just to narrow down the problem: the insertion into HBase actually
> works, but the job does not finish after that? And the same job (same
> source of data) that writes to a file, or prints, finishes? If that is
> the case, can you check what status each task is in, via the web
> dashboard? Are all tasks still in "running"?
> 
> On Wed, May 11, 2016 at 4:53 PM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
> 
>   I can't help you with the choice of the db storage, as always the
>   answer is "it depends" on a lot of factors :)
> 
>   For what I can tell you, the problem could be that Flink supports
>   HBase 0.98, so it could be worth updating the Flink connectors to a
>   more recent version (that should hopefully be backward compatible..)
>   or maybe create two separate hbase connectors (one for hbase-0.9x and
>   one for hbase-1.x). Let me know about your attempts :)
> 