Processing millions of messages in milliseconds real time -- Architecture guide required

2016-04-18 Thread Deepak Sharma
Hi all,
I am looking for an architecture to ingest about 10 million messages in
real-time streaming mode.
If anyone has worked on a similar kind of architecture, could you please
point me to any documentation around it, e.g. what the architecture should
look like, which components/big data ecosystem tools I should consider, etc.
The messages will be in XML/JSON format, passing through a preprocessor
engine or message enhancer and then finally a processor.
I also thought about using a data cache for serving the data.
The data cache should be able to serve historical data (maybe up to 30 days
of data) in milliseconds.

-- 
Thanks
Deepak


Flink + S3

2016-04-18 Thread Michael-Keith Bernard
Hello Flink Users!

I'm a Flink newbie at the early stages of deploying our first Flink cluster 
into production and I have a few questions about wiring up Flink with S3:

* We are going to use the HA configuration[1] from day one (we have existing zk 
infrastructure already). Can S3 be used as a state backend for the Job Manager? 
The documentation talks about using S3 as a state backend for TM[2] (and in 
particular for streaming), but I'm wondering if it's a suitable backend for the 
JM as well.

* How do I configure S3 for Flink when I don't already have an existing Hadoop 
cluster? The documentation references the Hadoop configuration manifest[3], 
which kind of implies to me that I must already be running Hadoop (or at least 
have a properly configured Hadoop cluster). Is there an example somewhere of 
using S3 as a storage backend for a standalone cluster?

* Bonus: I'm writing a Puppet module for installing/configuring/managing Flink
in standalone mode with an existing zk cluster. Are there any existing modules
for this (I didn't find anything in the forge)? Would others in the community 
be interested if we added our module to the forge once complete?

Thanks so much for your time and consideration. We look forward to using Flink 
in production!

Cheers,
Michael-Keith

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html#standalone-cluster-high-availability

[2]: 
https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#s3-simple-storage-service

[3]: 
https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#set-s3-filesystem

Does Flink support joining multiple streams based on event time window now?

2016-04-18 Thread Yifei Li
Hi,

I am new to Flink and I've read some documentation and think Flink may fit
my scenario.

Here is my scenario:

1. Assume I have 3 streams: S1(id, name, email, action, date), S2(id, name,
email, level, date), S3(id, name, position, date).

*2. S2 is always delayed (by hours to days; the delay is not deterministic). *

3. Based on the event time, I want to join S1, S2 and S3 every 5 minutes.
The join is like a SQL join:
select S1.name, S3.position from S1, S2, S3 where S1.id = S2.id and
S1.id = S3.id and S1.action = 'download' and S2.level = 5



Can I use Flink for my scenario? If yes, can anyone point me to some
working examples (the ones I found are outdated), or tell me a
workaround to solve this problem? If not, can anyone tell me the
reasons?
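
For reference, a rough sketch of one pairwise event-time join over 5-minute
windows, assuming Flink's windowed stream join API (a third stream would be
chained as another join, and the long delay on S2 would still have to be
covered by its watermarks; class and field names follow the schemas above):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class S1(id: Long, name: String, email: String, action: String, date: Long)
case class S3(id: Long, name: String, position: String, date: Long)

// s1 and s3 are assumed to already have event-time timestamps and watermarks assigned.
def joinDownloadsWithPositions(s1: DataStream[S1], s3: DataStream[S3]): DataStream[(String, String)] =
  s1.filter(_.action == "download")
    .join(s3)
    .where(_.id)
    .equalTo(_.id)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply((a, b) => (a.name, b.position))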

Thanks,

Yifei


Re: Gracefully stop long running streaming job

2016-04-18 Thread Robert Schmidtke
I'm on 0.10.2, which seems to still lack this feature. Anyway, I'm
happy to see it'll be in future releases, so I'll get to enjoy it once I
upgrade :) For the record, I'm using a FlinkKafkaConsumer081.

Anyway, thanks a bunch
Robert

On Tue, Apr 19, 2016 at 12:14 AM, Matthias J. Sax  wrote:

> If all your sources implement the Stoppable interface, you can STOP a job.
>
> ./bin/flink stop JobID
>
> STOP is, however, quite new, and making the available sources stoppable is
> ongoing work (some are already). Not sure what kind of sources you
> are using right now.
>
> -Matthias
>
>
> On 04/18/2016 10:50 PM, Robert Schmidtke wrote:
> > Hi everyone,
> >
> > I am running a streaming benchmark which involves a potentially
> > infinitely running Flink Streaming Job. I run it blocking on YARN using
> > ./bin/flink run ... and then send the command into background,
> > remembering its PID to kill it later on. While this gets the work done,
> > the job always ends up in the FAILED state. I imagine it would be the
> > same if I used ./bin/flink cancel ... to cancel the job? It's not that
> > pressing but it would be nice to shut down a streaming job properly.
> >
> > Thanks
> >
> > Robert
> >
> > --
> > My GPG Key ID: 336E2680
>
>


-- 
My GPG Key ID: 336E2680


Re: Gracefully stop long running streaming job

2016-04-18 Thread Matthias J. Sax
If all your sources implement the Stoppable interface, you can STOP a job.

./bin/flink stop JobID

STOP is, however, quite new, and making the available sources stoppable is
ongoing work (some are already). Not sure what kind of sources you
are using right now.
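
For illustration, a minimal sketch of what a stoppable custom source can look
like (the class is made up; built-in sources have to implement the interface
themselves):

import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

// A custom source that honors both cancel() and the newer stop() command.
class CountingSource extends SourceFunction[Long] with StoppableFunction {
  @volatile private var running = true
  private var counter = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (running) {
      ctx.getCheckpointLock.synchronized {
        counter += 1
        ctx.collect(counter)
      }
      Thread.sleep(100)
    }
  }

  // ./bin/flink cancel <JobID>: hard cancellation, the job ends up CANCELED.
  override def cancel(): Unit = running = false

  // ./bin/flink stop <JobID>: graceful shutdown, the job can finish cleanly.
  override def stop(): Unit = running = false
}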

-Matthias


On 04/18/2016 10:50 PM, Robert Schmidtke wrote:
> Hi everyone,
> 
> I am running a streaming benchmark which involves a potentially
> infinitely running Flink Streaming Job. I run it blocking on YARN using
> ./bin/flink run ... and then send the command into background,
> remembering its PID to kill it later on. While this gets the work done,
> the job always ends up in the FAILED state. I imagine it would be the
> same if I used ./bin/flink cancel ... to cancel the job? It's not that
> pressing but it would be nice to shut down a streaming job properly.
> 
> Thanks
> 
> Robert
> 
> -- 
> My GPG Key ID: 336E2680





Gracefully stop long running streaming job

2016-04-18 Thread Robert Schmidtke
Hi everyone,

I am running a streaming benchmark which involves a potentially infinitely
running Flink streaming job. I run it in blocking mode on YARN using ./bin/flink
run ... and then send the command to the background, remembering its PID so I
can kill it later on. While this gets the work done, the job always ends up in
the FAILED state. I imagine it would be the same if I used ./bin/flink
cancel ... to cancel the job? It's not that pressing, but it would be nice
to be able to shut down a streaming job properly.

Thanks

Robert

-- 
My GPG Key ID: 336E2680


Hash tables - joins, cogroup, deltaIteration

2016-04-18 Thread Ovidiu-Cristian MARCU
Hi,

Can you please confirm whether there is any update regarding the hash table use
cases? In [1] it is specified that hash tables are used in joins and for the
solution set in iterations (with pending work to use them for grouping/aggregations).

I am interested in the progress of that pending work, and also in whether you are
considering a hybrid implementation where joins, the solution set in delta
iterations, and CoGroup can also spill to disk when not enough memory is
available for these hash tables.
 
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 
 
Memory Management (Batch API)

Thanks

Best,
Ovidiu

class java.util.UUID is not a valid POJO type

2016-04-18 Thread Leonard Wolters

Hi,

Quick question. I'm currently implementing our machine learning into
Spark using the Scala interface.
Do I understand correctly that, when using Scala only, you can only use
primitive types? How can I register the UUID class so that it is supported
for (de)serialization between the nodes?
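
One way this is commonly handled is to register a custom Kryo serializer for
UUID, since Flink falls back to Kryo for types it cannot treat as POJOs. A
minimal sketch (the serializer class is hand-written, not part of Flink; for
streaming jobs the same call is available on the environment's config):

import java.util.UUID
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.api.scala.ExecutionEnvironment

// Hand-written Kryo serializer for UUID: two longs per value.
class UUIDSerializer extends Serializer[UUID] {
  override def write(kryo: Kryo, output: Output, uuid: UUID): Unit = {
    output.writeLong(uuid.getMostSignificantBits)
    output.writeLong(uuid.getLeastSignificantBits)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[UUID]): UUID =
    new UUID(input.readLong(), input.readLong())
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.registerTypeWithKryoSerializer(classOf[UUID], classOf[UUIDSerializer])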

BTW: Would love to see the UUID class get 'native' support.

Keep up the good work, guys!

Leonard



Re: Compression - AvroOutputFormat and over network ?

2016-04-18 Thread Robert Metzger
Hi Tarandeep,

I think for that you would need to set a codec factory on the DataFileWriter.
Sadly we don't expose that method to the user.

If you want, you can contribute this change to Flink. Otherwise, I can
quickly fix it.
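
For reference, this is roughly where the hook sits in plain Avro, outside of
Flink's AvroOutputFormat (schema and file names are placeholders); the change
discussed above would essentially forward such a CodecFactory to this writer:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

// Plain Avro: the codec is set on the DataFileWriter before create().
val schema: Schema = new Schema.Parser().parse(new File("record.avsc"))
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
writer.setCodec(CodecFactory.snappyCodec()) // or CodecFactory.deflateCodec(6)
writer.create(schema, new File("records.avro"))
// ... writer.append(record) per record, then writer.close()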

Regards,
Robert


On Mon, Apr 18, 2016 at 2:36 PM, Ufuk Celebi  wrote:

> Hey Tarandeep,
>
> regarding the network part: not possible at the moment. It's pretty
> straight forward to add support for it, but no one ever got around to
> actually implementing it. If you would like to contribute, I am happy
> to give some hints about which parts of the system would need to be
> modified.
>
> – Ufuk
>
>
> On Mon, Apr 18, 2016 at 12:56 PM, Tarandeep Singh 
> wrote:
> > Hi,
> >
> > How can I set compression for AvroOutputFormat when writing files on
> HDFS?
> > Also, can we set compression for intermediate data that is sent over
> network
> > (from map to reduce phase) ?
> >
> > Thanks,
> > Tarandeep
>


Re: throttled stream

2016-04-18 Thread Robert Metzger
Hi,
I would also go for Niels' approach. If the mapper has the same parallelism
as the source and it's right after it, it'll be chained to the source. The
throttling then happens with almost no overhead.

Regarding the ThrottledIterator: Afaik there is no iterator involved when
reading data out of the Kafka connector, so there is no way to plug it in
anywhere.
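
A minimal sketch of that approach, assuming a cap of roughly 5 elements per
second per parallel instance (class and stream names are made up):

import org.apache.flink.api.common.functions.RichMapFunction

// Sleeps ~200 ms per element, i.e. at most ~5 elements/second per parallel instance.
class ThrottleMap[T] extends RichMapFunction[T, T] {
  override def map(value: T): T = {
    Thread.sleep(200)
    value
  }
}

// Usage: keep the same parallelism as the Kafka source so the mapper is chained to it.
// val throttled = kafkaStream.map(new ThrottleMap[String])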


On Sun, Apr 17, 2016 at 8:53 AM, Márton Balassi 
wrote:

> There is a utility in flink-streaming-examples that might be useful, but
> is generally the same idea that Niels suggests. [1]
>
> [1]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
>
> On Sun, Apr 17, 2016 at 8:42 AM, Niels Basjes  wrote:
>
>> Simple idea: create a map function that only does "sleep 1/5 second" and
>> put that in your pipeline somewhere.
>>
>> Niels
>> On 16 Apr 2016 22:38, "Chen Bekor"  wrote:
>>
>>> is there a way to consume a kafka stream using flink with  a predefined
>>> rate limit (eg 5 events per second)
>>>
>>> we need this because we need to control some 3rd party api rate
>>> limitations so,  even if we have a much larger throughput potential, we
>>> must control the consumption rate in order not to overflow the API channel.
>>>
>>
>


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Metzger
Yeah, that's the problem.
Your Kafka 0.8.1 dependency [1] doesn't depend on kafka-clients. With the
explicit dependency definition, you are overriding the transitive Kafka
dependency from the kafka connector.

[1]
https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.1/kafka_2.10-0.8.1.pom

On Mon, Apr 18, 2016 at 5:38 PM, Robert Schmidtke 
wrote:

> Turns out when I remove the explicit dependency on kafka_2.10 v. 0.8.1,
> then the dependencies are properly included. Guess there was a conflict
> somehow? I'll need to figure out if the rest of the code is fine with
> kafka_2.10 v. 0.8.2.0 as well.
>
> On Mon, Apr 18, 2016 at 4:32 PM, Robert Schmidtke 
> wrote:
>
>> Hi Robert,
>>
>> thanks for your offer. After playing around a bit I would like to take
>> it, if you have the time:
>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/pom.xml
>>
>> I would guess the POM is similar to the one in the sample project, yet
>> when building it, the jar does not contain all indirect dependencies.
>>
>> Thanks!!
>>
>> Robert
>>
>> On Mon, Apr 18, 2016 at 4:07 PM, Robert Metzger 
>> wrote:
>>
>>> If you want you can share the pom of your project privately.
>>>
>>> On Mon, Apr 18, 2016 at 4:05 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 You're right, it does not. When including it the resulting jar has the
 Kafka dependencies bundled. Now it's up to me to figure out the difference
 between the sample project and the one I'm working on.

 Thanks! Really quick help.

 Robert

 On Mon, Apr 18, 2016 at 4:02 PM, Robert Metzger 
 wrote:

> Hi,
> the problem with the posted project is that it doesn't have the Flink
> kafka connector as a dependency.
>
> On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke <
> ro.schmid...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> thanks for your hints. I was not sure whether I was building a proper
>> fat jar, as I have not used the Flink Archetype for my project. However, 
>> I
>> have set up a sample project at
>>
>> https://github.com/robert-schmidtke/flink-test/
>>
>> which is nothing more than the Quickstart Archetype plus the
>> instructions to bundle the Kafka dependencies. The resulting jars (mvn
>> clean package -Pbuild-jar and mvn clean package) do not contain the
>> org/apache/kafka/** classes. Can you have a quick look at the pom? 
>> However,
>> as I said, it's verbatim Archetype+Flink Docs.
>>
>> Thanks a lot in advance!
>>
>> Robert
>>
>>
>>
>> On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger > > wrote:
>>
>>> Hi,
>>> did you check your user jar if it contains the Kafka classes?
>>> Are you building a fat jar? Are you manually excluding any
>>> dependencies?
>>>
>>> Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in
>>> turn depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients"
>>> dependency also contains the org.apache.kafka.common.Node class (the
>>> LegacyFetcher needs that class).
>>>
>>>
>>> [1]
>>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
>>> [2]
>>> https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom
>>>
>>> On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
>>> balaji.rajagopa...@olacabs.com> wrote:
>>>
 I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version
 2.11, was never able to get it working confounded with
 noclassdeffounderror, moved to flink 1.0.0 with kafka 0.8.0.2  scala
 version 2.11 things worked for me, if moving to flink 1.0.0 is an 
 option
 for you do so.

 balaji

 On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke <
 ro.schmid...@gmail.com> wrote:

> Hi everyone,
>
> I have a Kafka cluster running on version 0.8.1, hence I'm using
> the FlinkKafkaConsumer081. When running my program, I saw a
> NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged 
> my
> binaries according to
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
> however I'm still seeing the error.
>
> I played around a bit and it turns out I have to package
> kafka-clients v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my 
> program. Is
> there an error in the documentation or have I not figured out 
> something
> properly?
>
> Thanks!
> Robert
>
> --

Re: Task Slots and Heterogeneous Tasks

2016-04-18 Thread Till Rohrmann
No, it's not possible at the moment to deploy more than one task of the
same kind to a single slot.

On Fri, Apr 15, 2016 at 8:08 PM, Maxim  wrote:

> I see. Sharing slots among subtasks makes sense.
> So by default a subtask that executes a map function that calls a  high
> latency external service is going to be put in a separate slot. Is it
> possible to indicate to the Flink that subtasks of a particular operation
> can be collocated in a slot, as such subtasks are IO bound and require no
> shared memory?
>
> On Fri, Apr 15, 2016 at 5:31 AM, Till Rohrmann 
> wrote:
>
>> Hi Maxim,
>>
>> concerning your second part of the question: The managed memory of a
>> TaskManager is first split among the available slots. Each slot portion of
>> the managed memory is again split among all operators which require managed
>> memory when a pipeline is executed. In contrast to that, the heap memory is
>> shared by all concurrently running tasks.
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 15, 2016 at 1:58 PM, Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> Slots are usually shared between the heavy and non heavy tasks, for that
>>> reason.
>>> Have a look at these resources:
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources
>>>
>>> Let us know if you have more questions!
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, Apr 15, 2016 at 1:20 AM, Maxim  wrote:
>>>
 I'm trying to understand a behavior of Flink in case of heterogeneous
 operations. For example in our pipelines some operation might accumulate
 large windows while another performs high latency calls to external
 services. Obviously the former needs task slot with a large memory
 allocation, while the latter needs no memory but a high degree of
 parallelism.

 Is any way to have different slot types and control allocation of
 operations to them? May be is there another way to ensure good hardware
 utilization?

 Also from the documentation it is not clear if memory of a TaskManager
 is shared across all tasks running on it or each task gets its quota. Could
 you clarify it?

 Thanks,

 Maxim.



>>>
>>
>


Re: Flink HDFS State Backend

2016-04-18 Thread Jason Brelloch
Yep, that was the problem.

Thanks!

On Mon, Apr 18, 2016 at 11:36 AM, Aljoscha Krettek 
wrote:

> Hi,
> could it be that your state is very small? In that case the state is not
> actually stored in HDFS but on the job manager because writing it to HDFS
> and storing a handle to that in the JobManager would be more expensive.
>
> Cheers,
> Aljoscha
>
> On Mon, 18 Apr 2016 at 17:20 Jason Brelloch  wrote:
>
>> Hi everyone,
>>
>> I am trying to set up flink with a hdfs state backend.  I configured
>> state.backend and state.backend.fs.checkpointdir parameters in the
>> flink-conf.yaml.  I run the flink task and the checkpoint directories are
>> created in hdfs, so it appears it can connect and talk to hdfs just fine.
>> Unfortunately no files are ever created in the hdfs directory.  I checked
>> that the state is being saved and restored from the task manager memory and
>> that works fine, it just never writes to hdfs.
>>
>> Am I missing a step?  Do I need to do anything to force a write to hdfs?
>> Does the state variable have to be a particular type to work with hdfs?
>>
>> This is what my snapshot functions look like:
>>
>>   override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
>>     state = rState
>>   }
>>
>>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
>>     state
>>   }
>>
>>
>> Thanks!
>> -Jason
>>
>> P.S.  I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
>>
>


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
Turns out when I remove the explicit dependency on kafka_2.10 v. 0.8.1,
then the dependencies are properly included. Guess there was a conflict
somehow? I'll need to figure out if the rest of the code is fine with
kafka_2.10 v. 0.8.2.0 as well.

On Mon, Apr 18, 2016 at 4:32 PM, Robert Schmidtke 
wrote:

> Hi Robert,
>
> thanks for your offer. After playing around a bit I would like to take it,
> if you have the time:
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/pom.xml
>
> I would guess the POM is similar to the one in the sample project, yet
> when building it, the jar does not contain all indirect dependencies.
>
> Thanks!!
>
> Robert
>
> On Mon, Apr 18, 2016 at 4:07 PM, Robert Metzger 
> wrote:
>
>> If you want you can share the pom of your project privately.
>>
>> On Mon, Apr 18, 2016 at 4:05 PM, Robert Schmidtke > > wrote:
>>
>>> You're right, it does not. When including it the resulting jar has the
>>> Kafka dependencies bundled. Now it's up to me to figure out the difference
>>> between the sample project and the one I'm working on.
>>>
>>> Thanks! Really quick help.
>>>
>>> Robert
>>>
>>> On Mon, Apr 18, 2016 at 4:02 PM, Robert Metzger 
>>> wrote:
>>>
 Hi,
 the problem with the posted project is that it doesn't have the Flink
 kafka connector as a dependency.

 On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke <
 ro.schmid...@gmail.com> wrote:

> Hi Robert,
>
> thanks for your hints. I was not sure whether I was building a proper
> fat jar, as I have not used the Flink Archetype for my project. However, I
> have set up a sample project at
>
> https://github.com/robert-schmidtke/flink-test/
>
> which is nothing more than the Quickstart Archetype plus the
> instructions to bundle the Kafka dependencies. The resulting jars (mvn
> clean package -Pbuild-jar and mvn clean package) do not contain the
> org/apache/kafka/** classes. Can you have a quick look at the pom? 
> However,
> as I said, it's verbatim Archetype+Flink Docs.
>
> Thanks a lot in advance!
>
> Robert
>
>
>
> On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> did you check your user jar if it contains the Kafka classes?
>> Are you building a fat jar? Are you manually excluding any
>> dependencies?
>>
>> Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in
>> turn depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients"
>> dependency also contains the org.apache.kafka.common.Node class (the
>> LegacyFetcher needs that class).
>>
>>
>> [1]
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
>> [2]
>> https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom
>>
>> On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
>> balaji.rajagopa...@olacabs.com> wrote:
>>
>>> I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11,
>>> was never able to get it working confounded with noclassdeffounderror,
>>> moved to flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things 
>>> worked
>>> for me, if moving to flink 1.0.0 is an option for you do so.
>>>
>>> balaji
>>>
>>> On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 Hi everyone,

 I have a Kafka cluster running on version 0.8.1, hence I'm using
 the FlinkKafkaConsumer081. When running my program, I saw a
 NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
 binaries according to
 https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
 however I'm still seeing the error.

 I played around a bit and it turns out I have to package
 kafka-clients v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my 
 program. Is
 there an error in the documentation or have I not figured out something
 properly?

 Thanks!
 Robert

 --
 My GPG Key ID: 336E2680

>>>
>>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>


>>>
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>



-- 
My GPG Key ID: 336E2680


Re: Flink HDFS State Backend

2016-04-18 Thread Aljoscha Krettek
Hi,
could it be that your state is very small? In that case the state is not
actually stored in HDFS but on the job manager because writing it to HDFS
and storing a handle to that in the JobManager would be more expensive.

Cheers,
Aljoscha

On Mon, 18 Apr 2016 at 17:20 Jason Brelloch  wrote:

> Hi everyone,
>
> I am trying to set up flink with a hdfs state backend.  I configured
> state.backend and state.backend.fs.checkpointdir parameters in the
> flink-conf.yaml.  I run the flink task and the checkpoint directories are
> created in hdfs, so it appears it can connect and talk to hdfs just fine.
> Unfortunately no files are ever created in the hdfs directory.  I checked
> that the state is being saved and restored from the task manager memory and
> that works fine, it just never writes to hdfs.
>
> Am I missing a step?  Do I need to do anything to force a write to hdfs?
> Does the state variable have to be a particular type to work with hdfs?
>
> This is what my snapshot functions look like:
>
>   override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
>     state = rState
>   }
>
>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
>     state
>   }
>
>
> Thanks!
> -Jason
>
> P.S.  I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
>


Flink HDFS State Backend

2016-04-18 Thread Jason Brelloch
Hi everyone,

I am trying to set up flink with a hdfs state backend.  I configured
state.backend and state.backend.fs.checkpointdir parameters in the
flink-conf.yaml.  I run the flink task and the checkpoint directories are
created in hdfs, so it appears it can connect and talk to hdfs just fine.
Unfortunately no files are ever created in the hdfs directory.  I checked
that the state is being saved and restored from the task manager memory and
that works fine, it just never writes to hdfs.

Am I missing a step?  Do I need to do anything to force a write to hdfs?
Does the state variable have to be a particular type to work with hdfs?

This is what my snapshot functions look like:

  override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
    state = rState
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
    state
  }


Thanks!
-Jason

P.S.  I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
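
For comparison, a sketch of the same setup done programmatically instead of via
state.backend / state.backend.fs.checkpointdir in flink-conf.yaml (interval and
path are placeholders; note that checkpointing has to be enabled for snapshots
to be written at all):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Snapshots are only written while checkpointing is enabled.
env.enableCheckpointing(10000) // every 10 s
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))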


Flink + Kafka + Scalabuff issue

2016-04-18 Thread Alexander Gryzlov
Hello,



Has anyone tried using ScalaBuff (https://github.com/SandroGrzicic/ScalaBuff)
with Flink? We’re trying to consume Protobuf messages from Kafka 0.8 and
have hit a performance issue. We run this code:



https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8 (where Foo
is pre-generated by ScalaBuff compiler)



and get these numbers (whereas the topic produces 20K msg/sec on
average):

During the last 153 ms, we received 997 elements. That's 6516.339869281046
elements/second/core.
During the last 214 ms, we received 998 elements. That's 4663.551401869159
elements/second/core.
During the last 223 ms, we received 1000 elements. That's 4484.304932735426
elements/second/core.
During the last 282 ms, we received 1000 elements. That's
3546.0992907801415 elements/second/core.
During the last 378 ms, we received 1001 elements. That's
2648.1481481481483 elements/second/core.
During the last 544 ms, we received 999 elements. That's 1836.3970588235293
elements/second/core.
During the last 434 ms, we received 999 elements. That's 2301.84331797235
elements/second/core.
During the last 432 ms, we received 1000 elements. That's 2314.814814814815
elements/second/core.
During the last 400 ms, we received 991 elements. That's 2477.5
elements/second/core.
During the last 296 ms, we received 998 elements. That's 3371.6216216216217
elements/second/core.
During the last 561 ms, we received 1000 elements. That's
1782.5311942959001 elements/second/core.

...



The number/sec/core keeps falling until it stabilizes at ~5-10
elem/sec after a few hours.



Looking at the app with JMX gives us (first number is RUNNING, second is
MONITOR):

SimpleConsumer - Source: Custom Source -> Flat Map - broker-1  10.8% 89.1%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-2  14.0% 85.9%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-3  13.6% 86.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-4  12.6% 87.3%

SimpleConsumer - Source: Custom Source -> Flat Map - broker-10 12.2% 87.7%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-11 15.6% 84.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-12 11.6% 88.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-13  9.7% 90.2%
​

If the schema is modified to simply return an Array[Byte], we get a proper
speed of ~20K/sec and RUNNING is 100% on all broker threads.


From a thread dump, it’s clear that only a single consumer thread works at
a time, while the rest are blocked on sourceContext.getCheckpointLock() at
https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663

Alex


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
Hi Robert,

thanks for your offer. After playing around a bit I would like to take it,
if you have the time:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/pom.xml

I would guess the POM is similar to the one in the sample project, yet when
building it, the jar does not contain all indirect dependencies.

Thanks!!

Robert

On Mon, Apr 18, 2016 at 4:07 PM, Robert Metzger  wrote:

> If you want you can share the pom of your project privately.
>
> On Mon, Apr 18, 2016 at 4:05 PM, Robert Schmidtke 
> wrote:
>
>> You're right, it does not. When including it the resulting jar has the
>> Kafka dependencies bundled. Now it's up to me to figure out the difference
>> between the sample project and the one I'm working on.
>>
>> Thanks! Really quick help.
>>
>> Robert
>>
>> On Mon, Apr 18, 2016 at 4:02 PM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>> the problem with the posted project is that it doesn't have the Flink
>>> kafka connector as a dependency.
>>>
>>> On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 Hi Robert,

 thanks for your hints. I was not sure whether I was building a proper
 fat jar, as I have not used the Flink Archetype for my project. However, I
 have set up a sample project at

 https://github.com/robert-schmidtke/flink-test/

 which is nothing more than the Quickstart Archetype plus the
 instructions to bundle the Kafka dependencies. The resulting jars (mvn
 clean package -Pbuild-jar and mvn clean package) do not contain the
 org/apache/kafka/** classes. Can you have a quick look at the pom? However,
 as I said, it's verbatim Archetype+Flink Docs.

 Thanks a lot in advance!

 Robert



 On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger 
 wrote:

> Hi,
> did you check your user jar if it contains the Kafka classes?
> Are you building a fat jar? Are you manually excluding any
> dependencies?
>
> Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in
> turn depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients"
> dependency also contains the org.apache.kafka.common.Node class (the
> LegacyFetcher needs that class).
>
>
> [1]
> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
> [2]
> https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom
>
> On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11,
>> was never able to get it working confounded with noclassdeffounderror,
>> moved to flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things worked
>> for me, if moving to flink 1.0.0 is an option for you do so.
>>
>> balaji
>>
>> On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke <
>> ro.schmid...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I have a Kafka cluster running on version 0.8.1, hence I'm using the
>>> FlinkKafkaConsumer081. When running my program, I saw a
>>> NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
>>> binaries according to
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
>>> however I'm still seeing the error.
>>>
>>> I played around a bit and it turns out I have to package
>>> kafka-clients v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my 
>>> program. Is
>>> there an error in the documentation or have I not figured out something
>>> properly?
>>>
>>> Thanks!
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>


 --
 My GPG Key ID: 336E2680

>>>
>>>
>>
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Metzger
If you want you can share the pom of your project privately.

On Mon, Apr 18, 2016 at 4:05 PM, Robert Schmidtke 
wrote:

> You're right, it does not. When including it the resulting jar has the
> Kafka dependencies bundled. Now it's up to me to figure out the difference
> between the sample project and the one I'm working on.
>
> Thanks! Really quick help.
>
> Robert
>
> On Mon, Apr 18, 2016 at 4:02 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> the problem with the posted project is that it doesn't have the Flink
>> kafka connector as a dependency.
>>
>> On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke > > wrote:
>>
>>> Hi Robert,
>>>
>>> thanks for your hints. I was not sure whether I was building a proper
>>> fat jar, as I have not used the Flink Archetype for my project. However, I
>>> have set up a sample project at
>>>
>>> https://github.com/robert-schmidtke/flink-test/
>>>
>>> which is nothing more than the Quickstart Archetype plus the
>>> instructions to bundle the Kafka dependencies. The resulting jars (mvn
>>> clean package -Pbuild-jar and mvn clean package) do not contain the
>>> org/apache/kafka/** classes. Can you have a quick look at the pom? However,
>>> as I said, it's verbatim Archetype+Flink Docs.
>>>
>>> Thanks a lot in advance!
>>>
>>> Robert
>>>
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger 
>>> wrote:
>>>
 Hi,
 did you check your user jar if it contains the Kafka classes?
 Are you building a fat jar? Are you manually excluding any dependencies?

 Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in
 turn depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients"
 dependency also contains the org.apache.kafka.common.Node class (the
 LegacyFetcher needs that class).


 [1]
 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
 [2]
 https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom

 On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
 balaji.rajagopa...@olacabs.com> wrote:

> I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11,
> was never able to get it working confounded with noclassdeffounderror,
> moved to flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things worked
> for me, if moving to flink 1.0.0 is an option for you do so.
>
> balaji
>
> On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke <
> ro.schmid...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I have a Kafka cluster running on version 0.8.1, hence I'm using the
>> FlinkKafkaConsumer081. When running my program, I saw a
>> NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
>> binaries according to
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
>> however I'm still seeing the error.
>>
>> I played around a bit and it turns out I have to package
>> kafka-clients v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my program. 
>> Is
>> there an error in the documentation or have I not figured out something
>> properly?
>>
>> Thanks!
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>

>>>
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
You're right, it does not. When including it the resulting jar has the
Kafka dependencies bundled. Now it's up to me to figure out the difference
between the sample project and the one I'm working on.

Thanks! Really quick help.

Robert

On Mon, Apr 18, 2016 at 4:02 PM, Robert Metzger  wrote:

> Hi,
> the problem with the posted project is that it doesn't have the Flink
> kafka connector as a dependency.
>
> On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke 
> wrote:
>
>> Hi Robert,
>>
>> thanks for your hints. I was not sure whether I was building a proper fat
>> jar, as I have not used the Flink Archetype for my project. However, I have
>> set up a sample project at
>>
>> https://github.com/robert-schmidtke/flink-test/
>>
>> which is nothing more than the Quickstart Archetype plus the instructions
>> to bundle the Kafka dependencies. The resulting jars (mvn clean package
>> -Pbuild-jar and mvn clean package) do not contain the org/apache/kafka/**
>> classes. Can you have a quick look at the pom? However, as I said, it's
>> verbatim Archetype+Flink Docs.
>>
>> Thanks a lot in advance!
>>
>> Robert
>>
>>
>>
>> On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>> did you check your user jar if it contains the Kafka classes?
>>> Are you building a fat jar? Are you manually excluding any dependencies?
>>>
>>> Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in
>>> turn depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients"
>>> dependency also contains the org.apache.kafka.common.Node class (the
>>> LegacyFetcher needs that class).
>>>
>>>
>>> [1]
>>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
>>> [2]
>>> https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom
>>>
>>> On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
>>> balaji.rajagopa...@olacabs.com> wrote:
>>>
 I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11,
 was never able to get it working confounded with noclassdeffounderror,
 moved to flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things worked
 for me, if moving to flink 1.0.0 is an option for you do so.

 balaji

 On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke <
 ro.schmid...@gmail.com> wrote:

> Hi everyone,
>
> I have a Kafka cluster running on version 0.8.1, hence I'm using the
> FlinkKafkaConsumer081. When running my program, I saw a
> NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
> binaries according to
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
> however I'm still seeing the error.
>
> I played around a bit and it turns out I have to package kafka-clients
> v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my program. Is there an
> error in the documentation or have I not figured out something properly?
>
> Thanks!
> Robert
>
> --
> My GPG Key ID: 336E2680
>


>>>
>>
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Metzger
Hi,
the problem with the posted project is that it doesn't have the Flink kafka
connector as a dependency.

On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke 
wrote:

> Hi Robert,
>
> thanks for your hints. I was not sure whether I was building a proper fat
> jar, as I have not used the Flink Archetype for my project. However, I have
> set up a sample project at
>
> https://github.com/robert-schmidtke/flink-test/
>
> which is nothing more than the Quickstart Archetype plus the instructions
> to bundle the Kafka dependencies. The resulting jars (mvn clean package
> -Pbuild-jar and mvn clean package) do not contain the org/apache/kafka/**
> classes. Can you have a quick look at the pom? However, as I said, it's
> verbatim Archetype+Flink Docs.
>
> Thanks a lot in advance!
>
> Robert
>
>
>
> On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> did you check your user jar if it contains the Kafka classes?
>> Are you building a fat jar? Are you manually excluding any dependencies?
>>
>> Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in turn
>> depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients" dependency
>> also contains the org.apache.kafka.common.Node class (the LegacyFetcher
>> needs that class).
>>
>>
>> [1]
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
>> [2]
>> https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom
>>
>> On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
>> balaji.rajagopa...@olacabs.com> wrote:
>>
>>> I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11, was
>>> never able to get it working confounded with noclassdeffounderror, moved to
>>> flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things worked for me, if
>>> moving to flink 1.0.0 is an option for you do so.
>>>
>>> balaji
>>>
>>> On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 Hi everyone,

 I have a Kafka cluster running on version 0.8.1, hence I'm using the
 FlinkKafkaConsumer081. When running my program, I saw a
 NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
 binaries according to
 https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
 however I'm still seeing the error.

 I played around a bit and it turns out I have to package kafka-clients
 v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my program. Is there an
 error in the documentation or have I not figured out something properly?

 Thanks!
 Robert

 --
 My GPG Key ID: 336E2680

>>>
>>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
Hi Robert,

thanks for your hints. I was not sure whether I was building a proper fat
jar, as I have not used the Flink Archetype for my project. However, I have
set up a sample project at

https://github.com/robert-schmidtke/flink-test/

which is nothing more than the Quickstart Archetype plus the instructions
to bundle the Kafka dependencies. The resulting jars (mvn clean package
-Pbuild-jar and mvn clean package) do not contain the org/apache/kafka/**
classes. Can you have a quick look at the pom? However, as I said, it's
verbatim Archetype+Flink Docs.

Thanks a lot in advance!

Robert



On Mon, Apr 18, 2016 at 12:36 PM, Robert Metzger 
wrote:

> Hi,
> did you check your user jar if it contains the Kafka classes?
> Are you building a fat jar? Are you manually excluding any dependencies?
>
> Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in turn
> depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients" dependency
> also contains the org.apache.kafka.common.Node class (the LegacyFetcher
> needs that class).
>
>
> [1]
> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
> [2]
> https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom
>
> On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11, was
>> never able to get it working confounded with noclassdeffounderror, moved to
>> flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things worked for me, if
>> moving to flink 1.0.0 is an option for you do so.
>>
>> balaji
>>
>> On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke > > wrote:
>>
>>> Hi everyone,
>>>
>>> I have a Kafka cluster running on version 0.8.1, hence I'm using the
>>> FlinkKafkaConsumer081. When running my program, I saw a
>>> NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
>>> binaries according to
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
>>> however I'm still seeing the error.
>>>
>>> I played around a bit and it turns out I have to package kafka-clients
>>> v. 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my program. Is there an
>>> error in the documentation or have I not figured out something properly?
>>>
>>> Thanks!
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>


-- 
My GPG Key ID: 336E2680


Re: Configuring task slots and parallelism for single node Maven executed

2016-04-18 Thread Prez Cannady
Thank you both.  Will let you guys know how it works out.

Prez Cannady  
p: 617 500 3378  
e: revp...@opencorrelate.org   
GH: https://github.com/opencorrelate   
LI: https://www.linkedin.com/in/revprez   









> On Apr 18, 2016, at 3:48 AM, Till Rohrmann  wrote:
> 
> Hi Prez,
> 
> the configuration setting taskmanager.numberOfTaskSlots says with how many 
> task slots a TaskManager will be started. As a rough rule of thumb, set this 
> value to the number of cores of the machine the TM is running on. This this 
> link [1] for further information. The configuration value parallelism.default 
> is the default parallelism with which a program will be executed if the user 
> didn’t specify it via the submission tool or from within the program.
> You can configure the parallelism programmatically by calling setParallelism 
> on the ExecutionEnvironment. The GlobalConfiguration approach won’t work in a 
> distributed setting.
> see 1.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#workers-slots-resources
>  
> 
> Cheers,
> Till
> 
> 
> On Mon, Apr 18, 2016 at 6:55 AM, Balaji Rajagopalan 
> > 
> wrote:
> Answered based on my understanding. 
> 
> On Mon, Apr 18, 2016 at 8:12 AM, Prez Cannady  > wrote:
> Some background.
> 
> I’m running Flink application on a single machine, instrumented by Spring 
> Boot and launched via the Maven Spring Boot plugin. Basically, I’m trying to 
> figure out how much I can squeeze out of a single node processing my task 
> before committing to a cluster solution.
> 
> Couple of questions.
> 
> I assume the configuration options taskmanager.numberOfTaskSlots and 
> parallelism.default pertain to division of work on a single node. Am I 
> correct? You will running with single instance of task manager say if you are 
> running in 4 core machine, you can set the parallelism = 4 
> Is there a way to configure these options programmatically instead of the 
> configuration YAML? Or some Maven tooling that can ingest a properly 
> formatted Flink config? For the record, I’m currently trying
> GlobalConfiguration.getConfiguration.setInteger(“<property name>”, <value>). I am also going to try supplying them as
> properties in the pom. I’m preparing some tests to see if either of these do
> as I expect, but thought I’d ask in case I’m heading down a rabbit hole.
>   I have been using GlobalConfiguration with no issues, but here is one thing 
> you have to aware of, in clustered environment, you will have to copy over 
> the yaml file in all the nodes, for example I read the file from 
> /usr/share/flink/conf and I have sure this file is available in master node 
> and task nodes as well.  Why do you want to injest the config from maven 
> tool, you can do this main routine in our application code.  
> I figure task slots is limited to the number of processors/cores/whatever 
> available (and the JVM can get at). Is this accurate?
> Any feedback would be appreciated.
> 
> 
> Prez Cannady  
> p: 617 500 3378   
> e: revp...@opencorrelate.org   
> GH: https://github.com/opencorrelate   
> LI: https://www.linkedin.com/in/revprez  
>  
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 



Re: Compression - AvroOutputFormat and over network ?

2016-04-18 Thread Ufuk Celebi
Hey Tarandeep,

regarding the network part: not possible at the moment. It's pretty
straightforward to add support for it, but no one ever got around to
actually implementing it. If you would like to contribute, I am happy
to give some hints about which parts of the system would need to be
modified.

– Ufuk


On Mon, Apr 18, 2016 at 12:56 PM, Tarandeep Singh  wrote:
> Hi,
>
> How can I set compression for AvroOutputFormat when writing files on HDFS?
> Also, can we set compression for intermediate data that is sent over network
> (from map to reduce phase) ?
>
> Thanks,
> Tarandeep


Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-18 Thread KirstiLaurila
Answering myself in case someone is having similar problems. Already saved
matrices can be read and used in ALS like this:


// Set up the ALS learner
val als = ALS()

val users = env.readFile(new TypeSerializerInputFormat[Factors](createTypeInformation[Factors]), "path")
val items = env.readFile(new TypeSerializerInputFormat[Factors](createTypeInformation[Factors]), "path")

als.factorsOption = Option((users, items))

After this, one can use als for prediction.
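
For completeness, a minimal sketch of scoring with the restored factors,
assuming FlinkML's standard predict operation on (user, item) pairs (the test
pairs here are made up):

import org.apache.flink.api.scala._

// Hypothetical (user, item) pairs to score.
val testPairs = env.fromElements((1, 10), (2, 20))

// Returns (user, item, predictedRating) triples.
val predictions = als.predict(testPairs)
predictions.print()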





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-ML-1-0-0-Saving-and-Loading-Models-to-Score-a-Single-Feature-Vector-tp5766p6167.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Compression - AvroOutputFormat and over network ?

2016-04-18 Thread Tarandeep Singh
Hi,

How can I set compression for AvroOutputFormat when writing files on HDFS?
Also, can we set compression for intermediate data that is sent over
network (from map to reduce phase) ?

Thanks,
Tarandeep


Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Metzger
Hi,
did you check your user jar if it contains the Kafka classes?
Are you building a fat jar? Are you manually excluding any dependencies?

Flink's 0.10.2 Kafka connector depends on Kafka 0.8.2.0 [1] which in turn
depends on kafka-clients 0.8.2.0 [2]. And the "kafka-clients" dependency
also contains the org.apache.kafka.common.Node class (the LegacyFetcher
needs that class).


[1]
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/0.10.2/flink-connector-kafka-0.10.2.pom
[2]
https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.pom

On Mon, Apr 18, 2016 at 7:42 AM, Balaji Rajagopalan <
balaji.rajagopa...@olacabs.com> wrote:

> I had fought with 0.8.0.2 kafka and flink 0.10.2 scala version 2.11, was
> never able to get it working confounded with noclassdeffounderror, moved to
> flink 1.0.0 with kafka 0.8.0.2  scala version 2.11 things worked for me, if
> moving to flink 1.0.0 is an option for you do so.
>
> balaji
>
> On Mon, Apr 18, 2016 at 3:19 AM, Robert Schmidtke 
> wrote:
>
>> Hi everyone,
>>
>> I have a Kafka cluster running on version 0.8.1, hence I'm using the
>> FlinkKafkaConsumer081. When running my program, I saw a
>> NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my
>> binaries according to
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution,
>> however I'm still seeing the error.
>>
>> I played around a bit and it turns out I have to package kafka-clients v.
>> 0.8.2.0 instead of kafka_2.10 v. 0.8.1 with my program. Is there an error
>> in the documentation or have I not figured out something properly?
>>
>> Thanks!
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


Re: fan out parallel-able operator sub-task beyond total slots number

2016-04-18 Thread Till Rohrmann
Hi Chen,

two subtasks of the same operator can never be executed within the same
slot/pipeline. The `slotSharingGroup` only lets you control which
subtasks of different operators can be executed alongside each other in the
same slot. It basically allows you to break pipelines into smaller ones.
Therefore, you need at least as many slots as the maximum degree of
parallelism in your program (so in your case 1000).
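
To make the slot arithmetic concrete (the cluster numbers below are purely
illustrative):

// Slot sharing lets one subtask of every *different* operator share a slot,
// but never two subtasks of the same operator, so:
val maxOperatorParallelism = 1000 // highest parallelism of any operator in the job
val slotsPerTaskManager = 20      // taskmanager.numberOfTaskSlots
val taskManagersNeeded = math.ceil(maxOperatorParallelism.toDouble / slotsPerTaskManager).toInt
println(s"need at least $taskManagersNeeded TaskManagers") // 50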

Cheers,
Till

On Sun, Apr 17, 2016 at 6:54 PM, Chen Qin  wrote:

> Hi there,
>
>
> I am trying to run a large number of subtasks within a task slot using a slot
> sharing group. The usage scenario is an operator that makes a network
> call with high latency yet has a small memory or CPU footprint. (sample code below)
>
> From the doc provided, slotSharingGroup seems to be the place to look at. Yet it
> seems it was not designed to address the scenario above.
>
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources
>
> My question is, which is best way to fan out large number of sub tasking
> parallel within a task?
>
> public void testFanOut() throws Exception{
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> ...
> 
> env.addSource(...).setParallelism(1).disableChaining().shuffle()
>     .flatMap(new FlatMapFunction<DummyFlinkRecord, Long>() {
>         @Override
>         public void flatMap(DummyFlinkRecord dummyFlinkRecord, Collector<Long> collector) throws Exception {
>             Thread.sleep(1000); // latency is high, needs to fan out
>             collector.collect(1l);
>         }
>     }).slotSharingGroup("flatmap").setParallelism(100).rebalance()
>     .filter(new FilterFunction<Long>() {
>         @Override
>         public boolean filter(Long aLong) throws Exception {
>             return true;
>         }
>     }).setParallelism(10)
>     .addSink(new SinkFunction<Long>() {
>         @Override
>         public void invoke(Long aLong) throws Exception {
>             System.out.println(aLong);
>         }
>     });
> env.execute("fan out 100 subtasks for 1s delay mapper");
> }
>
> Thanks,
> Chen Qin
>


Re: withBroadcastSet for a DataStream missing?

2016-04-18 Thread Till Rohrmann
Hi Stavros,

yes, that’s how you could do it.

broadcast will send the data to every downstream operator.

An element will be processed whenever it arrives at the iteration head.
There is no synchronization.

A windowed stream cannot be the input for a connected stream. Thus, the
window results have to be processed first before they are fed into the
iteration.
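
For illustration, a minimal sketch of the connect/CoFlatMap part, keeping the
latest model per parallel instance and leaving the iteration out (all names
are made up):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Input 1: data points to score; input 2: broadcast model updates.
class ModelApplier extends CoFlatMapFunction[Double, Seq[Double], String] {
  private var model: Seq[Double] = Seq.empty

  override def flatMap1(point: Double, out: Collector[String]): Unit =
    out.collect(s"scored $point against a model of size ${model.size}")

  override def flatMap2(update: Seq[Double], out: Collector[String]): Unit =
    model = update
}

// Usage: broadcast the model updates so every parallel instance sees them.
// val scored = points.connect(modelUpdates.broadcast).flatMap(new ModelApplier)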

Cheers,
Till
​

On Sun, Apr 17, 2016 at 7:16 PM, Stavros Kontopoulos <
st.kontopou...@gmail.com> wrote:

> Im trying what you suggested. Is this what you are suggesting (this is
> just a skeleton of logic not the actual implementation)?
>
> val dataStream =  ... //window based stream
>
> val modelStream = ...
>
> val connected = dataStream.connect(modelStream)
>
> val output = connected.map(
> (x:String) => { true},
> (y: MyModel) => {false}
>   ).iterate {
> iteration =>
>
>   val feedback = iteration.filter(!_)
>   feedback.broadcast
>   (feedback, iteration.filter(x => x))
>   }
>
>   output.split(
> (b: Boolean) => b match {
>   case true => List("true")
>   case false => List("false")
> }
>   ).select("true")
>
>
> I could save the model In coFlatMap but ideally i need the same model
> everywhere. Broadcast does that? From the documentation i read it sends the
> output to all parallel operators.
> Iteration is executed anytime there is data according to the input window
> stream or is it done independently so i can feed back my improved model
> (like in datasets case)?
> If the latter holds does that mean all partial updates from all operators
> will have to be processed from each operator before the the next window
> processing begins?
>
> Thnx!
>
>
> On Fri, Apr 1, 2016 at 10:51 PM, Stavros Kontopoulos <
> st.kontopou...@gmail.com> wrote:
>
>> Ok thnx Till i will give it a shot!
>>
>> On Thu, Mar 31, 2016 at 11:25 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Stavros,
>>>
>>> you might be able to solve your problem using a CoFlatMap operation
>>> with iterations. You would use one of the inputs for the iteration on which
>>> you broadcast the model updates to every operator. On the other input you
>>> would receive the data points which you want to cluster. As output you
>>> would emit the clustered points and model updates. Here you have to use the
>>> split and select function to split the output stream into model updates
>>> and output elements. It’s important to broadcast the model updates,
>>> otherwise not all operators have the same clustering model.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Tue, Mar 29, 2016 at 7:23 PM, Stavros Kontopoulos <
>>> st.kontopou...@gmail.com> wrote:
>>>
 H i am new here...

 I am trying to implement online k-means as here
 https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html
 with flink.
 I dont see anywhere a withBroadcastSet call to save intermediate
 results is this currently supported?

 Is intermediate results state saved somewhere like in this example a
 viable alternative:

 https://github.com/StephanEwen/flink-demos/blob/master/streaming-state-machine/src/main/scala/com/dataartisans/flink/example/eventpattern/StreamingDemo.scala

 Thnx,
 Stavros

>>>
>>>
>>
>


Re: Testing Kafka interface using Flink interactive shell

2016-04-18 Thread Mich Talebzadeh
Thanks Chiwan. It worked.

Now I have this simple streaming program in Spark Scala that gets streaming
data via Kafka. It is pretty simple. Please see attached.

I am trying to make it work with Flink + Kafka

Any hints will be appreciated.

Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 18 April 2016 at 02:43, Chiwan Park  wrote:

> Hi Mich,
>
> You can add external dependencies to Scala shell using `--addclasspath`
> option. There is more detail description in documentation [1].
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/scala_shell.html#adding-external-dependencies
>
> Regards,
> Chiwan Park
>
> > On Apr 17, 2016, at 6:04 PM, Mich Talebzadeh 
> wrote:
> >
> > Hi,
> >
> > IN Spark shell I can load Kafka jar file through spark-shell option --jar
> >
> > spark-shell --master spark://50.140.197.217:7077 --jars
> ,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar
> >
> > This works fine.
> >
> > In Flink I have added the jar file
> /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.
> >
> > However I don't get any support for it within flink shell
> >
> > Scala-Flink> import org.apache.flink.streaming.connectors.kafka
> > :54: error: object connectors is not a member of package
> org.apache.flink.streaming
> > import org.apache.flink.streaming.connectors.kafka
> >
> >
> > Any ideas will be appreciated
> >   ^
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
>
>
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object TestStream_assembly {
  def main(args: Array[String]) {
  val conf = new SparkConf().
   setAppName("TestStream_assembly").
   setMaster("local[2]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val ssc = new StreamingContext(conf, Seconds(55))

val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" ->
  "rhes564:2181", "group.id" -> "StreamTest")
val topic = Set("newtopic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topic)
messages.cache()
//
// Get the lines
//
val lines = messages.map(_._2)
// Check for message
val showResults = lines.filter(_.contains("Sending messages")).flatMap(line => 
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
ssc.start()
ssc.awaitTermination()
//ssc.stop()
  }
}
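
For comparison, a rough Flink equivalent of the above, assuming Flink 1.0 with
the flink-connector-kafka-0.8 dependency on the classpath (topic, brokers and
group id copied from the Spark job; the aggregation is continuous rather than
per 55-second batch):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TestStreamFlink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "rhes564:9092")
    props.setProperty("zookeeper.connect", "rhes564:2181")
    props.setProperty("group.id", "StreamTest")

    val lines: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer08[String]("newtopic", new SimpleStringSchema(), props))

    lines
      .filter(_.contains("Sending messages"))
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("TestStream_flink")
  }
}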


Re: Accessing StateBackend snapshots outside of Flink

2016-04-18 Thread Aljoscha Krettek
Hi,
key refers to the key extracted by your KeySelector. Right now, for every
named state (i.e. the name in the StateDescriptor) there is an isolated
RocksDB instance.
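
As a rough sketch of the deserialization step described in the quoted reply
below (the input-view wrapper used here is taken from Flink's runtime utilities
and is an assumption; stored keys additionally include the serialized namespace,
so values are the simpler case):

import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.runtime.util.DataInputDeserializer

// bytes would be one value read out of the restored RocksDB instance,
// assuming the state value was written with Flink's StringSerializer.
def readStoredString(bytes: Array[Byte]): String = {
  val in = new DataInputDeserializer(bytes, 0, bytes.length)
  StringSerializer.INSTANCE.deserialize(in)
}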

Cheers,
Aljoscha

On Sat, 16 Apr 2016 at 15:43 Igor Berman  wrote:

> thanks a lot for the info, seems not too complex
> I'll try to write simple tool to read this state.
>
> Aljoscha, does the key reflects unique id of operator in some way? Or key
> is just a "name" that passed to ValueStateDescriptor.
>
> thanks in advance
>
>
> On 15 April 2016 at 15:10, Stephan Ewen  wrote:
>
>> One thing to add is that you can always trigger a persistent checkpoint
>> via the "savepoints" feature:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>>
>>
>>
>> On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> for RocksDB we simply use a TypeSerializer to serialize the key and
>>> value to a byte[] array and store that in RocksDB. For a ListState, we
>>> serialize the individual elements using a TypeSerializer and store them in
>>> a comma-separated list in RocksDB. The snapshots of RocksDB that we write
>>> to HDFS are regular backups of a RocksDB database, as described here:
>>> https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. You
>>> should be possible to read them from HDFS and restore them to a RocksDB
>>> data base as described in the linked documentation.
>>>
>>> tl;dr As long as you know the type of values stored in the state you
>>> should be able to read them from RocksDB and deserialize the values using
>>> TypeSerializer.
>>>
>>> One more bit of information: Internally the state is keyed by (key,
>>> namespace) -> value where namespace can be an arbitrary type that has a
>>> TypeSerializer. We use this to store window state that is both local to key
>>> and the current window. For state that you store in a user-defined function
>>> the namespace will always be null and that will be serialized by a
>>> VoidSerializer that simply always writes a "0" byte.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 15 Apr 2016 at 00:18 igor.berman  wrote:
>>>
 Hi,
 we are evaluating Flink for new solution and several people raised
 concern
 of coupling too much to Flink -
 1. we understand that if we want to get full fault tolerance and best
 performance we'll need to use Flink managed state(probably RocksDB
 backend
 due to volume of state)
 2. but then if we latter find that Flink doesn't answer our needs(for
 any
 reason) - we'll need to extract this state in some way(since it's the
 only
 source of consistent state)
 In general I'd like to be able to take snapshot of backend and try to
 read
 it...do you think it's will be trivial task?
 say If I'm holding list state per partitioned key, would it be easy to
 take
 RocksDb file and open it?

 any thoughts regarding how can I convince people in our team?

 thanks in advance!



 --
 View this message in context:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
 Sent from the Apache Flink User Mailing List archive. mailing list
 archive at Nabble.com.

>>>
>>
>