Re: ExecutionEnvironment setConfiguration API

2015-10-14 Thread Flavio Pompermaier
Of course, I tried to configure the task slots during a debug test and I
forgot to remove it.
Just out of curiosity, is there any good reason why you've changed the default
parallelism that way? And is it the only unexpectedly changed
behaviour with respect to the previous API version?
On 14 Oct 2015 18:35, "Stephan Ewen"  wrote:

> Hi Flavio!
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment()
> by default picks up the number of cores as the parallelism, while the
> manual environments do not do that.
> You can still set the parallelism manually:
> "env.setParallelism(Runtime.getRuntime().availableProcessors());"
>
> I would not configure the slots for the local execution, they should be
> automatically configured based on the max parallelism.
>
> Greetings,
> Stephan
>
>
> On Wed, Oct 14, 2015 at 3:36 PM, Flavio Pompermaier 
> wrote:
>
>> Hi Fabian and Stephan, back to work :)
>>
>> I finally managed to find the problem of the parallelism encountered by
>> my colleague!
>> Basically that was introduced by this API change. Before I was using
>> env.setConfiguration() to merge the default params with some custom ones.
>> Now, after the API change I was using the following code:
>>
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> if (env instanceof LocalEnvironment) {
>> Configuration c = new Configuration();
>> c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, FLINK_TEST_TMP_DIR);
>>
>> c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY,FLINK_TEST_TMP_DIR);
>> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 2);
>> c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>> env = ExecutionEnvironment.createLocalEnvironment(c);
>> }
>>
>> However, the first env and the reassigned one don't behave in the same
>> manner.
>> If I don't reassign env I have parallelism=8, otherwise it's 1 :(
>> Am I using the wrong APIs, or does the execution environment no longer allow
>> configuring such parameters?
>>
>> Thanks in advance,
>> Flavio
>>
>>
>> On Tue, Oct 6, 2015 at 11:31 AM, Flavio Pompermaier > > wrote:
>>
>>> That makes sense: what can be configured should be differentiated
>>> between local and remote envs (obviously this is a minor issue/improvement)
>>>
>>> Thanks again,
>>> Flavio
>>>
>>> On Tue, Oct 6, 2015 at 11:25 AM, Stephan Ewen  wrote:
>>>
 We can think about that, but I think it may be quite confusing. The
 configurations actually mean something different for local and remote
 environments:

   - For the local environment, the config basically describes the
 entire Flink cluster setup (for the local execution cluster in the
 background)
   - For the remote environment, the config describes the parameters for
 the client that connects to the cluster (Akka parameters, optimizer
 parameters, ...), but not parameters of the cluster itself (like
 taskmanager slots and memory).

 Greetings,
 Stephan


 On Tue, Oct 6, 2015 at 10:56 AM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> However it could be a good idea to overload also
> the getExecutionEnvironment() to be able to pass a custom
> configuration..what do you think?
> Otherwise I have to know a priori if I'm working in a local deployment
> or in a remote one, or check if getExecutionEnvironment() returned an
> instance of LocalEnvironment/RemoteEnvironment..
>
>
>
> On Tue, Oct 6, 2015 at 10:53 AM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Yes Stephan!
>> I usually work with the master version, at least in development ;)
>> Thanks for the quick support!
>>
>> Best,
>> Flavio
>>
>> On Tue, Oct 6, 2015 at 10:48 AM, Stephan Ewen 
>> wrote:
>>
>>> Hi!
>>>
>>> Are you on the SNAPSHOT master version?
>>>
>>> You can pass the configuration to the constructor of the execution
>>> environment, or create one via
>>> ExecutionEnvironment.createLocalEnvironment(config) or via
>>> createRemoteEnvironment(host, port, configuration, jarFiles);
>>>
>>> The change of the signature was part of an API cleanup for the next
>>> release. Sorry for the inconvenience...
>>>
>>> Stephan
>>>
>>>
>>> On Tue, Oct 6, 2015 at 10:36 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Hi to all,

 today my code doesn't compile anymore because ExecutionEnvironment
 doesn't have setConfiguration() anymore..how can I set the following
 parameters in my unit tests?

 - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY
 - ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY
 - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY

 Best,
 Flavio

>>>
>>>
>>
>

>>>
>>
>


Re: ExecutionEnvironment setConfiguration API

2015-10-14 Thread Stephan Ewen
Hi Flavio!

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment()
by default picks up the number of cores as the parallelism, while the
manual environments do not do that.
You can still set the parallelism manually:
"env.setParallelism(Runtime.getRuntime().availableProcessors());"

I would not configure the slots for local execution; they should be
automatically configured based on the max parallelism.
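
For illustration, a minimal sketch combining both (assuming the 0.10-SNAPSHOT
API; the temp directory and buffer count are placeholders):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class LocalEnvWithConfig {
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp/flink-test");
        c.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 2);

        // createLocalEnvironment(...) does not pick up the number of cores,
        // so set the parallelism explicitly, as described above.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
        env.setParallelism(Runtime.getRuntime().availableProcessors());

        env.fromElements(1, 2, 3).print();
    }
}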

Greetings,
Stephan


On Wed, Oct 14, 2015 at 3:36 PM, Flavio Pompermaier 
wrote:

> Hi Fabian and Stephan, back to work :)
>
> I finally managed to find the problem of the parallelism encountered by my
> colleague!
> Basically that was introduced by this API change. Before I was using
> env.setConfiguration() to merge the default params with some custom ones.
> Now, after the API change I was using the following code:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> if (env instanceof LocalEnvironment) {
> Configuration c = new Configuration();
> c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, FLINK_TEST_TMP_DIR);
> c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY,FLINK_TEST_TMP_DIR);
> c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 2);
> c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
> env = ExecutionEnvironment.createLocalEnvironment(c);
> }
>
> However, the first env and the reassigned one don't behave in the same
> manner.
> If I don't reassign env I have parallelism=8, otherwise it's 1 :(
> Am I using the wrong APIs, or does the execution environment no longer allow
> configuring such parameters?
>
> Thanks in advance,
> Flavio
>
>
> On Tue, Oct 6, 2015 at 11:31 AM, Flavio Pompermaier 
> wrote:
>
>> That makes sense: what can be configured should be differentiated between
>> local and remote envs (obviously this is a minor issue/improvement)
>>
>> Thanks again,
>> Flavio
>>
>> On Tue, Oct 6, 2015 at 11:25 AM, Stephan Ewen  wrote:
>>
>>> We can think about that, but I think it may be quite confusing. The
>>> configurations actually mean something different for local and remote
>>> environments:
>>>
>>>   - For the local environment, the config basically describes the entire
>>> Flink cluster setup (for the local execution cluster in the background)
>>>   - For the remote environment, the config describes the parameters for
>>> the client that connects to the cluster (Akka parameters, optimizer
>>> parameters, ...), but not parameters of the cluster itself (like
>>> taskmanager slots and memory).
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 6, 2015 at 10:56 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 However it could be a good idea to overload also
 the getExecutionEnvironment() to be able to pass a custom
 configuration..what do you think?
 Otherwise I have to know a priori if I'm working in a local deployment
 or in a remote one, or check if getExecutionEnvironment() returned an
 instance of LocalEnvironment/RemoteEnvironment..



 On Tue, Oct 6, 2015 at 10:53 AM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Yes Stephan!
> I usually work with the master version, at least in development ;)
> Thanks for the quick support!
>
> Best,
> Flavio
>
> On Tue, Oct 6, 2015 at 10:48 AM, Stephan Ewen 
> wrote:
>
>> Hi!
>>
>> Are you on the SNAPSHOT master version?
>>
>> You can pass the configuration to the constructor of the execution
>> environment, or create one via
>> ExecutionEnvironment.createLocalEnvironment(config) or via
>> createRemoteEnvironment(host, port, configuration, jarFiles);
>>
>> The change of the signature was part of an API cleanup for the next
>> release. Sorry for the inconvenience...
>>
>> Stephan
>>
>>
>> On Tue, Oct 6, 2015 at 10:36 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>>
>>> today my code doesn't compile anymore because ExecutionEnvironment
>>> doesn't have setConfiguration() anymore..how can I set the following
>>> parameters in my unit tests?
>>>
>>> - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY
>>> - ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY
>>> - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>

>>>
>>
>


Re: flink kafka question

2015-10-14 Thread Robert Metzger
I would also suggest creating a mapper right after the source. Make sure the
mapper is chained to the Kafka source; then you will not really see a big
delay in the timestamp written to Redis.
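
A minimal sketch of that approach (not from this thread; it assumes the
0.10-SNAPSHOT FlinkKafkaConsumer082 and the Jedis client, and the topic,
broker and Redis host names are all placeholders):

import java.util.Properties;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import redis.clients.jedis.Jedis;

public class KafkaTimestampBenchmark {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "benchmark");

        DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props))
            // The mapper is chained to the Kafka source, so the timestamp is
            // taken right after each record leaves the consumer.
            .map(new RichMapFunction<String, String>() {
                private transient Jedis jedis;

                @Override
                public void open(Configuration parameters) {
                    jedis = new Jedis("localhost");
                }

                @Override
                public String map(String record) {
                    jedis.set(record, String.valueOf(System.currentTimeMillis()));
                    return record;
                }

                @Override
                public void close() {
                    jedis.close();
                }
            });

        stream.print();
        env.execute("Kafka to Redis timestamp benchmark");
    }
}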

Just out of curiosity, why do you need to write a timestamp to redis for
each record from Kafka?

On Wed, Oct 14, 2015 at 11:40 AM, Ufuk Celebi  wrote:

>
> > On 12 Oct 2015, at 22:47, Jerry Peng 
> wrote:
> >
> > Hello,
> >
> > I am trying to do some benchmark testing with flink streaming.  When
> flink reads a message in from Kafka, I want to write a timestamp to redis.
> How can I modify the existing kafka consumer code to do this?  What would
> be easiest way to do something like this?  Thanks for your help!
>
> I guess you want to do this in the consumer in order to have less delay
> between when you read the message and the timestamp. I am not familiar with
> the consumer code, but you can try to do this in a map after the source. This
> should be chained to the source, and the delay from the source to the map
> function should not be too large.
>
>


RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Yes, we’re on the exactly-once sinks; we’re trying to write RCFiles (Parquet and
ORC files are not compatible because of their footer).

It seems to be working perfectly.

As expected, Flink is falling back to .valid-length metadata on HDFS 2.6 (and 
2.3).

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 17:23
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great. We are shading curator now into a different location, that's why you 
can't find it anymore.

I suspect you're trying out our new exactly-once filesystem sinks. Please let 
us know how well its working for you and if you're missing something.
Its a pretty new feature :)
Also note that you can use the fs sinks with hadoop versions below 2.7.0, then 
we'll write some metadata containing the valid offsets.

On Wed, Oct 14, 2015 at 5:18 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Yes … You’re right.

Anyway, adding the log4j jar solved the issue and our app is working properly, 
thanks !

About curator, I just observed that it was not there anymore when comparing the 
old and new fatjars. But it’s probably now in another dependency, anyway there 
is no curator-related error so it just probably moved.

Thanks !

Gwen’

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 17:06

To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

One more thing regarding the truncate method: Its supported as of Hadoop 2.7.0 
(https://issues.apache.org/jira/browse/HDFS-3107)

On Wed, Oct 14, 2015 at 5:00 PM, Robert Metzger 
mailto:rmetz...@apache.org>> wrote:
Ah, I know what's causing this issue.
In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar.

Can you copy everything from the lib/ folder from your maven build into the 
lib/ folder of your flink installation?
Log4j is now in a separate jar in the lib/ folder .

What about the curator dependency issue?

On Wed, Oct 14, 2015 at 4:56 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
The first class that it can not find is :
org.apache.log4j.Level

The org.apache.log4j package is not present in the fat jar I get from the mvn 
command, but it is in the one you distributed on your website.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:54
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great.
Which classes can it not find at runtime?

I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi Robert !

I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.


From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seem okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.







Re: Building Flink with hadoop 2.6

2015-10-14 Thread Robert Metzger
Great. We are now shading Curator into a different location; that's why you
can't find it anymore.

I suspect you're trying out our new exactly-once filesystem sinks. Please
let us know how well they're working for you and whether you're missing something.
It's a pretty new feature :)
Also note that you can use the fs sinks with Hadoop versions below 2.7.0; in
that case we'll write some metadata containing the valid offsets.
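
For reference, a minimal sketch of wiring up the rolling filesystem sink
(assuming the 0.10-SNAPSHOT RollingSink from flink-connector-filesystem; the
path, bucket format and batch size are placeholders, and the Writer could be
swapped for a custom one, e.g. for RCFiles):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

public class RollingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "c");

        // One bucket directory per hour; part files are rolled at ~400 MB.
        RollingSink<String> sink = new RollingSink<>("hdfs:///flink/output");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(1024 * 1024 * 400);

        input.addSink(sink);
        env.execute("RollingSink example");
    }
}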

On Wed, Oct 14, 2015 at 5:18 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Yes … You’re right.
>
>
>
> Anyway, adding the log4j jar solved the issue and our app is working
> properly, thanks !
>
>
>
> About curator, I just observed that it was not there anymore when
> comparing the old and new fatjars. But it’s probably now in another
> dependency, anyway there is no curator-related error so it just probably
> moved.
>
>
>
> Thanks !
>
>
>
> Gwen’
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* mercredi 14 octobre 2015 17:06
>
> *To:* user@flink.apache.org
> *Subject:* Re: Building Flink with hadoop 2.6
>
>
>
> One more thing regarding the truncate method: Its supported as of Hadoop
> 2.7.0 (https://issues.apache.org/jira/browse/HDFS-3107)
>
>
>
> On Wed, Oct 14, 2015 at 5:00 PM, Robert Metzger 
> wrote:
>
> Ah, I know what's causing this issue.
>
> In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar.
>
>
>
> Can you copy everything from the lib/ folder from your maven build into
> the lib/ folder of your flink installation?
>
> Log4j is now in a separate jar in the lib/ folder .
>
>
>
> What about the curator dependency issue?
>
>
>
> On Wed, Oct 14, 2015 at 4:56 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> The first class that it can not find is :
>
> org.apache.log4j.Level
>
>
>
> The org.apache.log4j package is not present in the fat jar I get from the
> mvn command, but it is in the one you distributed on your website.
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* mercredi 14 octobre 2015 16:54
> *To:* user@flink.apache.org
> *Subject:* Re: Building Flink with hadoop 2.6
>
>
>
> Great.
>
> Which classes can it not find at runtime?
>
>
>
> I'll try to build and run Flink with exactly the command you've provided.
>
>
>
> On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi Robert !
>
>
>
> I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.
>
>
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* mercredi 14 octobre 2015 16:47
> *To:* user@flink.apache.org
> *Subject:* Re: Building Flink with hadoop 2.6
>
>
>
> Hi Gwen,
>
>
>
> can you tell us the "mvn" command you're using for building Flink?
>
>
>
>
>
>
>
> On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi ;
>
>
>
> We need to test some things with flink and hadoop 2.6 (the trunc method).
>
>
>
> We’ve set up a build task on our Jenkins and everything seem okay.
>
>
>
> However when we replace the original jar from your 0.10-SNAPSHOT
> distribution by ours there are some missing dependencies (log4j, curator,
> and maybe others) and we get some ClassNotFoundException at runtime.
>
>
>
> Are we missing some build parameters ?
>
>
>
> Thanks in advance,
>
>
>
> B.R.
>
>
>
>
>
>
>
>
>


RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Yes … You’re right.

Anyway, adding the log4j jar solved the issue and our app is working properly,
thanks!

About Curator, I just observed that it was not there anymore when comparing the
old and new fat jars. But it’s probably now in another dependency; anyway, there
is no Curator-related error, so it probably just moved.

Thanks !

Gwen’

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 17:06
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

One more thing regarding the truncate method: Its supported as of Hadoop 2.7.0 
(https://issues.apache.org/jira/browse/HDFS-3107)

On Wed, Oct 14, 2015 at 5:00 PM, Robert Metzger 
mailto:rmetz...@apache.org>> wrote:
Ah, I know what's causing this issue.
In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar.

Can you copy everything from the lib/ folder from your maven build into the 
lib/ folder of your flink installation?
Log4j is now in a separate jar in the lib/ folder .

What about the curator dependency issue?

On Wed, Oct 14, 2015 at 4:56 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
The first class that it can not find is :
org.apache.log4j.Level

The org.apache.log4j package is not present in the fat jar I get from the mvn 
command, but it is in the one you distributed on your website.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:54
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great.
Which classes can it not find at runtime?

I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi Robert !

I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.


From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seem okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.






Re: Building Flink with hadoop 2.6

2015-10-14 Thread Robert Metzger
One more thing regarding the truncate method: it's supported as of Hadoop
2.7.0 (https://issues.apache.org/jira/browse/HDFS-3107)

On Wed, Oct 14, 2015 at 5:00 PM, Robert Metzger  wrote:

> Ah, I know what's causing this issue.
> In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar.
>
> Can you copy everything from the lib/ folder from your maven build into
> the lib/ folder of your flink installation?
> Log4j is now in a separate jar in the lib/ folder .
>
> What about the curator dependency issue?
>
> On Wed, Oct 14, 2015 at 4:56 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
>> The first class that it can not find is :
>>
>> org.apache.log4j.Level
>>
>>
>>
>> The org.apache.log4j package is not present in the fat jar I get from the
>> mvn command, but it is in the one you distributed on your website.
>>
>>
>>
>> *From:* Robert Metzger [mailto:rmetz...@apache.org]
>> *Sent:* mercredi 14 octobre 2015 16:54
>> *To:* user@flink.apache.org
>> *Subject:* Re: Building Flink with hadoop 2.6
>>
>>
>>
>> Great.
>>
>> Which classes can it not find at runtime?
>>
>>
>>
>> I'll try to build and run Flink with exactly the command you've provided.
>>
>>
>>
>> On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers <
>> gwenhael.pasqui...@ericsson.com> wrote:
>>
>> Hi Robert !
>>
>>
>>
>> I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.
>>
>>
>>
>>
>>
>> *From:* Robert Metzger [mailto:rmetz...@apache.org]
>> *Sent:* mercredi 14 octobre 2015 16:47
>> *To:* user@flink.apache.org
>> *Subject:* Re: Building Flink with hadoop 2.6
>>
>>
>>
>> Hi Gwen,
>>
>>
>>
>> can you tell us the "mvn" command you're using for building Flink?
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers <
>> gwenhael.pasqui...@ericsson.com> wrote:
>>
>> Hi ;
>>
>>
>>
>> We need to test some things with flink and hadoop 2.6 (the trunc method).
>>
>>
>>
>> We’ve set up a build task on our Jenkins and everything seem okay.
>>
>>
>>
>> However when we replace the original jar from your 0.10-SNAPSHOT
>> distribution by ours there are some missing dependencies (log4j, curator,
>> and maybe others) and we get some ClassNotFoundException at runtime.
>>
>>
>>
>> Are we missing some build parameters ?
>>
>>
>>
>> Thanks in advance,
>>
>>
>>
>> B.R.
>>
>>
>>
>>
>>
>
>


Re: Building Flink with hadoop 2.6

2015-10-14 Thread Robert Metzger
Ah, I know what's causing this issue.
In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar.

Can you copy everything from the lib/ folder of your Maven build into the
lib/ folder of your Flink installation?
Log4j is now in a separate jar in the lib/ folder.

What about the curator dependency issue?

On Wed, Oct 14, 2015 at 4:56 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> The first class that it can not find is :
>
> org.apache.log4j.Level
>
>
>
> The org.apache.log4j package is not present in the fat jar I get from the
> mvn command, but it is in the one you distributed on your website.
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* mercredi 14 octobre 2015 16:54
> *To:* user@flink.apache.org
> *Subject:* Re: Building Flink with hadoop 2.6
>
>
>
> Great.
>
> Which classes can it not find at runtime?
>
>
>
> I'll try to build and run Flink with exactly the command you've provided.
>
>
>
> On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi Robert !
>
>
>
> I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.
>
>
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* mercredi 14 octobre 2015 16:47
> *To:* user@flink.apache.org
> *Subject:* Re: Building Flink with hadoop 2.6
>
>
>
> Hi Gwen,
>
>
>
> can you tell us the "mvn" command you're using for building Flink?
>
>
>
>
>
>
>
> On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi ;
>
>
>
> We need to test some things with flink and hadoop 2.6 (the trunc method).
>
>
>
> We’ve set up a build task on our Jenkins and everything seem okay.
>
>
>
> However when we replace the original jar from your 0.10-SNAPSHOT
> distribution by ours there are some missing dependencies (log4j, curator,
> and maybe others) and we get some ClassNotFoundException at runtime.
>
>
>
> Are we missing some build parameters ?
>
>
>
> Thanks in advance,
>
>
>
> B.R.
>
>
>
>
>


Re: Building Flink with hadoop 2.6

2015-10-14 Thread Robert Metzger
Great.
Which classes can it not find at runtime?

I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Hi Robert !
>
>
>
> I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.
>
>
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* mercredi 14 octobre 2015 16:47
> *To:* user@flink.apache.org
> *Subject:* Re: Building Flink with hadoop 2.6
>
>
>
> Hi Gwen,
>
>
>
> can you tell us the "mvn" command you're using for building Flink?
>
>
>
>
>
>
>
> On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi ;
>
>
>
> We need to test some things with flink and hadoop 2.6 (the trunc method).
>
>
>
> We’ve set up a build task on our Jenkins and everything seem okay.
>
>
>
> However when we replace the original jar from your 0.10-SNAPSHOT
> distribution by ours there are some missing dependencies (log4j, curator,
> and maybe others) and we get some ClassNotFoundException at runtime.
>
>
>
> Are we missing some build parameters ?
>
>
>
> Thanks in advance,
>
>
>
> B.R.
>
>
>


RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
The first class that it cannot find is:
org.apache.log4j.Level

The org.apache.log4j package is not present in the fat jar I get from the mvn 
command, but it is in the one you distributed on your website.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:54
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great.
Which classes can it not find at runtime?

I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi Robert !

I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.


From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seem okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.




RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Hi Robert !

I’m using “mvn clean install -DskipTests -Dhadoop.version=2.6.0”.


From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seem okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.



Re: Building Flink with hadoop 2.6

2015-10-14 Thread Robert Metzger
Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Hi ;
>
>
>
> We need to test some things with flink and hadoop 2.6 (the trunc method).
>
>
>
> We’ve set up a build task on our Jenkins and everything seem okay.
>
>
>
> However when we replace the original jar from your 0.10-SNAPSHOT
> distribution by ours there are some missing dependencies (log4j, curator,
> and maybe others) and we get some ClassNotFoundException at runtime.
>
>
>
> Are we missing some build parameters ?
>
>
>
> Thanks in advance,
>
>
>
> B.R.
>


Re: FlinkKafkaConsumer bootstrap.servers vs. broker hosts

2015-10-14 Thread Robert Metzger
Hi Juho,

sorry for the late reply, I was busy with Flink Forward :)

The Flink Kafka Consumer needs both addresses.
Kafka uses the bootstrap servers to connect to the brokers to consume
messages.
The Zookeeper connection is used to commit the offsets of the consumer
group once a state snapshot in Flink has been completed.
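
For example (a sketch, not the exact code; the host names and topic are
placeholders, env is a StreamExecutionEnvironment, and the usual Kafka
connector imports are assumed):

Properties props = new Properties();
// Brokers: used to fetch metadata and to consume the messages.
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
// Zookeeper: used to commit the consumer group's offsets after a checkpoint.
props.setProperty("zookeeper.connect", "zk1:2181,zk2:2181");
props.setProperty("group.id", "my-consumer-group");

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props));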

Our consumers are pretty minimalistic because we are waiting for the Kafka
project to finish their new consumer API [2]. It seems that it's a matter of
weeks for the Kafka project to release the new consumer [1].
With the new consumer API from kafka, we can actually give Flink users a
lot more features, for example subscribing to multiple topics with one
source instance, committing offsets to brokers, etc.
I think the new consumer will not need both bootstrap servers and zookeeper.


Is the problem you're reporting preventing you from using the KafkaConsumer
in production?
If so, can you tell me when exactly the data consumption is failing? (maybe
with some logs / stacktrace?)

If this contains confidential information, you can also send me a private
mail.


Regards,
Robert




[1] https://twitter.com/gwenshap/status/653718350648897536
[2]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design


On Tue, Oct 13, 2015 at 3:29 PM, Juho Autio  wrote:

> The FlinkKafkaConsumer takes the two arguments --bootstrap.servers (kafka
> servers) and --zookeeper.connect (zookeeper servers). Then it seems to
> resolve Kafka brokers from zookeeper, and uses those host names to consume
> kafka. But it also connects to the given bootstrap servers to fetch some
> metadata, it seems.
>
> This is problematic when zookeeper uses internal kafka hostnames –
> consuming
> won't work when those resolved kafka hosts can't be reached.
>
> Could the consumer be changed to
> - respect the provided kafka hosts / ips and not use host names resolved
> from zookeeper
> and optionally
> - not require bootstrap.servers argument at all, just resolve broker host
> names from zookeeper and use those as "bootstrap servers", too?
>
> Is the concept of bootstrap server something entirely else, or what am I
> missing here?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer-bootstrap-servers-vs-broker-hosts-tp3109.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Hi ;

We need to test some things with Flink and Hadoop 2.6 (the truncate method).

We've set up a build task on our Jenkins and everything seems okay.

However, when we replace the original jar from your 0.10-SNAPSHOT distribution
with ours, some dependencies (log4j, curator, and maybe others) are missing
and we get some ClassNotFoundExceptions at runtime.

Are we missing some build parameters?

Thanks in advance,

B.R.


Re: ExecutionEnvironment setConfiguration API

2015-10-14 Thread Flavio Pompermaier
Hi Fabian and Stephan, back to work :)

I finally managed to find the cause of the parallelism problem encountered by my
colleague!
Basically, it was introduced by this API change. Before, I was using
env.setConfiguration() to merge the default params with some custom ones.
Now, after the API change, I was using the following code:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof LocalEnvironment) {
    Configuration c = new Configuration();
    c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, FLINK_TEST_TMP_DIR);
    c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, FLINK_TEST_TMP_DIR);
    c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 2);
    c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    env = ExecutionEnvironment.createLocalEnvironment(c);
}

However, the first env and the reassigned one don't behave in the same
manner.
If I don't reassign env I have parallelism=8, otherwise it's 1 :(
Am I using the wrong APIs, or does the execution environment no longer allow
configuring such parameters?

Thanks in advance,
Flavio


On Tue, Oct 6, 2015 at 11:31 AM, Flavio Pompermaier 
wrote:

> That makes sense: what can be configured should be differentiated between
> local and remote envs (obviously this is a minor issue/improvement)
>
> Thanks again,
> Flavio
>
> On Tue, Oct 6, 2015 at 11:25 AM, Stephan Ewen  wrote:
>
>> We can think about that, but I think it may be quite confusing. The
>> configurations actually mean something different for local and remote
>> environments:
>>
>>   - For the local environment, the config basically describes the entire
>> Flink cluster setup (for the local execution cluster in the background)
>>   - For the remote environment, the config describes the parameters for
>> the client that connects to the cluster (Akka parameters, optimizer
>> parameters, ...), but not parameters of the cluster itself (like
>> taskmanager slots and memory).
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Oct 6, 2015 at 10:56 AM, Flavio Pompermaier > > wrote:
>>
>>> However it could be a good idea to overload also
>>> the getExecutionEnvironment() to be able to pass a custom
>>> configuration..what do you think?
>>> Otherwise I have to know a priori if I'm working in a local deployment
>>> or in a remote one, or check if getExecutionEnvironment() returned an
>>> instance of LocalEnvironment/RemoteEnvironment..
>>>
>>>
>>>
>>> On Tue, Oct 6, 2015 at 10:53 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Yes Stephan!
 I usually work with the master version, at least in development ;)
 Thanks for the quick support!

 Best,
 Flavio

 On Tue, Oct 6, 2015 at 10:48 AM, Stephan Ewen  wrote:

> Hi!
>
> Are you on the SNAPSHOT master version?
>
> You can pass the configuration to the constructor of the execution
> environment, or create one via
> ExecutionEnvironment.createLocalEnvironment(config) or via
> createRemoteEnvironment(host, port, configuration, jarFiles);
>
> The change of the signature was part of an API cleanup for the next
> release. Sorry for the inconvenience...
>
> Stephan
>
>
> On Tue, Oct 6, 2015 at 10:36 AM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi to all,
>>
>> today my code doesn't compile anymore because ExecutionEnvironment
>> doesn't have setConfiguration() anymore..how can I set the following
>> parameters in my unit tests?
>>
>> - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY
>> - ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY
>> - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY
>>
>> Best,
>> Flavio
>>
>
>

>>>
>>
>


Re: Processing S3 data with Apache Flink

2015-10-14 Thread Ufuk Celebi

> On 10 Oct 2015, at 22:59, snntr  wrote:
> 
> Hey everyone, 
> 
> I was having the same problem with S3 and found this thread very useful.
> Everything works fine now, when I start Flink from my IDE, but when I run
> the jar in local mode I keep getting 
> 
> java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
> must be specified as the username or password (respectively) of a s3n URL,
> or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
> properties (respectively).
> 
> I have set fs.hdfs.hadoopconf to point to a core-site.xml on my local
> machine with the required properties. What am I missing?
> 
> Any advice is highly appreciated ;)

This looks like a problem with picking up the Hadoop config. Can you look into 
the logs to check whether the configuration is picked up? Change the log 
settings to DEBUG in log/log4j.properties for this. And can you provide the 
complete stack trace?
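
For reference, the keys the error message asks for would typically go into the
core-site.xml that fs.hdfs.hadoopconf points to, for example (values are
placeholders):

<configuration>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY_ID</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_SECRET_ACCESS_KEY</value>
  </property>
</configuration>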

– Ufuk



Re: Scala Code Generation

2015-10-14 Thread Till Rohrmann
If you're using Scala, then you're bound to a maximum of 22 fields in a
tuple, because the Scala library does not provide larger tuples. You could
generate your own case classes with more than 22 fields, though.
On Oct 14, 2015 11:30 AM, "Ufuk Celebi"  wrote:

>
> > On 13 Oct 2015, at 16:06, schul...@informatik.hu-berlin.de wrote:
> >
> > Hello,
> >
> > I am currently working on a compilation unit translating AsterixDB's AQL
> > into runnable Scala code for Flink's Scala API. During code generation I
> > discovered some things that are quite hard to work around. I am still
> > working with Flink version 0.8, so some of the problems I have might
> > already be fixed in 0.9 and if so please tell me.
> >
> > First, whenever a record gets projected down to only a single field (e.g.
> > by a map or reduce function) it is no longer considered a record, but a
> > variable of the type of that field. If afterwards I want to apply
> > additional functions like .sum(0) I get an error message like
>
> A workaround is to return Tuple1 for this. Then you can run the
> aggregation. I think that the Tuple0 class has been added after 0.8 though.
>
> > "Aggregating on field positions is only possible on tuple data types."
> >
> > This is the same for all functions (like write or join) as the "record"
> is
> > no longer considered a dataset.
>
> What do you mean? At least in the current versions, the join projections
> return a Tuple type as well.
>
> > Second, I found that records longer than 22 fields are not supported.
> > Whenever I have a record that is longer than that I receive a build error
> > as
>
> Flink’s Tuple classes go up to Tuple25. You can work around this by using
> a custom PoJo type, e.g.
>
> class TPCHRecord {
> public int f0;
> ...
> public int f99;
> }
>
> If possible, I would suggest to update to the latest 0.9 or the upcoming
> 0.10 release. A lot of stuff has been fixed since 0.8. I think it will be
> worth it. If you encounter any problems while doing this, feel free to ask
> here. :)
>
> – Ufuk


Re: flink kafka question

2015-10-14 Thread Ufuk Celebi

> On 12 Oct 2015, at 22:47, Jerry Peng  wrote:
> 
> Hello,
> 
> I am trying to do some benchmark testing with flink streaming.  When flink 
> reads a message in from Kafka, I want to write a timestamp to redis.  How can 
> I modify the existing kafka consumer code to do this?  What would be easiest 
> way to do something like this?  Thanks for your help!

I guess you want to do this in the consumer in order to have less delay between
when you read the message and the timestamp. I am not familiar with the consumer
code, but you can try to do this in a map after the source. This should be chained
to the source, and the delay from the source to the map function should not be
too large.



Re: Scala Code Generation

2015-10-14 Thread Ufuk Celebi

> On 13 Oct 2015, at 16:06, schul...@informatik.hu-berlin.de wrote:
> 
> Hello,
> 
> I am currently working on a compilation unit translating AsterixDB's AQL
> into runnable Scala code for Flink's Scala API. During code generation I
> discovered some things that are quite hard to work around. I am still
> working with Flink version 0.8, so some of the problems I have might
> already be fixed in 0.9 and if so please tell me.
> 
> First, whenever a record gets projected down to only a single field (e.g.
> by a map or reduce function) it is no longer considered a record, but a
> variable of the type of that field. If afterwards I want to apply
> additional functions like .sum(0) I get an error message like

A workaround is to return Tuple1 for this. Then you can run the aggregation. 
I think that the Tuple0 class has been added after 0.8 though.
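
For example, a sketch against the Java DataSet API (assuming 0.9+, where print()
also triggers execution; the Scala API is analogous with a one-field tuple or
case class):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;

public class Tuple1Workaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
            new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));

        // Wrap the projected field in a Tuple1 instead of returning a bare int,
        // so that position-based aggregations like sum(0) still apply.
        DataSet<Tuple1<Integer>> projected = input.map(
            new MapFunction<Tuple2<String, Integer>, Tuple1<Integer>>() {
                @Override
                public Tuple1<Integer> map(Tuple2<String, Integer> value) {
                    return new Tuple1<>(value.f1);
                }
            });

        projected.sum(0).print();
    }
}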

> "Aggregating on field positions is only possible on tuple data types."
> 
> This is the same for all functions (like write or join) as the "record" is
> no longer considered a dataset.

What do you mean? At least in the current versions, the join projections return 
a Tuple type as well.

> Second, I found that records longer than 22 fields are not supported.
> Whenever I have a record that is longer than that I receive a build error
> as

Flink’s Tuple classes go up to Tuple25. You can work around this by using a 
custom PoJo type, e.g.

class TPCHRecord {
    public int f0;
    ...
    public int f99;
}

If possible, I would suggest to update to the latest 0.9 or the upcoming 0.10 
release. A lot of stuff has been fixed since 0.8. I think it will be worth it. 
If you encounter any problems while doing this, feel free to ask here. :)

– Ufuk

Re: Apache Flink and serious streaming stateful processing

2015-10-14 Thread Krzysztof Zarzycki
Hi guys!
I'm sorry I abandoned this thread, but I had to give up Flink for some
time. Now I'm back and would like to resurrect it. Flink has
evolved rapidly in this time too, so maybe new features will allow me to do
what I want. By the way, I heard only really good stuff about you from the
Flink Forward conference!

First, about back-pressure: as you said, it is working well, so I'm taking
it for granted. Sounds great!

Let's focus now on stateful processing:

To back up what I mean, here are some numbers for the state I'm currently
holding:
My stream processing program keeps around 300 GB of state for 1 month of data,
but it will be holding around 2 months, so twice as much (600 GB). The state is
a key-value store, where the key is some user id & the value is actually a list
of events correlated with the user. There are tens of millions of keys -
unique user ids. The stream is partitioned on user id, so my state can be
partitioned on user id as well.
Currently I keep this "state" in Cassandra, so external to the program,
but this is my biggest pain as the communication cost is large, especially
when I reprocess my streaming data.

Now what I would like to have is some abstraction available in Flink that
allows me to keep the state out-of-core, but embedded. I would use it as a
key-value store, and Flink would journal & replicate all the update
operations, so they are recoverable on failure, when the state (or its
partition) is lost.
To describe my idea in code, I imagine the following pseudocode (totally
abstracted from Flink):
class MyProcessor {
  val keyValueState = Flink.createKeyValueState("name-it")

  def processRecord(r: Record) {
 val userList = keyValueState.get(r.get("userId"))
 userList += r.get("someData")
 keyValueState.put(r.get("userId"), userList)
  }
}

Something similar exists in Samza, with these guarantees:
- all puts are replicated (by saving the put in a separate Kafka topic).
- on failure & recovery, the state is rebuilt from the saved puts before
processing starts.


Last time, you said that you're "working on making incrementally backed-up
key/value state a first-class citizen in Flink, but it is still WIP". How has
this changed since then? Do you already support the case that I just
described?


Thanks for the idea of MapDB. I couldn't find any benchmark of MapDB's
out-of-core performance, and I don't know yet if it can match the performance
of a RocksDB-like database, but I will try to find time to check it.
In the meantime, this is the performance that attracts me to RocksDB [1]:

> Measure performance to load 1B keys into the database. The keys are
> inserted in random order.
>  rocksdb: 103 minutes, 80 MB/sec (total data size 481 GB, 1 billion
> key-values)
> Measure performance to load 1B keys into the database. The keys are
> inserted in sequential order.
> rocksdb: 36 minutes, 370 MB/sec (total data size 760 GB)


[1] https://github.com/facebook/rocksdb/wiki/Performance-Benchmarks

Cheers!
Krzysiek

2015-06-30 15:00 GMT+02:00 Ufuk Celebi :

>
> On 30 Jun 2015, at 14:23, Gyula Fóra  wrote:
> > 2. We have support for stateful processing in Flink in many ways you
> have described in your question. Unfortunately the docs are down currently
> but you should check out the 'Stateful processing' section in the 0.10 docs
> (once its back online). We practically support an OperatorState interface
> which lets you keep partitioned state by some key and access it from
> runtime operators. The states declared using these interfaces are
> checkpointed and will be restored on failure. Currently all the states are
> stored in-memory but we are planning to extend it to allow writing state
> updates to external systems.
>
>
> http://flink.apache.org/docs/master/apis/streaming_guide.html#stateful-computation
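
As a rough illustration of the checkpointed, partitioned state mentioned above,
a sketch against the 0.10-era streaming API (assuming the Checkpointed
interface; note that this keeps the whole map on the heap rather than
out-of-core, and the Tuple2 fields stand in for userId and someData):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.util.Collector;

// Keeps a per-user list of events; the map is snapshotted on every checkpoint
// and handed back via restoreState() after a failure.
public class UserEventCollector
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>
        implements Checkpointed<HashMap<String, List<String>>> {

    private HashMap<String, List<String>> userEvents = new HashMap<>();

    @Override
    public void flatMap(Tuple2<String, String> record, Collector<Tuple2<String, String>> out) {
        List<String> events = userEvents.get(record.f0);
        if (events == null) {
            events = new ArrayList<>();
            userEvents.put(record.f0, events);
        }
        events.add(record.f1);
        out.collect(record);
    }

    @Override
    public HashMap<String, List<String>> snapshotState(long checkpointId, long checkpointTimestamp) {
        return userEvents;
    }

    @Override
    public void restoreState(HashMap<String, List<String>> state) {
        userEvents = state;
    }
}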