Re: store and retrieve Graph object

2015-11-25 Thread Vasiliki Kalavri
Hi Stefane,

let me know if I understand the problem correctly. The vertex values are
POJOs that you're somehow inferring from the edge list and this value
creation is what takes a lot of time? Since a graph is just a set of 2
datasets (vertices and edges), you could store the values to disk and have
a custom input format to read them into datasets. Would that work for you?
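A rough sketch of that approach (the VertexValue POJO, its single string field, and the tab-separated text format below are assumptions for illustration):

// write the expensive-to-build vertices once, e.g. as "id<TAB>value" text lines
DataSet<Vertex<Long, VertexValue>> vertices = ...; // built once from the edge list
vertices.map(new MapFunction<Vertex<Long, VertexValue>, String>() {
    @Override
    public String map(Vertex<Long, VertexValue> v) {
        return v.getId() + "\t" + v.getValue().getName();
    }
}).writeAsText("hdfs:///tmp/vertices");
env.execute();

// in later runs, read them back and rebuild the graph without the expensive step
DataSet<Edge<Long, Double>> edges = ...;
DataSet<Vertex<Long, VertexValue>> restored = env.readTextFile("hdfs:///tmp/vertices")
    .map(new MapFunction<String, Vertex<Long, VertexValue>>() {
        @Override
        public Vertex<Long, VertexValue> map(String line) {
            String[] parts = line.split("\t");
            return new Vertex<>(Long.parseLong(parts[0]), new VertexValue(parts[1]));
        }
    });
Graph<Long, VertexValue, Double> graph = Graph.fromDataSet(restored, edges, env);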

-Vasia.

On 25 November 2015 at 15:09, Stefanos Antaris 
wrote:

> Hi to all,
>
> I am working on a project with Gelly and I need to create a graph with
> billions of nodes. Although I have the edge list, each node in the Graph
> needs to be a POJO object, and constructing these objects takes a long time
> before the final graph can be built. Is it possible to store the Graph
> object as a file and retrieve it whenever I want to run an experiment?
>
> Thanks,
> Stefanos


Re: store and retrieve Graph object

2015-11-25 Thread Stefanos Antaris
Hi Vasia,

my graph object is the following: 

Graph graph = 
Graph.fromCollection(edgeList.collect(), env);

The vertex itself is a POJO, not the vertex value. So the problem is: how could I store and 
retrieve the vertex list? 

Thanks,
Stefanos

> On 25 Nov 2015, at 18:16, Vasiliki Kalavri  wrote:
> 
> Hi Stefane,
> 
> let me know if I understand the problem correctly. The vertex values are 
> POJOs that you're somehow inferring from the edge list and this value 
> creation is what takes a lot of time? Since a graph is just a set of 2 
> datasets (vertices and edges), you could store the values to disk and have a 
> custom input format to read them into datasets. Would that work for you?
> 
> -Vasia.
> 
> On 25 November 2015 at 15:09, Stefanos Antaris  > wrote:
> Hi to all,
> 
> I am working on a project with Gelly and I need to create a graph with 
> billions of nodes. Although I have the edge list, each node in the Graph needs 
> to be a POJO object, and constructing these objects takes a long time before 
> the final graph can be built. Is it possible to store the Graph object as a 
> file and retrieve it whenever I want to run an experiment?
> 
> Thanks,
> Stefanos
> 



[ANNOUNCE] CFP open for ApacheCon North America 2016

2015-11-25 Thread Rich Bowen
Community growth starts by talking with those interested in your
project. ApacheCon North America is coming, are you?

We are delighted to announce that the Call For Presentations (CFP) is
now open for ApacheCon North America. You can submit your proposed
sessions at
http://events.linuxfoundation.org/events/apache-big-data-north-america/program/cfp
for big data talks and
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp
for all other topics.

ApacheCon North America will be held in Vancouver, Canada, May 9-13th
2016. ApacheCon has been running every year since 2000, and is the place
to build your project communities.

While we will consider individual talks, we prefer to see related
sessions that are likely to draw users and community members. When
submitting your talk, work with your project community and with related
communities to come up with a full program that will walk attendees
through the basics and on into mastery of your project in example use
cases. Content that introduces what's new in your latest release is also
of particular interest, especially when it builds upon existing, well-known
application models. The goal should be to showcase your project in
ways that will attract participants and encourage engagement in your
community. Please remember to involve your whole project community (user
and dev lists) when building content. This is your chance to create a
project-specific event within the broader ApacheCon conference.

Content at ApacheCon North America will be cross-promoted as
mini-conferences, such as ApacheCon Big Data, and ApacheCon Mobile, so
be sure to indicate which larger category your proposed sessions fit into.

Finally, please plan to attend ApacheCon, even if you're not proposing a
talk. The biggest value of the event is community building, and we count
on you to make it a place where your project community is likely to
congregate, not just for the technical content in sessions, but for
hackathons, project summits, and good old fashioned face-to-face networking.

-- 
rbo...@apache.org
http://apache.org/


Re: Working with State example /flink streaming

2015-11-25 Thread Stephan Ewen
Hi Javier!

You can solve this both using windows, or using manual state.

What is better depends a bit on when you want to have the result (the sum).
Do you want a result emitted after each update (or do some other operation
with that value) or do you want only the final sum after a certain time?

For the second variant, I would use a window, for the first variant, you
could use custom state as follows:

For each element, you take the current state for the key, add the value to
get the new sum. Then you update the state with the new sum and emit the
value as well...

Java:

DataStream<Tuple2<String, Long>> stream = ...;
DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());


public class RollingSum extends RichMapFunction<Tuple2<String, Long>,
        Tuple2<String, Long>> {

    // keyed state holding the running sum for the current key
    private OperatorState<Long> sum;

    @Override
    public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
        long newSum = sum.value() + value.f1;
        sum.update(newSum);
        return new Tuple2<>(value.f0, newSum);
    }

    @Override
    public void open(Configuration config) {
        sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}



In Scala, you can write this briefly as:

val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), sum: Option[Int]) => {
    val newSum = in._2 + sum.getOrElse(0)
    ((in._1, newSum), Some(newSum))
  })
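For the second variant (only the final sum per key after a certain time), a rough window-based sketch in the Java API (the window size is just a placeholder):

DataStream<Tuple2<String, Long>> sums = stream
    .keyBy(0)
    .timeWindow(Time.of(1, TimeUnit.MINUTES)) // emit one sum per key per minute
    .sum(1);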


Does that help?

Thanks also for pointing out the error in the sample code...

Greetings,
Stephan


On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier 
wrote:

> Hi,
>
> We are trying to do a test using States but we have not been able to
> achieve our desired result. Basically we have a data stream with data as
> [{"id":"11","value":123}] and we want to calculate the sum of all values
> grouping by ID. We were able to achieve this using windows but not with
>  states. The example that is in the documentation (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
> is not very clear and even has some errors, for example:
>
> public class CounterSum implements RichReduceFunction
>
> should be
>
> public class CounterSum extends RichReduceFunction
>
> as RichReduceFunction is a class, not an interface.
>
> We wanted to ask you if you have an example of how to use States with
> Flink.
>
> Thanks in advance for your help.
>
>
>


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-25 Thread Aljoscha Krettek
Hi Konstantin,
I still haven't come up with an explanation for the behavior. Could you maybe 
send me example code (and example data, if it is necessary to reproduce the 
problem)? This would really help me pinpoint the problem.

Cheers,
Aljoscha
> On 17 Nov 2015, at 21:42, Konstantin Knauf  
> wrote:
> 
> Hi Aljoscha,
> 
> Are you sure? I am running the job from my IDE at the moment.
> 
> If I set
> 
> StreamExecutionEnvironment.setParallelism(1);
> 
> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
> getCurrentWatermark() and emitting a watermark at every record)
> 
> If I set
> 
> StreamExecutionEnvironment.setParallelism(5);
> 
> it does not work.
> 
> So, if I understood you correctly, it is the opposite of what you were
> expecting?!
> 
> Cheers,
> 
> Konstantin
> 
> 
> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> Hi,
>> actually, the bug is more subtle. Normally, it is not a problem that the 
>> TimestampExtractor sometimes emits a watermark that is lower than the one 
>> before. (This is the result of the bug with Long.MIN_VALUE I mentioned 
>> before). The stream operators wait for watermarks from all upstream 
>> operators and only advance the watermark monotonically in lockstep with 
>> them. This way, the watermark cannot decrease at an operator.
>> 
>> In your case, you have a topology with parallelism 1, I assume. In that case 
>> the operators are chained. (There are no separate operators but basically 
>> only one operator, and element transmission happens in function calls.) In 
>> this setting the watermarks are directly forwarded to operators without 
>> going through the logic I mentioned above.
>> 
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 18:13, Konstantin Knauf  
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>> the opposite than before (only watermarks per events vs only watermarks
>>> per autowatermark). And now it works :). The question remains, why it
>>> did not work before. As far as I see, it is an issue with the first
>>> TimestmapExtractor itself?!
>>> 
>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>> 
>>> Cheers,
>>> 
>>> Konstantin
>>> 
>>> [1]
>>> 
>>>   final private long maxDelay;
>>>   private long lastTimestamp = Long.MIN_VALUE;
>>> 
>>>   public PojoTimestampExtractor(long maxDelay) {
>>>   this.maxDelay = maxDelay;
>>>   }
>>> 
>>>   @Override
>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>   lastTimestamp = pojo.getTime();
>>>   return pojo.getTime();
>>>   }
>>> 
>>>   @Override
>>>   public long extractWatermark(Pojo pojo, long l) {
>>>   return Long.MIN_VALUE;
>>>   }
>>> 
>>>   @Override
>>>   public long getCurrentWatermark() {
>>>   return lastTimestamp - maxDelay;
>>>   }
>>> 
>>> 
>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
 Hi,
 yes, at your data-rate emitting a watermark for every element should not 
 be a problem. It could become a problem with higher data-rates since the 
 system can get overwhelmed if every element also generates a watermark. In 
 that case I would suggest storing the latest element-timestamp in an 
 internal field and only emitting it in getCurrentWatermark(), since then 
 the watermark interval can be tuned using the auto-watermark interval 
 setting.
 
 But that should not be the cause of the problem that you currently have. 
 Would you maybe be willing to send me some (mock) example data and the 
 code so that I can reproduce the problem and have a look at it? to 
 aljoscha at apache.org.
 
 Cheers,
 Aljoscha
> On 16 Nov 2015, at 13:05, Konstantin Knauf  
> wrote:
> 
> Hi Aljoscha,
> 
> ok, now I at least understand, why it works with fromElements(...). For
> the rest I am not so sure.
> 
>> What this means in your case is that the watermark can only advance if
> a new element arrives, because only then is the watermark updated.
> 
> But new elements arrive all the time, about 50/s, or do you mean
> something else?
> 
> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
> choice, if i understand the semantics correctly. It just affects
> watermarking in the absence of events, right?
> 
> Cheers,
> 
> Konstantin
> 
> 
> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>> Hi,
>> it could be what Gyula mentioned. Let me first go a bit into how the 
>> TimestampExtractor works internally.
>> 
>> First, the timestamp extractor internally keeps the value of the last 
>> emitted watermark. Then, the semantics of the TimestampExtractor are as 
>> follows :
>> - the result of extractTimestamp is taken and it 

Re: [VOTE] Release Apache Flink 0.10.1 (release-0.10.0-rc1)

2015-11-25 Thread Henry Saputra
+1

LICENSE file looks good in source artifact
NOTICE file looks good in source artifact
Signature file looks good in source artifact
Hash files looks good in source artifact
No 3rd party executables in source artifact
Source compiled
All tests passed
Ran standalone mode test app

- Henry

On Mon, Nov 23, 2015 at 4:45 AM, Robert Metzger  wrote:
> Hi All,
>
> this is the first bugfix release for the 0.10 series of Flink.
> I've CC'ed the user@ list if users are interested in helping to verify the
> release.
>
> It contains fixes for critical issues, in particular:
> - FLINK-3021 Fix class loading issue for streaming sources
> - FLINK-2974 Add periodic offset committer for Kafka
> - FLINK-2977 Using reflection to load HBase Kerberos tokens
> - FLINK-3024 Fix TimestampExtractor.getCurrentWatermark() Behaviour
> - FLINK-2967 Increase timeout for LOCAL_HOST address detection strategy
> - FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
> - FLINK-2989 job cancel button doesn't work on YARN
> - FLINK-3032: Flink does not start on Hadoop 2.7.1 (HDP), due to class
> conflict
> - FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state
>
> This is the guide on how to verify a release:
> https://cwiki.apache.org/confluence/display/FLINK/Releasing
>
> During the testing, please focus on trying out Flink on different Hadoop
> platforms: We changed the way how Hadoop's Maven dependencies are packaged,
> so maybe there are issues with different Hadoop distributions.
> The Kafka consumer also changed a bit, would be good to test it on a
> cluster.
>
> -
>
> Please vote on releasing the following candidate as Apache Flink version
> 0.10.1:
>
> The commit to be voted on:
> http://git-wip-us.apache.org/repos/asf/flink/commit/2e9b2316
>
> Branch:
> release-0.10.1-rc1 (see
> https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git)
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~rmetzger/flink-0.10.1-rc1/
>
> The release artifacts are signed with the key with fingerprint  D9839159:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1058
>
> -
>
> The vote is open for the next 72 hours and passes if a majority of at least
> three +1 PMC votes are cast.
>
> The vote ends on Wednesday, November 25.
>
> [ ] +1 Release this package as Apache Flink 0.10.1
> [ ] -1 Do not release this package because ...
>
> ===


Re: Running on a firewalled Yarn cluster?

2015-11-25 Thread Robert Metzger
Hi,
I just wanted to let you know that I didn't forget about this!

The BlobManager in 1.0-SNAPSHOT already has a configuration parameter to
use a certain range of ports.
I'm trying to add the same feature for YARN tomorrow.
Sorry for the delay.
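
For reference, this is roughly what such a setting could look like in flink-conf.yaml (treat the exact key name as an assumption until the change is released):

# restrict the BlobServer to a port range that is open in the firewall
blob.server.port: 50100-50200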


On Tue, Nov 10, 2015 at 9:27 PM, Cory Monty 
wrote:

> Thanks, Stephan.
>
> I'll give those two workarounds a try!
>
> On Tue, Nov 10, 2015 at 2:18 PM, Stephan Ewen  wrote:
>
>> Hi Cory!
>>
>> There is no flag to define the BlobServer port right now, but we should
>> definitely add this: https://issues.apache.org/jira/browse/FLINK-2996
>>
>> If your setup is such that the firewall problem is only between client
>> and master node (and the workers can reach the master on all ports), then
>> you can try two workarounds:
>>
>> 1) Start the program in the cluster (or on the master node, via ssh).
>>
>> 2) Add the program jar to the lib directory of Flink, and start your
>> program with the RemoteExecutor, without a jar attachment. Then it only
>> needs to communicate to the actor system (RPC) port, which is not random in
>> standalone mode (6123 by default).
>>
>> Stephan
>>
>>
>>
>>
>> On Tue, Nov 10, 2015 at 8:46 PM, Cory Monty 
>> wrote:
>>
>>> I'm also running into an issue with a non-YARN cluster. When submitting
>>> a JAR to Flink, we'll need to have an arbitrary port open on all of the
>>> hosts, which we don't know about until the socket attempts to bind; a bit
>>> of a problem for us.
>>>
>>> Are there ways to submit a JAR to Flink that bypasses the need for the
>>> BlobServer's random port binding? Or, to control the port BlobServer binds
>>> to?
>>>
>>> Cheers,
>>>
>>> Cory
>>>
>>> On Thu, Nov 5, 2015 at 8:07 AM, Niels Basjes  wrote:
>>>
 That is what I tried. Couldn't find that port though.

 On Thu, Nov 5, 2015 at 3:06 PM, Robert Metzger 
 wrote:

> Hi,
>
> cool, that's good news.
>
> The RM proxy is only for the web interface of the AM.
>
>  I'm pretty sure that the MapReduce AM has at least two ports:
> - one for the web interface (accessible through the RM proxy, so
> behind the firewall)
> - one for the AM RPC (and that port is allocated within the configured
> range, open through the firewall).
>
> You can probably find the RPC port in the log file of the running
> MapReduce AM (to find that, identify the NodeManager running the AM, 
> access
> the NM web interface and retrieve the logs of the container running the 
> AM).
>
> Maybe the mapreduce client also logs the AM RPC port when querying the
> status of a running job.
>
>
> On Thu, Nov 5, 2015 at 2:59 PM, Niels Basjes  wrote:
>
>> Hi,
>>
>> I checked and this setting has been set to a limited port range of
>> only 100 port numbers.
>>
>> I tried to find the actual port an AM is running on and couldn't find
>> it (I'm not the admin on that cluster)
>>
>> The url to the AM that I use to access it always looks like this:
>>
>> http://master-001.xx.net:8088/proxy/application_1443166961758_85492/index.html
>>
>> As you can see I never connect directly; always via the proxy that
>> runs over the master on a single fixed port.
>>
>> Niels
>>
>> On Thu, Nov 5, 2015 at 2:46 PM, Robert Metzger 
>> wrote:
>>
>>> While discussing with my colleagues about the issue today, we came
>>> up with another approach to resolve the issue:
>>>
>>> d) Upload the job jar to HDFS (or another FS) and trigger the
>>> execution of the jar using an HTTP request to the web interface.
>>>
>>> We could add some tooling into the /bin/flink client to submit a job
>>> like this transparently, so users would not need to bother with the file
>>> upload and request sending.
>>> Also, Sachin started a discussion on the dev@ list to add support
>>> for submitting jobs over the web interface, so maybe we can base the fix
>>> for FLINK-2960 on that.
>>>
>>> I've also looked into the Hadoop MapReduce code and it seems they do
>>> the following:
>>> When submitting a job, they are uploading the job jar file to HDFS.
>>> They also upload a configuration file that contains all the config 
>>> options
>>> of the job. Then, they submit this altogether as an application to YARN.
>>> So far, there has not been any firewall involved. They establish a
>>> connection between the JobClient and the ApplicationMaster when the 
>>> user is
>>> querying the current job status, but I could not find any special code
>>> getting the status over HTTP.
>>>
>>> But I found the following configuration parameter:
>>> "yarn.app.mapreduce.am.job.client.port-range", so it seems that they 
>>> try to
>>> 

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-25 Thread Konstantin Knauf
Hi Aljoscha,

sure, will do. I haven't found a solution either. I won't have time to put
a minimal example together before the weekend, though.

Cheers,

Konstantin

On 25.11.2015 19:10, Aljoscha Krettek wrote:
> Hi Konstantin,
> I still haven't come up with an explanation for the behavior. Could you maybe 
> send me example code (and example data, if it is necessary to reproduce the 
> problem)? This would really help me pinpoint the problem.
> 
> Cheers,
> Aljoscha
>> On 17 Nov 2015, at 21:42, Konstantin Knauf  
>> wrote:
>>
>> Hi Aljoscha,
>>
>> Are you sure? I am running the job from my IDE at the moment.
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(1);
>>
>> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
>> getCurrentWatermark() and emitting a watermark at every record)
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(5);
>>
>> it does not work.
>>
>> So, if I understood you correctly, it is the opposite of what you were
>> expecting?!
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>> Hi,
>>> actually, the bug is more subtle. Normally, it is not a problem that the 
>>> TimestampExtractor sometimes emits a watermark that is lower than the one 
>>> before. (This is the result of the bug with Long.MIN_VALUE I mentioned 
>>> before). The stream operators wait for watermarks from all upstream 
>>> operators and only advance the watermark monotonically in lockstep with 
>>> them. This way, the watermark cannot decrease at an operator.
>>>
>>> In your case, you have a topology with parallelism 1, I assume. In that 
>>> case the operators are chained. (There are no separate operators but 
>>> basically only one operator, and element transmission happens in function 
>>> calls.) In this setting the watermarks are directly forwarded to operators 
>>> without going through the logic I mentioned above.
>>>
>>> Cheers,
>>> Aljoscha
 On 16 Nov 2015, at 18:13, Konstantin Knauf  
 wrote:

 Hi Aljoscha,

 I changed the Timestamp Extraktor to save the lastSeenTimestamp and only
 emit with getCurrentWatermark [1] as you suggested. So basically I do
 the opposite than before (only watermarks per events vs only watermarks
 per autowatermark). And now it works :). The question remains, why it
 did not work before. As far as I see, it is an issue with the first
 TimestmapExtractor itself?!

 Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?

 Cheers,

 Konstantin

 [1]

   final private long maxDelay;
   private long lastTimestamp = Long.MIN_VALUE;

   public PojoTimestampExtractor(long maxDelay) {
   this.maxDelay = maxDelay;
   }

   @Override
   public long extractTimestamp(Pojo pojo, long l) {
   lastTimestamp = pojo.getTime();
   return pojo.getTime();
   }

   @Override
   public long extractWatermark(Pojo pojo, long l) {
   return Long.MIN_VALUE;
   }

   @Override
   public long getCurrentWatermark() {
   return lastTimestamp - maxDelay;
   }


 On 16.11.2015 13:37, Aljoscha Krettek wrote:
> Hi,
> yes, at your data-rate emitting a watermark for every element should not 
> be a problem. It could become a problem with higher data-rates since the 
> system can get overwhelmed if every element also generates a watermark. 
> In that case I would suggest storing the latest element-timestamp in an 
> internal field and only emitting it in getCurrentWatermark(), since then 
> the watermark interval can be tuned using the auto-watermark 
> interval setting.
>
> But that should not be the cause of the problem that you currently have. 
> Would you maybe be willing to send me some (mock) example data and the 
> code so that I can reproduce the problem and have a look at it? to 
> aljoscha at apache.org.
>
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 13:05, Konstantin Knauf 
>>  wrote:
>>
>> Hi Aljoscha,
>>
>> ok, now I at least understand, why it works with fromElements(...). For
>> the rest I am not so sure.
>>
>>> What this means in your case is that the watermark can only advance if
>> a new element arrives, because only then is the watermark updated.
>>
>> But new elements arrive all the time, about 50/s, or do you mean
>> something else?
>>
>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>> choice, if i understand the semantics correctly. It just affects
>> watermarking in the absence of events, right?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>> Hi,
>>> it could be what Gyula mentioned. Let 

Re: store and retrieve Graph object

2015-11-25 Thread Vasiliki Kalavri
Good to know :)

On 25 November 2015 at 21:44, Stefanos Antaris 
wrote:

> Hi,
>
> It works fine using this approach.
>
> Thanks,
> Stefanos
>
> On 25 Nov 2015, at 20:32, Vasiliki Kalavri 
> wrote:
>
> Hey,
>
> you can preprocess your data, create the vertices and store them to a
> file, like you would store any other Flink DataSet, e.g. with writeAsText.
>
> Then, you can create the graph by reading 2 datasets, like this:
>
> DataSet vertices = env.readTextFile("/path/to/vertices/")... // or
> your custom reading logic
> DataSet edges = ...
>
> Graph graph = Graph.fromDataSet(vertices, edges, env);
>
> Is this what you're looking for?
>
> Also, note that if you have a very large graph, you should avoid using
> collect() and fromCollection().
>
> -Vasia.
>
> On 25 November 2015 at 18:03, Stefanos Antaris  > wrote:
>
>> Hi Vasia,
>>
>> my graph object is the following:
>>
>> Graph graph = Graph.fromCollection(
>> edgeList.collect(), env);
>>
>> The vertex is a POJO not the value. So the problem is how could i store
>> and retrieve the vertex list?
>>
>> Thanks,
>> Stefanos
>>
>> On 25 Nov 2015, at 18:16, Vasiliki Kalavri 
>> wrote:
>>
>> Hi Stefane,
>>
>> let me know if I understand the problem correctly. The vertex values are
>> POJOs that you're somehow inferring from the edge list and this value
>> creation is what takes a lot of time? Since a graph is just a set of 2
>> datasets (vertices and edges), you could store the values to disk and have
>> a custom input format to read them into datasets. Would that work for you?
>>
>> -Vasia.
>>
>> On 25 November 2015 at 15:09, Stefanos Antaris <
>> antaris.stefa...@gmail.com> wrote:
>>
>>> Hi to all,
>>>
>>> i am working on a project with Gelly and i need to create a graph with
>>> billions of nodes. Although i have the edge list, the node in the Graph
>>> needs to be a POJO object, the construction of which takes long time in
>>> order to finally create the final graph. Is it possible to store the Graph
>>> object as a file and retrieve it whenever i want to run an experiment?
>>>
>>> Thanks,
>>> Stefanos
>>
>>
>>
>>
>
>


Re: store and retrieve Graph object

2015-11-25 Thread Stefanos Antaris
Hi,

It works fine using this approach. 

Thanks,
Stefanos

> On 25 Nov 2015, at 20:32, Vasiliki Kalavri  wrote:
> 
> Hey,
> 
> you can preprocess your data, create the vertices and store them to a file, 
> like you would store any other Flink DataSet, e.g. with writeAsText.
> 
> Then, you can create the graph by reading 2 datasets, like this:
> 
> DataSet vertices = env.readTextFile("/path/to/vertices/")... // or 
> your custom reading logic
> DataSet edges = ...
> 
> Graph graph = Graph.fromDataSet(vertices, edges, env);
> 
> Is this what you're looking for?
> 
> Also, note that if you have a very large graph, you should avoid using 
> collect() and fromCollection().
> 
> -Vasia.
> 
> On 25 November 2015 at 18:03, Stefanos Antaris  > wrote:
> Hi Vasia,
> 
> my graph object is the following: 
> 
> Graph graph = 
> Graph.fromCollection(edgeList.collect(), env);
> 
> The vertex is a POJO not the value. So the problem is how could i store and 
> retrieve the vertex list? 
> 
> Thanks,
> Stefanos
> 
>> On 25 Nov 2015, at 18:16, Vasiliki Kalavri > > wrote:
>> 
>> Hi Stefane,
>> 
>> let me know if I understand the problem correctly. The vertex values are 
>> POJOs that you're somehow inferring from the edge list and this value 
>> creation is what takes a lot of time? Since a graph is just a set of 2 
>> datasets (vertices and edges), you could store the values to disk and have a 
>> custom input format to read them into datasets. Would that work for you?
>> 
>> -Vasia.
>> 
>> On 25 November 2015 at 15:09, Stefanos Antaris > > wrote:
>> Hi to all,
>> 
>> i am working on a project with Gelly and i need to create a graph with 
>> billions of nodes. Although i have the edge list, the node in the Graph 
>> needs to be a POJO object, the construction of which takes long time in 
>> order to finally create the final graph. Is it possible to store the Graph 
>> object as a file and retrieve it whenever i want to run an experiment?
>> 
>> Thanks,
>> Stefanos
>> 
> 
> 



Re: Standalone Cluster vs YARN

2015-11-25 Thread Andreas Fritzler
Hi Welly,

you will need Zookeeper if you want to setup the standalone cluster in HA
mode.
http://spark.apache.org/docs/latest/spark-standalone.html#high-availability

In the YARN case you probably have already Zookeeper in place if you are
running YARN in HA mode.

Regards,
Andreas

On Wed, Nov 25, 2015 at 10:02 AM, Welly Tambunan  wrote:

> Hi Ufuk
>
> >In failure cases I find YARN more convenient, because it takes care of
> restarting failed task manager processes/containers for you.
>
> So this mean that we don't need zookeeper ?
>
>
> Cheers
>
> On Wed, Nov 25, 2015 at 3:46 PM, Ufuk Celebi  wrote:
>
>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
>> >
>> > Hi All,
>> >
>> > I would like to know if there any feature differences between using
>> Standalone Cluster vs YARN ?
>> >
>> > Until now we are using Standalone cluster for our jobs.
>> > Is there any added value for using YARN ?
>> >
>> > We don't have any hadoop infrastructure in place right now but we can
>> provide that if there's some value to that.
>>
>> There are no features, which only work on YARN or in standalone clusters.
>> YARN mode is essentially starting a standalone cluster in YARN containers.
>>
>> In failure cases I find YARN more convenient, because it takes care of
>> restarting failed task manager processes/containers for you.
>>
>> – Ufuk
>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Standalone Cluster vs YARN

2015-11-25 Thread Till Rohrmann
Hi Welly,

at the moment Flink only supports HA via ZooKeeper. However, there is no
limitation to use another system. The only requirement is that this system
allows you to find a consensus among multiple participants and to retrieve
the community decision. If this is possible, then it can be integrated into
Flink to serve as an alternative HA backend.

Cheers,
Till

On Wed, Nov 25, 2015 at 10:30 AM, Maximilian Michels  wrote:

> Hi Welly,
>
> > However YARN is still tightly couple to HDFS, is that seems wasteful to
> use only YARN without Hadoop ?
>
> I wouldn't say tightly coupled. You can use YARN without HDFS. To work
> with YARN properly, you would have to setup another distributed file
> system like xtreemfs. Or use the one provided with the AWS or Google
> Cloud Platform. You can tell Hadoop which file system to use by
> modifying "fs.default.name" in the Hadoop config.
>
> Cheers,
> Max
>
> On Wed, Nov 25, 2015 at 10:06 AM, Welly Tambunan 
> wrote:
> > Hi Fabian,
> >
> > Interesting !
> >
> > However YARN is still tightly couple to HDFS, is that seems wasteful to
> use
> > only YARN without Hadoop ?
> >
> > Currently we are using Cassandra and CFS ( cass file system )
> >
> >
> > Cheers
> >
> > On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske 
> wrote:
> >>
> >> A strong argument for YARN mode can be the isolation of multiple users
> and
> >> jobs. You can easily start a new Flink cluster for each job or user.
> >> However, this comes at the price of resource (memory) fragmentation.
> YARN
> >> mode does not use memory as effective as cluster mode.
> >>
> >> 2015-11-25 9:46 GMT+01:00 Ufuk Celebi :
> >>>
> >>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
> >>> >
> >>> > Hi All,
> >>> >
> >>> > I would like to know if there any feature differences between using
> >>> > Standalone Cluster vs YARN ?
> >>> >
> >>> > Until now we are using Standalone cluster for our jobs.
> >>> > Is there any added value for using YARN ?
> >>> >
> >>> > We don't have any hadoop infrastructure in place right now but we can
> >>> > provide that if there's some value to that.
> >>>
> >>> There are no features, which only work on YARN or in standalone
> clusters.
> >>> YARN mode is essentially starting a standalone cluster in YARN
> containers.
> >>>
> >>> In failure cases I find YARN more convenient, because it takes care of
> >>> restarting failed task manager processes/containers for you.
> >>>
> >>> – Ufuk
> >>>
> >>
> >
> >
> >
> > --
> > Welly Tambunan
> > Triplelands
> >
> > http://weltam.wordpress.com
> > http://www.triplelands.com
>


Re: Standalone Cluster vs YARN

2015-11-25 Thread Fabian Hueske
YARN is not a replacement for Zookeeper. Zookeeper is mandatory to run
Flink in high-availability mode and takes care of leader (JobManager)
election and meta-data persistence.

With YARN, Flink can automatically start new Taskmanagers (and JobManagers)
to compensate for failures. In cluster mode, you need stand-by TMs and JMs
and manually take care that these are "filled-up" again in case of a
failure.
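
As a rough sketch, ZooKeeper-based HA for a standalone cluster boils down to a few entries (key names follow the 0.10 HA setup guide; the hosts, ports, and paths below are placeholders):

# conf/flink-conf.yaml
recovery.mode: zookeeper
recovery.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181

# conf/masters -- one line per (standby) JobManager
master1:8081
master2:8081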

2015-11-25 10:06 GMT+01:00 Welly Tambunan :

> Hi Fabian,
>
> Interesting !
>
> However YARN is still tightly couple to HDFS, is that seems wasteful to
> use only YARN without Hadoop ?
>
> Currently we are using Cassandra and CFS ( cass file system )
>
>
> Cheers
>
> On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske  wrote:
>
>> A strong argument for YARN mode can be the isolation of multiple users
>> and jobs. You can easily start a new Flink cluster for each job or user.
>> However, this comes at the price of resource (memory) fragmentation. YARN
>> mode does not use memory as effective as cluster mode.
>>
>> 2015-11-25 9:46 GMT+01:00 Ufuk Celebi :
>>
>>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I would like to know if there any feature differences between using
>>> Standalone Cluster vs YARN ?
>>> >
>>> > Until now we are using Standalone cluster for our jobs.
>>> > Is there any added value for using YARN ?
>>> >
>>> > We don't have any hadoop infrastructure in place right now but we can
>>> provide that if there's some value to that.
>>>
>>> There are no features, which only work on YARN or in standalone
>>> clusters. YARN mode is essentially starting a standalone cluster in YARN
>>> containers.
>>>
>>> In failure cases I find YARN more convenient, because it takes care of
>>> restarting failed task manager processes/containers for you.
>>>
>>> – Ufuk
>>>
>>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Standalone Cluster vs YARN

2015-11-25 Thread Andreas Fritzler
Hi Welly,

If you want to use cassandra, you might want to look into having a Mesos
cluster with frameworks for cassandra and spark.

Regards,
Andreas

[1] http://spark.apache.org/docs/latest/running-on-mesos.html
[2] https://github.com/mesosphere/cassandra-mesos

On Wed, Nov 25, 2015 at 10:30 AM, Maximilian Michels  wrote:

> Hi Welly,
>
> > However YARN is still tightly couple to HDFS, is that seems wasteful to
> use only YARN without Hadoop ?
>
> I wouldn't say tightly coupled. You can use YARN without HDFS. To work
> with YARN properly, you would have to setup another distributed file
> system like xtreemfs. Or use the one provided with the AWS or Google
> Cloud Platform. You can tell Hadoop which file system to use by
> modifying "fs.default.name" in the Hadoop config.
>
> Cheers,
> Max
>
> On Wed, Nov 25, 2015 at 10:06 AM, Welly Tambunan 
> wrote:
> > Hi Fabian,
> >
> > Interesting !
> >
> > However YARN is still tightly couple to HDFS, is that seems wasteful to
> use
> > only YARN without Hadoop ?
> >
> > Currently we are using Cassandra and CFS ( cass file system )
> >
> >
> > Cheers
> >
> > On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske 
> wrote:
> >>
> >> A strong argument for YARN mode can be the isolation of multiple users
> and
> >> jobs. You can easily start a new Flink cluster for each job or user.
> >> However, this comes at the price of resource (memory) fragmentation.
> YARN
> >> mode does not use memory as effective as cluster mode.
> >>
> >> 2015-11-25 9:46 GMT+01:00 Ufuk Celebi :
> >>>
> >>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
> >>> >
> >>> > Hi All,
> >>> >
> >>> > I would like to know if there any feature differences between using
> >>> > Standalone Cluster vs YARN ?
> >>> >
> >>> > Until now we are using Standalone cluster for our jobs.
> >>> > Is there any added value for using YARN ?
> >>> >
> >>> > We don't have any hadoop infrastructure in place right now but we can
> >>> > provide that if there's some value to that.
> >>>
> >>> There are no features, which only work on YARN or in standalone
> clusters.
> >>> YARN mode is essentially starting a standalone cluster in YARN
> containers.
> >>>
> >>> In failure cases I find YARN more convenient, because it takes care of
> >>> restarting failed task manager processes/containers for you.
> >>>
> >>> – Ufuk
> >>>
> >>
> >
> >
> >
> > --
> > Welly Tambunan
> > Triplelands
> >
> > http://weltam.wordpress.com
> > http://www.triplelands.com
>


Re: Standalone Cluster vs YARN

2015-11-25 Thread Welly Tambunan
Hi Ufuk

>In failure cases I find YARN more convenient, because it takes care of
restarting failed task manager processes/containers for you.

So does this mean that we don't need ZooKeeper?


Cheers

On Wed, Nov 25, 2015 at 3:46 PM, Ufuk Celebi  wrote:

> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
> >
> > Hi All,
> >
> > I would like to know if there any feature differences between using
> Standalone Cluster vs YARN ?
> >
> > Until now we are using Standalone cluster for our jobs.
> > Is there any added value for using YARN ?
> >
> > We don't have any hadoop infrastructure in place right now but we can
> provide that if there's some value to that.
>
> There are no features, which only work on YARN or in standalone clusters.
> YARN mode is essentially starting a standalone cluster in YARN containers.
>
> In failure cases I find YARN more convenient, because it takes care of
> restarting failed task manager processes/containers for you.
>
> – Ufuk
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Standalone Cluster vs YARN

2015-11-25 Thread Welly Tambunan
Hi Fabian,

Interesting !

However, YARN is still tightly coupled to HDFS, so doesn't it seem wasteful to
use only YARN without Hadoop?

Currently we are using Cassandra and CFS ( cass file system )


Cheers

On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske  wrote:

> A strong argument for YARN mode can be the isolation of multiple users and
> jobs. You can easily start a new Flink cluster for each job or user.
> However, this comes at the price of resource (memory) fragmentation. YARN
> mode does not use memory as effective as cluster mode.
>
> 2015-11-25 9:46 GMT+01:00 Ufuk Celebi :
>
>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
>> >
>> > Hi All,
>> >
>> > I would like to know if there any feature differences between using
>> Standalone Cluster vs YARN ?
>> >
>> > Until now we are using Standalone cluster for our jobs.
>> > Is there any added value for using YARN ?
>> >
>> > We don't have any hadoop infrastructure in place right now but we can
>> provide that if there's some value to that.
>>
>> There are no features, which only work on YARN or in standalone clusters.
>> YARN mode is essentially starting a standalone cluster in YARN containers.
>>
>> In failure cases I find YARN more convenient, because it takes care of
>> restarting failed task manager processes/containers for you.
>>
>> – Ufuk
>>
>>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Standalone Cluster vs YARN

2015-11-25 Thread Welly Tambunan
Hi Andreas,

Yes, it seems I can't avoid ZooKeeper right now. It would be really nice if we
could achieve HA via a gossip protocol, like Cassandra/Spark DSE does.

Is this possible?


Cheers

On Wed, Nov 25, 2015 at 4:12 PM, Andreas Fritzler <
andreas.fritz...@gmail.com> wrote:

> Hi Welly,
>
> you will need Zookeeper if you want to setup the standalone cluster in HA
> mode.
> http://spark.apache.org/docs/latest/spark-standalone.html#high-availability
>
> In the YARN case you probably have already Zookeeper in place if you are
> running YARN in HA mode.
>
> Regards,
> Andreas
>
> On Wed, Nov 25, 2015 at 10:02 AM, Welly Tambunan 
> wrote:
>
>> Hi Ufuk
>>
>> >In failure cases I find YARN more convenient, because it takes care of
>> restarting failed task manager processes/containers for you.
>>
>> So this mean that we don't need zookeeper ?
>>
>>
>> Cheers
>>
>> On Wed, Nov 25, 2015 at 3:46 PM, Ufuk Celebi  wrote:
>>
>>> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I would like to know if there any feature differences between using
>>> Standalone Cluster vs YARN ?
>>> >
>>> > Until now we are using Standalone cluster for our jobs.
>>> > Is there any added value for using YARN ?
>>> >
>>> > We don't have any hadoop infrastructure in place right now but we can
>>> provide that if there's some value to that.
>>>
>>> There are no features, which only work on YARN or in standalone
>>> clusters. YARN mode is essentially starting a standalone cluster in YARN
>>> containers.
>>>
>>> In failure cases I find YARN more convenient, because it takes care of
>>> restarting failed task manager processes/containers for you.
>>>
>>> – Ufuk
>>>
>>>
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Standalone Cluster vs YARN

2015-11-25 Thread Fabian Hueske
A strong argument for YARN mode can be the isolation of multiple users and
jobs. You can easily start a new Flink cluster for each job or user.
However, this comes at the price of resource (memory) fragmentation. YARN
mode does not use memory as effectively as cluster mode.

2015-11-25 9:46 GMT+01:00 Ufuk Celebi :

> > On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
> >
> > Hi All,
> >
> > I would like to know if there any feature differences between using
> Standalone Cluster vs YARN ?
> >
> > Until now we are using Standalone cluster for our jobs.
> > Is there any added value for using YARN ?
> >
> > We don't have any hadoop infrastructure in place right now but we can
> provide that if there's some value to that.
>
> There are no features, which only work on YARN or in standalone clusters.
> YARN mode is essentially starting a standalone cluster in YARN containers.
>
> In failure cases I find YARN more convenient, because it takes care of
> restarting failed task manager processes/containers for you.
>
> – Ufuk
>
>


Re: Standalone Cluster vs YARN

2015-11-25 Thread Ufuk Celebi
> On 25 Nov 2015, at 02:35, Welly Tambunan  wrote:
> 
> Hi All, 
> 
> I would like to know if there any feature differences between using 
> Standalone Cluster vs YARN ?
> 
> Until now we are using Standalone cluster for our jobs. 
> Is there any added value for using YARN ?
> 
> We don't have any hadoop infrastructure in place right now but we can provide 
> that if there's some value to that. 

There are no features, which only work on YARN or in standalone clusters. YARN 
mode is essentially starting a standalone cluster in YARN containers.

In failure cases I find YARN more convenient, because it takes care of 
restarting failed task manager processes/containers for you.
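
For example, bringing Flink up on YARN is a single command (container counts and memory sizes below are just placeholders; flags as in the 0.10 docs):

# start a YARN session with 4 TaskManager containers
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

# or run a single job in its own, short-lived YARN cluster
./bin/flink run -m yarn-cluster -yn 4 path/to/your-job.jar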

– Ufuk



Re: Using Hadoop Input/Output formats

2015-11-25 Thread Stephan Ewen
For streaming, I am a bit torn on whether reading a file should have so
many prominent functions. Most streaming programs work on message
queues, or on monitored directories.

Not saying no, but not sure DataSet/DataStream parity is the main goal -
they are for different use cases after all...

On Wed, Nov 25, 2015 at 8:22 AM, Chiwan Park  wrote:

> Thanks for correction @Fabian. :)
>
> > On Nov 25, 2015, at 4:40 AM, Suneel Marthi  wrote:
> >
> > Guess, it makes sense to add readHadoopXXX() methods to
> StreamExecutionEnvironment (for feature parity with what's existing
> presently in ExecutionEnvironment).
> >
> > Also Flink-2949 addresses the need to add relevant syntactic sugar
> wrappers in DataSet api for the code snippet in Fabian's previous email.
> Its not cool, having to instantiate a JobConf in client code and having to
> pass that around.
> >
> >
> >
> > On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske 
> wrote:
> > Hi Nick,
> >
> > you can use Flink's HadoopInputFormat wrappers also for the DataStream
> API. However, DataStream does not offer as much "sugar" as DataSet because
> StreamEnvironment does not offer dedicated createHadoopInput or
> readHadoopFile methods.
> >
> > In DataStream Scala you can read from a Hadoop InputFormat
> (TextInputFormat in this case) as follows:
> >
> > val textData: DataStream[(LongWritable, Text)] = env.createInput(
> >   new HadoopInputFormat[LongWritable, Text](
> > new TextInputFormat,
> > classOf[LongWritable],
> > classOf[Text],
> > new JobConf()
> > ))
> >
> > The Java version is very similar.
> >
> > Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
> >
> > Cheers,
> > Fabian
> >
> > 2015-11-24 19:36 GMT+01:00 Chiwan Park :
> > I’m not a streaming expert. AFAIK, the layer can be used only with
> DataSet. There are some streaming-specific features, such as distributed
> snapshots, in Flink. These need some support from the source and sink, so you have
> to implement that I/O.
> >
> > > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk  wrote:
> > >
> > > I completely missed this, thanks Chiwan. Can these be used with
> DataStreams as well as DataSets?
> > >
> > > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park 
> wrote:
> > > Hi Nick,
> > >
> > > You can use Hadoop Input/Output Format without modification! Please
> check the documentation[1] in Flink homepage.
> > >
> > > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> > >
> > > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk 
> wrote:
> > > >
> > > > Hello,
> > > >
> > > > Is it possible to use existing Hadoop Input and OutputFormats with
> Flink? There's a lot of existing code that conforms to these interfaces,
> seems a shame to have to re-implement it all. Perhaps some adapter shim..?
> > > >
> > > > Thanks,
> > > > Nick
> > >
> > > Regards,
> > > Chiwan Park
> > >
> > >
> >
> > Regards,
> > Chiwan Park
> >
>
> Regards,
> Chiwan Park
>
>
>
>
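
For completeness, a rough Java sketch of the HadoopInputFormat approach Fabian describes above (the input path handling is an assumption added to make the snippet self-contained):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// configure the Hadoop (mapred) TextInputFormat with an input path
JobConf jobConf = new JobConf();
org.apache.hadoop.mapred.FileInputFormat.addInputPath(jobConf, new Path("hdfs:///path/to/input"));

HadoopInputFormat<LongWritable, Text> hadoopIF =
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

DataStream<Tuple2<LongWritable, Text>> textData = env.createInput(hadoopIF);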


graph problem to be solved

2015-11-25 Thread RahadianBayu Permadi
Greetings,

I am a newbie in this Flink world. Thanks to Slim Baltagi for recommending
this Flink community.

I have a graph problem. I have some points and paths among those points.
Each path has a value, such as a distance, between the two points it
connects.

So far, this is a fairly standard graph representation. I want to create a
system that can find all possible paths (not just the shortest path) from
point A to point B (any pair of points in the graph).

My questions are:
1. How would Flink solve this?
2. Any suggestions on data storage for this? Any suggestions on how to use
Cassandra with Flink?

Thanks in advance.

Best Regards,
Bayu


store and retrieve Graph object

2015-11-25 Thread Stefanos Antaris
Hi to all,

I am working on a project with Gelly and I need to create a graph with billions 
of nodes. Although I have the edge list, each node in the Graph needs to be a 
POJO object, and constructing these objects takes a long time before the final 
graph can be built. Is it possible to store the Graph object as a file and 
retrieve it whenever I want to run an experiment?

Thanks,
Stefanos