Re: How to share text file across tasks at run time in flink.

2016-08-24 Thread Baswaraj Kasture
Thanks to all for your inputs.
Yeah, I could put all these common configurations/rules in a DB and the workers
could pick them up dynamically at run time.
In that case, do the DB configuration/connection details need to be hard-coded?
Is there any way a worker can pick up the DB name/credentials etc. at run time
dynamically?

I am going through the feature/API documentation, but how about using
function closures and setGlobalJobParameters/getGlobalJobParameters?

+Baswaraj
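
For reference, a minimal sketch of the setGlobalJobParameters()/
getGlobalJobParameters() route mentioned above: the DB connection details are
passed as --key value arguments at submission time and read back inside a rich
function on the workers. The property name "db.url" and the class names below
are illustrative assumptions, not anything prescribed by Flink.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GlobalParamsExample {

        public static void main(String[] args) throws Exception {
            // e.g. submitted with: --db.url jdbc:... --db.user ... (hypothetical keys)
            ParameterTool params = ParameterTool.fromArgs(args);

            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Makes the parameters available to every rich function at run time.
            env.getConfig().setGlobalJobParameters(params);

            env.socketTextStream("localhost", 9999)
               .map(new DbAwareMapper())
               .print();

            env.execute("global-params-example");
        }

        public static class DbAwareMapper extends RichMapFunction<String, String> {
            private transient String dbUrl;

            @Override
            public void open(Configuration parameters) {
                ParameterTool globalParams = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                dbUrl = globalParams.getRequired("db.url");
            }

            @Override
            public String map(String value) {
                // Use dbUrl (and credentials) to look up the shared configuration/rules.
                return value;
            }
        }
    }

This way the DB name/credentials never need to be hard-coded in the job jar;
they only exist as submission-time arguments.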

On Wed, Aug 24, 2016 at 5:17 PM, Maximilian Michels  wrote:

> Hi!
>
> 1. The community is working on adding side inputs to the DataStream
> API. That will allow you to easily distribute data to all of your
> workers.
>
> 2. In the meantime, you could use `.broadcast()` on a DataSet to
> broadcast data to all workers. You still have to join that data with
> another stream though.
>
> 3. The easiest method of all is to simply load your file in the
> RichMapFunction's open() method. The file can reside in a distributed
> file system which is accessible by all workers.
>
> Cheers,
> Max
>
> On Wed, Aug 24, 2016 at 6:45 AM, Jark Wu 
> wrote:
> > Hi,
> >
> > I think what Baswaraj wants is exactly something like the Storm Distributed
> Cache
> > API[1] (if I’m not misunderstanding).
> >
> > The distributed cache feature in storm is used to efficiently distribute
> > files (or blobs, which is the equivalent terminology for a file in the
> > distributed cache and is used interchangeably in this document) that are
> > large and can change during the lifetime of a topology, such as
> geo-location
> > data, dictionaries, etc. Typical use cases include phrase recognition,
> > entity extraction, document classification, URL re-writing,
> location/address
> > detection and so forth. Such files may be several KB to several GB in
> size.
> > For small datasets that don't need dynamic updates, including them in the
> > topology jar could be fine. But for large files, the startup times could
> > become very large. In these cases, the distributed cache feature can
> provide
> > fast topology startup, especially if the files were previously downloaded
> > for the same submitter and are still in the cache. This is useful with
> > frequent deployments, sometimes few times a day with updated jars,
> because
> > the large cached files will remain available without changes. The large
> > cached blobs that do not change frequently will remain available in the
> > distributed cache.
> >
> >
> > We can look into this whether it is a common use case and how to
> implement
> > it in Flink.
> >
> > [1] http://storm.apache.org/releases/2.0.0-SNAPSHOT/
> distcache-blobstore.html
> >
> >
> > - Jark Wu
> >
> > 在 2016年8月23日,下午9:45,Lohith Samaga M  写道:
> >
> > Hi
> > May be you could use Cassandra to store and fetch all such reference
> data.
> > This way the reference data can be updated without restarting your
> > application.
> >
> > Lohith
> >
> > Sent from my Sony Xperia™ smartphone
> >
> >
> >
> >  Baswaraj Kasture wrote 
> >
> > Thanks Kostas !
> > I am using DataStream API.
> >
> > I have a few config/property files (key value text files) and also have
> business
> > rule files (json).
> > These rules and configurations are needed when we process incoming event.
> > Is there any way to share them to task nodes from driver program ?
> > I think this is very common use case and am sure other users may face
> > similar issues.
> >
> > +Baswaraj
> >
> > On Mon, Aug 22, 2016 at 4:56 PM, Kostas Kloudas
> >  wrote:
> >>
> >> Hello Baswaraj,
> >>
> >> Are you using the DataSet (batch) or the DataStream API?
> >>
> >> If you are in the first, you can use a broadcast variable for your task.
> >> If you are using the DataStream one, then there is no proper support for
> >> that.
> >>
> >> Thanks,
> >> Kostas
> >>
> >> On Aug 20, 2016, at 12:33 PM, Baswaraj Kasture 
> >> wrote:
> >>
> >> Am running Flink standalone cluster.
> >>
> >> I have text file that need to be shared across tasks when i submit my
> >> application.
> >> in other words , put this text file in class path of running tasks.
> >>
> >> How can we achieve this with flink ?
> >>
> >> In spark, spark-submit has --jars option that puts all the files
> specified
> >> in class path of executors (executors run in separate JVM and spawned
> >> dynamically, so it is possible).
> >>
> >> Flink's task managers run tasks in separate thread under taskmanager JVM
> >> (?) , how can we make this text file to be accessible on all tasks
> spawned
> >> by current application ?
> >>
> >> Using HDFS, NFS or including file in program jar is one way that i know,
> >> but am looking for solution that can allows me to provide text file at
> run
> >> time and still accessible in all tasks.
> >> Thanks.
> >>
> >>
> >
> >

Regarding Global Configuration in Flink

2016-08-24 Thread Janardhan Reddy
Hi,

Is the global configuration the same for all jobs in a Flink cluster?

Is it a good idea to write a custom source which polls some external source
every x minutes and updates the global config? Will the config change be
propagated across all jobs?

What happens when the size of the global config grows too big? Where is the
global config stored?


How to set a custom JAVA_HOME when run flink on YARN?

2016-08-24 Thread Renkai
Hi all:
  The YARN cluster of my company defaults to Java 7, and I want to use
Java 8 for my Flink application. What can I do to achieve it?





Re: Firing windows multiple times

2016-08-24 Thread Shannon Carey
What do you think about adding the current watermark to the window function 
metadata in FLIP-2?

From: Shannon Carey >
Date: Friday, August 12, 2016 at 6:24 PM
To: Aljoscha Krettek >, 
"user@flink.apache.org" 
>
Subject: Re: Firing windows multiple times

Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, 
especially to enable flexible approaches for eviction. In particular, having 
the current watermark available to the evictor via EvictorContext is helpful: 
it will be able to evict the old data more easily without needing to rely on 
Window#maxTimestamp().

However, I think you might still be missing a piece. Specifically, it would 
still not be possible for the window function to choose which items to 
aggregate based on the current watermark. In particular, it is desirable to be 
able to aggregate only the items below the watermark, omitting items which have 
come in with timestamps larger than the watermark. Does that make sense?

-Shannon
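
For context, a rough sketch of what such an evictor could look like if the
EvictorContext proposed in FLIP-4 exposes the current watermark. The method
names, the TimestampedValue type, and the import paths below are assumptions
based on the proposal, not a released API:

    import java.util.Iterator;
    import org.apache.flink.streaming.api.windowing.evictors.Evictor;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

    // Drops every element whose timestamp is above the current watermark, so the
    // window function only aggregates elements that are "on time".
    public class AboveWatermarkEvictor<T> implements Evictor<T, TimeWindow> {

        @Override
        public void evictBefore(Iterable<TimestampedValue<T>> elements, int size,
                                TimeWindow window, EvictorContext ctx) {
            long watermark = ctx.getCurrentWatermark(); // assumed FLIP-4 accessor
            for (Iterator<TimestampedValue<T>> it = elements.iterator(); it.hasNext(); ) {
                if (it.next().getTimestamp() > watermark) {
                    it.remove();
                }
            }
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<T>> elements, int size,
                               TimeWindow window, EvictorContext ctx) {
            // no eviction after the window function
        }
    }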

From: Aljoscha Krettek >
Date: Friday, August 12, 2016 at 4:25 AM
To: "user@flink.apache.org" 
>
Subject: Re: Firing windows multiple times

Hi,
there is already this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
 which also links to a mailing list discussion. And this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
 The former proposes to enhance the Evictor API a bit, among other things we 
propose to give the evictor access to the current watermark. The other FLIP 
proposes to extend the amount of meta-data we give to the window function. The 
first to things we propose to add is a "firing reason" that would tell you 
whether this was an early firing, an on time firing or a late firing. The 
second thing is a firing counter that would tell you how many times the trigger 
has fired so far for the current window.

Would a combination of these help with your use case?

Cheers,
Aljoscha

On Thu, 11 Aug 2016 at 19:19 Shannon Carey 
> wrote:
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.
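
For readers following the thread, a trimmed-down sketch of a trigger along
those lines: it schedules a processing-time timer shortly after each element
for the early FIREs and relies on the event-time timer at the end of the window
for the final FIRE_AND_PURGE. The 10-second delay and the class name are
illustrative, and details such as deduplicating timer registrations are left
out:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class EarlyFiringTrigger extends Trigger<Object, TimeWindow> {

        private static final long EARLY_FIRING_DELAY_MS = 10_000L; // illustrative

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) throws Exception {
            // Final firing when the watermark passes the end of the window ...
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // ... and an early firing a short time after this element arrived.
            ctx.registerProcessingTimeTimer(System.currentTimeMillis() + EARLY_FIRING_DELAY_MS);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window,
                                              TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE; // early, intermediate result
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window,
                                         TriggerContext ctx) throws Exception {
            return time == window.maxTimestamp()
                    ? TriggerResult.FIRE_AND_PURGE // final result, clean up window state
                    : TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }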

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState 

Re: How to get latency info from benchmark

2016-08-24 Thread Robert Metzger
Hi,

Version 0.10-SNAPSHOT is pretty old. The snapshot repository of Apache
probably doesn't keep old artifacts around forever.
Maybe you can migrate the tests to Flink 0.10.0, or maybe even to a higher
version.

Regards,
Robert

On Wed, Aug 24, 2016 at 10:32 PM, Eric Fukuda  wrote:

> Hi Max, Robert,
>
> Thanks for the advice. I'm trying to build the "performance" project, but
> failing with the following error. Is there a solution for this?
>
> [ERROR] Failed to execute goal on project streaming-state-demo: Could not
> resolve dependencies for project com.dataartisans.flink:streami
> ng-state-demo:jar:1.0-SNAPSHOT: Failure to find
> org.apache.flink:flink-connector-kafka-083:jar:0.10-SNAPSHOT in
> https://repository.apache.org/content/repositories/snapshots/ was cached
> in the local repository, resolution will not be reattempted until the
> update interval of apache.snapshots has elapsed or updates are forced ->
> [Help 1]
>
>
>
>
> On Wed, Aug 24, 2016 at 8:12 AM, Robert Metzger 
> wrote:
>
>> Hi Eric,
>>
>> Max is right, the tool has been used for a different benchmark [1]. The
>> throughput logger that should produce the right output is this one [2].
>> Very recently, I've opened a pull request for adding metric-measuring
>> support into the engine [3]. Maybe that's helpful for your experiments.
>>
>>
>> [1] http://data-artisans.com/high-throughput-low-latency-and
>> -exactly-once-stream-processing-with-apache-flink/
>> [2] https://github.com/dataArtisans/performance/blob/master/
>> flink-jobs/src/main/java/com/github/projectflink/streaming/T
>> hroughput.java#L203
>> [3] https://github.com/apache/flink/pull/2386
>>
>>
>>
>> On Wed, Aug 24, 2016 at 2:04 PM, Maximilian Michels 
>> wrote:
>>
>>> I believe the AnalyzeTool is for processing logs of a different
>>> benchmark.
>>>
>>> CC Jamie and Robert who worked on the benchmark.
>>>
>>> On Wed, Aug 24, 2016 at 3:25 AM, Eric Fukuda 
>>> wrote:
>>> > Hi,
>>> >
>>> > I'm trying to benchmark Flink without Kafka as mentioned in this post
>>> > (http://data-artisans.com/extending-the-yahoo-streaming-benchmark/).
>>> After
>>> > running flink.benchmark.state.AdvertisingTopologyFlinkState with
>>> > user.local.event.generator in localConf.yaml set to 1, I ran
>>> > flink.benchmark.utils.AnalyzeTool giving
>>> > flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log as a
>>> > command-line argument. I got the following output and it does not have
>>> the
>>> > information about the latency.
>>> >
>>> >
>>> > = Latency (0 reports ) =
>>> > = Throughput (1 reports ) =
>>> > == null (entries: 10150)===
>>> > Mean throughput 639078.5018497099
>>> > Exception in thread "main" java.lang.IndexOutOfBoundsException:
>>> toIndex = 2
>>> > at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
>>> > at java.util.ArrayList.subList(ArrayList.java:954)
>>> > at flink.benchmark.utils.AnalyzeT
>>> ool.main(AnalyzeTool.java:133)
>>> >
>>> >
>>> > Reading the code in AnalyzeTool.java, I found that it's looking for
>>> lines
>>> > that include "Latency" in the log file, but apparently it's not
>>> finding any.
>>> > I tried grepping the log file, and couldn't find any either. I have one
>>> > server that runs both JobManager and Task Manager and another server
>>> that
>>> > runs Redis, and they are connected through a network with each other.
>>> >
>>> > I think I have to do something to read the data stored in Redis before
>>> > running AnalyzeTool, but can't figure out what. Does anyone know how
>>> to get
>>> > the latency information?
>>> >
>>> > Thanks,
>>> > Eric
>>>
>>
>>
>


Re: How to get latency info from benchmark

2016-08-24 Thread Eric Fukuda
Hi Max, Robert,

Thanks for the advice. I'm trying to build the "performance" project, but
failing with the following error. Is there a solution for this?

[ERROR] Failed to execute goal on project streaming-state-demo: Could not
resolve dependencies for project com.dataartisans.flink:
streaming-state-demo:jar:1.0-SNAPSHOT: Failure to find
org.apache.flink:flink-connector-kafka-083:jar:0.10-SNAPSHOT in
https://repository.apache.org/content/repositories/snapshots/ was cached in
the local repository, resolution will not be reattempted until the update
interval of apache.snapshots has elapsed or updates are forced -> [Help 1]




On Wed, Aug 24, 2016 at 8:12 AM, Robert Metzger  wrote:

> Hi Eric,
>
> Max is right, the tool has been used for a different benchmark [1]. The
> throughput logger that should produce the right output is this one [2].
> Very recently, I've opened a pull request for adding metric-measuring
> support into the engine [3]. Maybe that's helpful for your experiments.
>
>
> [1] http://data-artisans.com/high-throughput-low-latency-and
> -exactly-once-stream-processing-with-apache-flink/
> [2] https://github.com/dataArtisans/performance/blob/master/
> flink-jobs/src/main/java/com/github/projectflink/streaming/
> Throughput.java#L203
> [3] https://github.com/apache/flink/pull/2386
>
>
>
> On Wed, Aug 24, 2016 at 2:04 PM, Maximilian Michels 
> wrote:
>
>> I believe the AnalyzeTool is for processing logs of a different benchmark.
>>
>> CC Jamie and Robert who worked on the benchmark.
>>
>> On Wed, Aug 24, 2016 at 3:25 AM, Eric Fukuda 
>> wrote:
>> > Hi,
>> >
>> > I'm trying to benchmark Flink without Kafka as mentioned in this post
>> > (http://data-artisans.com/extending-the-yahoo-streaming-benchmark/).
>> After
>> > running flink.benchmark.state.AdvertisingTopologyFlinkState with
>> > user.local.event.generator in localConf.yaml set to 1, I ran
>> > flink.benchmark.utils.AnalyzeTool giving
>> > flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log as a
>> > command-line argument. I got the following output and it does not have
>> the
>> > information about the latency.
>> >
>> >
>> > = Latency (0 reports ) =
>> > = Throughput (1 reports ) =
>> > == null (entries: 10150)===
>> > Mean throughput 639078.5018497099
>> > Exception in thread "main" java.lang.IndexOutOfBoundsException:
>> toIndex = 2
>> > at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
>> > at java.util.ArrayList.subList(ArrayList.java:954)
>> > at flink.benchmark.utils.AnalyzeTool.main(AnalyzeTool.java:133)
>> >
>> >
>> > Reading the code in AnalyzeTool.java, I found that it's looking for
>> lines
>> > that include "Latency" in the log file, but apparently it's not finding
>> any.
>> > I tried grepping the log file, and couldn't find any either. I have one
>> > server that runs both JobManager and Task Manager and another server
>> that
>> > runs Redis, and they are connected through a network with each other.
>> >
>> > I think I have to do something to read the data stored in Redis before
>> > running AnalyzeTool, but can't figure out what. Does anyone know how to
>> get
>> > the latency information?
>> >
>> > Thanks,
>> > Eric
>>
>
>


Re: Dealing with Multiple sinks in Flink

2016-08-24 Thread vinay patil
Hi,

Just an update, the window is not getting triggered when I change the
parallelism to more than 1.

Can you please explain why this is happening ?

Regards,
Vinay Patil

On Wed, Aug 24, 2016 at 9:55 AM, vinay patil [via Apache Flink User Mailing
List archive.]  wrote:

> Hi Max,
>
> I tried writing to a local file as well; it's giving me the same issue. I
> have attached the logs and the dummy pipeline code: logs.txt and
> dummy_pipeline.txt.
>
>





Re: how to get rid of duplicate rows group by in DataStream

2016-08-24 Thread Yassine Marzougui
Sorry I mistyped the code, it should be
timeWindow(Time.minutes(10)) instead
of window(Time.minutes(10)).

On Wed, Aug 24, 2016 at 9:29 PM, Yassine Marzougui 
wrote:

> Hi subash,
>
> A stream is infinite, hence it has no notion of "final" count. To get
> distinct counts you need to define a period (= a window [1] ) over which
> you count elements and emit a result, by adding a window operator before the
> reduce.
> For example the following will emit distinct counts every 10 minutes over
> the last 10 minutes period:
>
> stream.keyBy(2)
>   .window(Time.minutes(10))
>   .reduce(new GridPointsCount())
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/apis/streaming/windows.html
>
>
> On Wed, Aug 24, 2016 at 6:14 PM, subash basnet  wrote:
>
>> Hello Kostas,
>>
>> Sorry for late reply. But I couldn't understand how to apply split in
>> datastream, such as in below to get the distinct output stream element with
>> the count after applying group by and reduce.
>>
>> DataStream> gridWithDensity =
>> pointsWithGridCoordinates.map(new AddCountAppender())
>> .keyBy(2).reduce(*new GridPointsCount()*).map(new
>> RetrieveGridWithCount());
>> gridWithDensity.print();
>>
>> Current Output: (0,1), (0,2), (0,1), (0,2), (0,3), (0,3), (0,4)
>> Required Output: (0,3), (0,4)
>>
>> public static final class GridPointsCount implements
>> ReduceFunction> {
>> @Override
>> public Tuple4 reduce(Tuple4> String, Long> val1,
>> Tuple4 val2) {
>> return new Tuple4(val1.f0, val1.f1, val1.f2,
>> val1.f3 + val2.f3);
>> }
>> }
>>
>>
>> Regards,
>> Subash Basnet
>>
>> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>>
>>> Hi Subash,
>>>
>>> You should also split your elements in windows.
>>> If not, Flink emits an element for each incoming record.
>>> That is why you have:
>>>
>>> (1,1)
>>> (1,2)
>>> (1,3)
>>>
>>> …
>>>
>>> Kostas
>>>
>>> > On Aug 22, 2016, at 5:58 PM, subash basnet  wrote:
>>> >
>>> > Hello all,
>>> >
>>> > I grouped by the input based on it's id to count the number of
>>> elements in each group.
>>> > DataStream> gridWithCount;
>>> > Upon printing the above datastream it shows with duplicate rows:
>>> > Output:
>>> > (1, 1)
>>> > (1,2)
>>> > (2,1)
>>> > (1,3)
>>> > (2,2)...
>>> >
>>> > Whereas I wanted the distinct rows with final count:
>>> > Needed Output:
>>> > (1,3)
>>> > (2,2)..
>>> >
>>> > What could be the way to achieve this.
>>> >
>>> >
>>> > Regards,
>>> > Subash Basnet
>>>
>>>
>>>
>>
>


Re: how to get rid of duplicate rows group by in DataStream

2016-08-24 Thread Yassine Marzougui
Hi subash,

A stream is infinite, hence it has no notion of "final" count. To get
distinct counts you need to define a period (= a window [1] ) over which
you count elements and emit a result, by adding a window operator before the
reduce.
For example the following will emit distinct counts every 10 minutes over
the last 10 minutes period:

stream.keyBy(2)
  .window(Time.minutes(10))
  .reduce(new GridPointsCount())

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html
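
To make that concrete, a self-contained sketch under assumed types: the stream
is taken to carry Tuple2<String, Long> pairs of (grid id, count), keyed by
field 0, with a 10-minute tumbling window. Adapt the key position and tuple
arity to the Tuple4 used in the original code:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class GridCountExample {

        // Emits one summed count per grid id for every 10-minute window.
        public static DataStream<Tuple2<String, Long>> countPerGrid(
                DataStream<Tuple2<String, Long>> gridWithCount) {
            return gridWithCount
                    .keyBy(0)
                    .timeWindow(Time.minutes(10))
                    .reduce(new GridPointsCount());
        }

        public static class GridPointsCount
                implements ReduceFunction<Tuple2<String, Long>> {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> val1,
                                               Tuple2<String, Long> val2) {
                return new Tuple2<>(val1.f0, val1.f1 + val2.f1);
            }
        }
    }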


On Wed, Aug 24, 2016 at 6:14 PM, subash basnet  wrote:

> Hello Kostas,
>
> Sorry for late reply. But I couldn't understand how to apply split in
> datastream, such as in below to get the distinct output stream element with
> the count after applying group by and reduce.
>
> DataStream> gridWithDensity =
> pointsWithGridCoordinates.map(new AddCountAppender())
> .keyBy(2).reduce(*new GridPointsCount()*).map(new
> RetrieveGridWithCount());
> gridWithDensity.print();
>
> Current Output: (0,1), (0,2), (0,1), (0,2), (0,3), (0,3), (0,4)
> Required Output: (0,3), (0,4)
>
> public static final class GridPointsCount implements
> ReduceFunction> {
> @Override
> public Tuple4 reduce(Tuple4 String, Long> val1,
> Tuple4 val2) {
> return new Tuple4(val1.f0, val1.f1, val1.f2,
> val1.f3 + val2.f3);
> }
> }
>
>
> Regards,
> Subash Basnet
>
> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>>
>> Hi Subash,
>>
>> You should also split your elements in windows.
>> If not, Flink emits an element for each incoming record.
>> That is why you have:
>>
>> (1,1)
>> (1,2)
>> (1,3)
>>
>> …
>>
>> Kostas
>>
>> > On Aug 22, 2016, at 5:58 PM, subash basnet  wrote:
>> >
>> > Hello all,
>> >
>> > I grouped by the input based on it's id to count the number of elements
>> in each group.
>> > DataStream> gridWithCount;
>> > Upon printing the above datastream it shows with duplicate rows:
>> > Output:
>> > (1, 1)
>> > (1,2)
>> > (2,1)
>> > (1,3)
>> > (2,2)...
>> >
>> > Whereas I wanted the distinct rows with final count:
>> > Needed Output:
>> > (1,3)
>> > (2,2)..
>> >
>> > What could be the way to achieve this.
>> >
>> >
>> > Regards,
>> > Subash Basnet
>>
>>
>>
>


Re: Setting number of TaskManagers

2016-08-24 Thread Foster, Craig
Oh, sorry, I didn't specify that I was using YARN, and I don't necessarily want to
specify it with the command-line option.

From: Greg Hogan 
Reply-To: "user@flink.apache.org" 
Date: Wednesday, August 24, 2016 at 12:07 PM
To: "user@flink.apache.org" 
Subject: Re: Setting number of TaskManagers

The number of TaskManagers will be equal to the number of entries in the 
conf/slaves file.

On Wed, Aug 24, 2016 at 3:04 PM, Foster, Craig 
> wrote:
Is there a way to set the number of TaskManagers using a configuration file or 
environment variable? I'm looking at the docs for it and it says you can set 
slots but not the number of TMs.

Thanks,
Craig



Re: Setting number of TaskManagers

2016-08-24 Thread Greg Hogan
The number of TaskManagers will be equal to the number of entries in the
conf/slaves file.

On Wed, Aug 24, 2016 at 3:04 PM, Foster, Craig  wrote:

> Is there a way to set the number of TaskManagers using a configuration
> file or environment variable? I'm looking at the docs for it and it says
> you can set slots but not the number of TMs.
>
>
>
> Thanks,
>
> Craig
>
>


Setting number of TaskManagers

2016-08-24 Thread Foster, Craig
Is there a way to set the number of TaskManagers using a configuration file or 
environment variable? I'm looking at the docs for it and it says you can set 
slots but not the number of TMs.

Thanks,
Craig


Re: how to get rid of duplicate rows group by in DataStream

2016-08-24 Thread subash basnet
Hello Kostas,

Sorry for the late reply. But I couldn't understand how to apply the split in the
datastream, as shown below, to get the distinct output stream elements with
the count after applying the group by and reduce.

DataStream> gridWithDensity =
pointsWithGridCoordinates.map(new AddCountAppender())
.keyBy(2).reduce(*new GridPointsCount()*).map(new RetrieveGridWithCount());
gridWithDensity.print();

Current Output: (0,1), (0,2), (0,1), (0,2), (0,3), (0,3), (0,4)
Required Output: (0,3), (0,4)

public static final class GridPointsCount implements
ReduceFunction> {
@Override
public Tuple4 reduce(Tuple4 val1,
Tuple4 val2) {
return new Tuple4(val1.f0, val1.f1, val1.f2,
val1.f3 + val2.f3);
}
}


Regards,
Subash Basnet

On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas  wrote:

>
> Hi Subash,
>
> You should also split your elements in windows.
> If not, Flink emits an element for each incoming record.
> That is why you have:
>
> (1,1)
> (1,2)
> (1,3)
>
> …
>
> Kostas
>
> > On Aug 22, 2016, at 5:58 PM, subash basnet  wrote:
> >
> > Hello all,
> >
> > I grouped by the input based on its id to count the number of elements
> in each group.
> > DataStream> gridWithCount;
> > Upon printing the above datastream it shows with duplicate rows:
> > Output:
> > (1, 1)
> > (1,2)
> > (2,1)
> > (1,3)
> > (2,2)...
> >
> > Whereas I wanted the distinct rows with final count:
> > Needed Output:
> > (1,3)
> > (2,2)..
> >
> > What could be the way to achieve this.
> >
> >
> > Regards,
> > Subash Basnet
>
>
>


Re: Setting up zeppelin with flink

2016-08-24 Thread Trevor Grant
Frank,

can you post the zeppelin flink log please?

You can probably find it in zeppelin_dir/logs/*flink*.log

You've got a few moving pieces here.  I've never run zeppelin against Flink
in a docker container.   But I think the Zeppelin-Flink log is the first
place to look.

You say you can't get Zeppelin to work in local mode either right? Just
curious, is Zeppelin running in a docker too?

Thanks,
tg


Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Wed, Aug 24, 2016 at 6:50 AM, Maximilian Michels  wrote:

> Hi!
>
> There are some people familiar with the Zeppelin integration. CCing
> Till and Trevor. Otherwise, you could also send this to the Zeppelin
> community.
>
> Cheers,
> Max
>
> On Wed, Aug 24, 2016 at 12:58 PM, Frank Dekervel  wrote:
> > Hello,
> >
> > for reference:
> >
> > i already found out that "connect to existing process" was my error
> here: it
> > means connecting to an existing zeppelin interpreter, not an existing
> flink
> > cluster. After fixing my error, i'm now in the same situation as
> described
> > here:
> >
> > https://stackoverflow.com/questions/38688277/flink-
> zeppelin-not-responding
> >
> > i guess it's more a zeppelin problem than a flink problem tho, as i see
> both
> > interpreter JVM and main zeppelin JVM waiting on thrift input (so it
> seems
> > they are waiting for each other)
> >
> > greetings,
> > Frank
> >
> >
> >
> >
> > On Tue, Aug 23, 2016 at 2:09 PM, Frank Dekervel 
> wrote:
> >>
> >> Hello,
> >>
> >> I try to set up apache zeppelin with a flink cluster (one jobmanager,
> one
> >> task manager).
> >>
> >> What i did was using the dockerfiles in flink-contrib/docker-flink + the
> >> latest binary release of apache zeppelin with all interpreters:
> >>
> >>
> >> https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/
> Dockerfile
> >> (i changed the flink version to 1.0.3 to match zeppelin's flink version)
> >>
> >> I built another docker image around the latest binary release of
> zeppelin
> >> (with all interpreters), and i reconfigure the flink interpreter:
> >>
> >> connect to existing process
> >> host: jobmanager, port: 6123
> >> i removed all other properties
> >>
> >> when i try to submit a flink job, i get an error state and the following
> >> exception appears in the log (nothing appears in the jobmanager log)
> >>
> >> ERROR [2016-08-23 11:44:57,932] ({Thread-16}
> >> JobProgressPoller.java[run]:54) - Can not get or update progress
> >> org.apache.zeppelin.interpreter.InterpreterException:
> >> org.apache.thrift.transport.TTransportException
> >> at
> >> org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(
> RemoteInterpreter.java:373)
> >> at
> >> org.apache.zeppelin.interpreter.LazyOpenInterpreter.getProgress(
> LazyOpenInterpreter.java:111)
> >> at
> >> org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.java:237)
> >> at
> >> org.apache.zeppelin.scheduler.JobProgressPoller.run(
> JobProgressPoller.java:51)
> >> Caused by: org.apache.thrift.transport.TTransportException
> >> at
> >> org.apache.thrift.transport.TIOStreamTransport.read(
> TIOStreamTransport.java:132)
> >> at
> >> org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
> >> at
> >> org.apache.thrift.protocol.TBinaryProtocol.readAll(
> TBinaryProtocol.java:429)
> >> at
> >> org.apache.thrift.protocol.TBinaryProtocol.readI32(
> TBinaryProtocol.java:318)
> >> at
> >> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(
> TBinaryProtocol.java:219)
> >> at
> >> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> >> at
> >> org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$
> Client.recv_getProgress(RemoteInterpreterService.java:296)
> >> at
> >> org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$
> Client.getProgress(RemoteInterpreterService.java:281)
> >> at
> >> org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(
> RemoteInterpreter.java:370)
> >> ... 3 more
> >>
> >> Flink in local mode works fine on zeppelin.
> >> Could somebody point me to what i'm doing wrong ?
> >>
> >> Thanks a lot!
> >> Frank
> >>
> >>
> >>
> >
>


Re: Dealing with Multiple sinks in Flink

2016-08-24 Thread vinay patil
Hi Max,

I tried writing to a local file as well; it's giving me the same issue. I have
attached the logs and the dummy pipeline code: logs.txt and dummy_pipeline.txt.





Re: Setting up zeppelin with flink

2016-08-24 Thread Trevor Grant
Hey Frank,

Saw your post on the Zeppelin list yesterday. I can look at it later this
morning, but my gut feeling is a ghost Zeppelin daemon is running in the
background and its local Flink is holding port 6123. This is fairly
common and would explain the issue.

I don't know if you're on Linux or Windows or whatever, but have you tried rebooting
the machine? (Sorry if you said you did higher in the email.) Also, I very
vaguely remember there is a boot order that matters with Flink and
Zeppelin, like you need to start Flink first then Zeppelin, or vice versa.
I feel like it is Flink first, then Zeppelin.

Hope that helps, will dig in later if not.

tg





Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Wed, Aug 24, 2016 at 6:50 AM, Maximilian Michels  wrote:

> Hi!
>
> There are some people familiar with the Zeppelin integration. CCing
> Till and Trevor. Otherwise, you could also send this to the Zeppelin
> community.
>
> Cheers,
> Max
>
> On Wed, Aug 24, 2016 at 12:58 PM, Frank Dekervel  wrote:
> > Hello,
> >
> > for reference:
> >
> > i already found out that "connect to existing process" was my error
> here: it
> > means connecting to an existing zeppelin interpreter, not an existing
> flink
> > cluster. After fixing my error, i'm now in the same situation as
> described
> > here:
> >
> > https://stackoverflow.com/questions/38688277/flink-
> zeppelin-not-responding
> >
> > i guess it's more a zeppelin problem than a flink problem tho, as i see
> both
> > interpreter JVM and main zeppelin JVM waiting on thrift input (so it
> seems
> > they are waiting for each other)
> >
> > greetings,
> > Frank
> >
> >
> >
> >
> > On Tue, Aug 23, 2016 at 2:09 PM, Frank Dekervel 
> wrote:
> >>
> >> Hello,
> >>
> >> I try to set up apache zeppelin with a flink cluster (one jobmanager,
> one
> >> task manager).
> >>
> >> What i did was using the dockerfiles in flink-contrib/docker-flink + the
> >> latest binary release of apache zeppelin with all interpreters:
> >>
> >>
> >> https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/
> Dockerfile
> >> (i changed the flink version to 1.0.3 to match zeppelin's flink version)
> >>
> >> I built another docker image around the latest binary release of
> zeppelin
> >> (with all interpreters), and i reconfigure the flink interpreter:
> >>
> >> connect to existing process
> >> host: jobmanager, port: 6123
> >> i removed all other properties
> >>
> >> when i try to submit a flink job, i get an error state and the following
> >> exception appears in the log (nothing appears in the jobmanager log)
> >>
> >> ERROR [2016-08-23 11:44:57,932] ({Thread-16}
> >> JobProgressPoller.java[run]:54) - Can not get or update progress
> >> org.apache.zeppelin.interpreter.InterpreterException:
> >> org.apache.thrift.transport.TTransportException
> >> at
> >> org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(
> RemoteInterpreter.java:373)
> >> at
> >> org.apache.zeppelin.interpreter.LazyOpenInterpreter.getProgress(
> LazyOpenInterpreter.java:111)
> >> at
> >> org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.java:237)
> >> at
> >> org.apache.zeppelin.scheduler.JobProgressPoller.run(
> JobProgressPoller.java:51)
> >> Caused by: org.apache.thrift.transport.TTransportException
> >> at
> >> org.apache.thrift.transport.TIOStreamTransport.read(
> TIOStreamTransport.java:132)
> >> at
> >> org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
> >> at
> >> org.apache.thrift.protocol.TBinaryProtocol.readAll(
> TBinaryProtocol.java:429)
> >> at
> >> org.apache.thrift.protocol.TBinaryProtocol.readI32(
> TBinaryProtocol.java:318)
> >> at
> >> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(
> TBinaryProtocol.java:219)
> >> at
> >> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> >> at
> >> org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$
> Client.recv_getProgress(RemoteInterpreterService.java:296)
> >> at
> >> org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$
> Client.getProgress(RemoteInterpreterService.java:281)
> >> at
> >> org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(
> RemoteInterpreter.java:370)
> >> ... 3 more
> >>
> >> Flink in local mode works fine on zeppelin.
> >> Could somebody point me to what i'm doing wrong ?
> >>
> >> Thanks a lot!
> >> Frank
> >>
> >>
> >>
> >
>


Re: How large a Flink cluster can have?

2016-08-24 Thread Alexis Gendronneau
Hi,

If possible, along with this information, it would be great to know how the JobManager
has to be scaled according to the number of nodes.
Will one JobManager be able to manage hundreds of nodes? Is there a
recommendation in terms of JM/TM ratio?
Thanks

2016-07-14 15:41 GMT+02:00 Robert Metzger :

> Hi,
>
> I think the reason why this information is not written anywhere is because
> we don't know it either.
> Alibaba seems to run a fork of Flink on "thousands of nodes" [1].
>
> Maybe some of the production users on this mailing list can share some
> information regarding this.
>
>
> [1] http://www.slideshare.net/HadoopSummit/improvements-to-
> flink-its-applications-in-alibaba-search
>
>
> On Wed, Jul 13, 2016 at 3:54 PM, Yan Chou Chen 
> wrote:
>
>> FAQ[1], mailing list[2], and the powered by page[3] doesn't find
>> related information. Just out of curiosity, what is the current
>> largest Flink cluster size running in production? For instance, long
>> time ago yahoo [4] ran 4k hadoop nodes in production.
>>
>> Thanks
>>
>> [1]. https://flink.apache.org/faq.html
>> [2]. http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/template/NamlServlet.jtp?macro=search_
>> page=1=max+cluster=48
>> [3]. https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
>> [4]. https://developer.yahoo.com/blogs/hadoop/scaling-hadoop-
>> 4000-nodes-yahoo-410.html
>>
>
>


-- 
Alexis Gendronneau

alexis.gendronn...@corp.ovh.com
a.gendronn...@gmail.com


Re: How to get latency info from benchmark

2016-08-24 Thread Robert Metzger
Hi Eric,

Max is right, the tool has been used for a different benchmark [1]. The
throughput logger that should produce the right output is this one [2].
Very recently, I've opened a pull request for adding metric-measuring
support into the engine [3]. Maybe that's helpful for your experiments.


[1]
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
[2]
https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/streaming/Throughput.java#L203
[3] https://github.com/apache/flink/pull/2386



On Wed, Aug 24, 2016 at 2:04 PM, Maximilian Michels  wrote:

> I believe the AnalyzeTool is for processing logs of a different benchmark.
>
> CC Jamie and Robert who worked on the benchmark.
>
> On Wed, Aug 24, 2016 at 3:25 AM, Eric Fukuda  wrote:
> > Hi,
> >
> > I'm trying to benchmark Flink without Kafka as mentioned in this post
> > (http://data-artisans.com/extending-the-yahoo-streaming-benchmark/).
> After
> > running flink.benchmark.state.AdvertisingTopologyFlinkState with
> > user.local.event.generator in localConf.yaml set to 1, I ran
> > flink.benchmark.utils.AnalyzeTool giving
> > flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log as a
> > command-line argument. I got the following output and it does not have
> the
> > information about the latency.
> >
> >
> > = Latency (0 reports ) =
> > = Throughput (1 reports ) =
> > == null (entries: 10150)===
> > Mean throughput 639078.5018497099
> > Exception in thread "main" java.lang.IndexOutOfBoundsException: toIndex
> = 2
> > at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
> > at java.util.ArrayList.subList(ArrayList.java:954)
> > at flink.benchmark.utils.AnalyzeTool.main(AnalyzeTool.java:133)
> >
> >
> > Reading the code in AnalyzeTool.java, I found that it's looking for lines
> > that include "Latency" in the log file, but apparently it's not finding
> any.
> > I tried grepping the log file, and couldn't find any either. I have one
> > server that runs both JobManager and Task Manager and another server that
> > runs Redis, and they are connected through a network with each other.
> >
> > I think I have to do something to read the data stored in Redis before
> > running AnalyzeTool, but can't figure out what. Does anyone know how to
> get
> > the latency information?
> >
> > Thanks,
> > Eric
>


Re: How to get latency info from benchmark

2016-08-24 Thread Maximilian Michels
I believe the AnalyzeTool is for processing logs of a different benchmark.

CC Jamie and Robert who worked on the benchmark.

On Wed, Aug 24, 2016 at 3:25 AM, Eric Fukuda  wrote:
> Hi,
>
> I'm trying to benchmark Flink without Kafka as mentioned in this post
> (http://data-artisans.com/extending-the-yahoo-streaming-benchmark/). After
> running flink.benchmark.state.AdvertisingTopologyFlinkState with
> user.local.event.generator in localConf.yaml set to 1, I ran
> flink.benchmark.utils.AnalyzeTool giving
> flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log as a
> command-line argument. I got the following output and it does not have the
> information about the latency.
>
>
> = Latency (0 reports ) =
> = Throughput (1 reports ) =
> == null (entries: 10150)===
> Mean throughput 639078.5018497099
> Exception in thread "main" java.lang.IndexOutOfBoundsException: toIndex = 2
> at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
> at java.util.ArrayList.subList(ArrayList.java:954)
> at flink.benchmark.utils.AnalyzeTool.main(AnalyzeTool.java:133)
>
>
> Reading the code in AnalyzeTool.java, I found that it's looking for lines
> that include "Latency" in the log file, but apparently it's not finding any.
> I tried grepping the log file, and couldn't find any either. I have one
> server that runs both JobManager and Task Manager and another server that
> runs Redis, and they are connected through a network with each other.
>
> I think I have to do something to read the data stored in Redis before
> running AnalyzeTool, but can't figure out what. Does anyone know how to get
> the latency information?
>
> Thanks,
> Eric


Re: Setting up zeppelin with flink

2016-08-24 Thread Maximilian Michels
Hi!

There are some people familiar with the Zeppelin integration. CCing
Till and Trevor. Otherwise, you could also send this to the Zeppelin
community.

Cheers,
Max

On Wed, Aug 24, 2016 at 12:58 PM, Frank Dekervel  wrote:
> Hello,
>
> for reference:
>
> i already found out that "connect to existing process" was my error here: it
> means connecting to an existing zeppelin interpreter, not an existing flink
> cluster. After fixing my error, i'm now in the same situation as described
> here:
>
> https://stackoverflow.com/questions/38688277/flink-zeppelin-not-responding
>
> i guess it's more a zeppelin problem than a flink problem tho, as i see both
> interpreter JVM and main zeppelin JVM waiting on thrift input (so it seems
> they are waiting for each other)
>
> greetings,
> Frank
>
>
>
>
> On Tue, Aug 23, 2016 at 2:09 PM, Frank Dekervel  wrote:
>>
>> Hello,
>>
>> I try to set up apache zeppelin with a flink cluster (one jobmanager, one
>> task manager).
>>
>> What i did was using the dockerfiles in flink-contrib/docker-flink + the
>> latest binary release of apache zeppelin with all interpreters:
>>
>>
>> https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/Dockerfile
>> (i changed the flink version to 1.0.3 to match zeppelin's flink version)
>>
>> I built another docker image around the latest binary release of zeppelin
>> (with all interpreters), and i reconfigure the flink interpreter:
>>
>> connect to existing process
>> host: jobmanager, port: 6123
>> i removed all other properties
>>
>> when i try to submit a flink job, i get an error state and the following
>> exception appears in the log (nothing appears in the jobmanager log)
>>
>> ERROR [2016-08-23 11:44:57,932] ({Thread-16}
>> JobProgressPoller.java[run]:54) - Can not get or update progress
>> org.apache.zeppelin.interpreter.InterpreterException:
>> org.apache.thrift.transport.TTransportException
>> at
>> org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:373)
>> at
>> org.apache.zeppelin.interpreter.LazyOpenInterpreter.getProgress(LazyOpenInterpreter.java:111)
>> at
>> org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.java:237)
>> at
>> org.apache.zeppelin.scheduler.JobProgressPoller.run(JobProgressPoller.java:51)
>> Caused by: org.apache.thrift.transport.TTransportException
>> at
>> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>> at
>> org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>> at
>> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
>> at
>> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
>> at
>> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
>> at
>> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>> at
>> org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getProgress(RemoteInterpreterService.java:296)
>> at
>> org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getProgress(RemoteInterpreterService.java:281)
>> at
>> org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:370)
>> ... 3 more
>>
>> Flink in local mode works fine on zeppelin.
>> Could somebody point me to what i'm doing wrong ?
>>
>> Thanks a lot!
>> Frank
>>
>>
>>
>


Re: How to share text file across tasks at run time in flink.

2016-08-24 Thread Maximilian Michels
Hi!

1. The community is working on adding side inputs to the DataStream
API. That will allow you to easily distribute data to all of your
workers.

2. In the meantime, you could use `.broadcast()` on a DataSet to
broadcast data to all workers. You still have to join that data with
another stream though.

3. The easiest method of all is to simply load your file in the
RichMapFunction's open() method. The file can reside in a distributed
file system which is accessible by all workers.

Cheers,
Max
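
To illustrate option 3, a minimal sketch that loads a properties file from a
shared file system inside RichMapFunction#open(), so every parallel task
instance reads it when it starts. The path below is a placeholder; any file
system reachable from all TaskManagers (HDFS, S3, NFS, ...) works:

    import java.io.InputStream;
    import java.util.Properties;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class RuleEnrichingMapper extends RichMapFunction<String, String> {

        // Placeholder location of the shared config/rules file.
        private static final String RULES_PATH = "hdfs:///shared/config/rules.properties";

        private transient Properties rules;

        @Override
        public void open(Configuration parameters) throws Exception {
            rules = new Properties();
            Path path = new Path(RULES_PATH);
            FileSystem fs = path.getFileSystem();
            try (InputStream in = fs.open(path)) {
                rules.load(in);
            }
        }

        @Override
        public String map(String event) {
            // Apply the shared configuration/rules to the incoming event.
            return rules.getProperty(event, event);
        }
    }

For the DataSet route in option 2, the equivalent would be attaching the data
set with withBroadcastSet() and reading it in a rich function via
getRuntimeContext().getBroadcastVariable().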

On Wed, Aug 24, 2016 at 6:45 AM, Jark Wu  wrote:
> Hi,
>
> I think what Baswaraj wants is exactly something like the Storm Distributed Cache
> API[1] (if I’m not misunderstanding).
>
> The distributed cache feature in storm is used to efficiently distribute
> files (or blobs, which is the equivalent terminology for a file in the
> distributed cache and is used interchangeably in this document) that are
> large and can change during the lifetime of a topology, such as geo-location
> data, dictionaries, etc. Typical use cases include phrase recognition,
> entity extraction, document classification, URL re-writing, location/address
> detection and so forth. Such files may be several KB to several GB in size.
> For small datasets that don't need dynamic updates, including them in the
> topology jar could be fine. But for large files, the startup times could
> become very large. In these cases, the distributed cache feature can provide
> fast topology startup, especially if the files were previously downloaded
> for the same submitter and are still in the cache. This is useful with
> frequent deployments, sometimes few times a day with updated jars, because
> the large cached files will remain available without changes. The large
> cached blobs that do not change frequently will remain available in the
> distributed cache.
>
>
> We can look into this whether it is a common use case and how to implement
> it in Flink.
>
> [1] http://storm.apache.org/releases/2.0.0-SNAPSHOT/distcache-blobstore.html
>
>
> - Jark Wu
>
> 在 2016年8月23日,下午9:45,Lohith Samaga M  写道:
>
> Hi
> May be you could use Cassandra to store and fetch all such reference data.
> This way the reference data can be updated without restarting your
> application.
>
> Lohith
>
> Sent from my Sony Xperia™ smartphone
>
>
>
>  Baswaraj Kasture wrote 
>
> Thanks Kostas !
> I am using DataStream API.
>
> I have a few config/property files (key value text files) and also have business
> rule files (JSON).
> These rules and configurations are needed when we process incoming events.
> Is there any way to share them with the task nodes from the driver program?
> I think this is a very common use case and I am sure other users may face
> similar issues.
>
> +Baswaraj
>
> On Mon, Aug 22, 2016 at 4:56 PM, Kostas Kloudas
>  wrote:
>>
>> Hello Baswaraj,
>>
>> Are you using the DataSet (batch) or the DataStream API?
>>
>> If you are in the first, you can use a broadcast variable for your task.
>> If you are using the DataStream one, then there is no proper support for
>> that.
>>
>> Thanks,
>> Kostas
>>
>> On Aug 20, 2016, at 12:33 PM, Baswaraj Kasture 
>> wrote:
>>
>> I am running a Flink standalone cluster.
>>
>> I have a text file that needs to be shared across tasks when I submit my
>> application;
>> in other words, put this text file in the class path of the running tasks.
>>
>> How can we achieve this with Flink?
>>
>> In Spark, spark-submit has a --jars option that puts all the specified files
>> in the class path of the executors (executors run in separate JVMs and are spawned
>> dynamically, so it is possible).
>>
>> Flink's task managers run tasks in separate threads under the TaskManager JVM
>> (?), so how can we make this text file accessible to all tasks spawned
>> by the current application?
>>
>> Using HDFS, NFS, or including the file in the program jar is one way that I know of,
>> but I am looking for a solution that allows me to provide the text file at run
>> time and still have it accessible in all tasks.
>> Thanks.
>>
>>
>
>
>
>


Re: Setting up zeppelin with flink

2016-08-24 Thread Frank Dekervel
Hello,

for reference:

I already found out that "connect to existing process" was my error here:
it means connecting to an existing Zeppelin interpreter, not an existing
Flink cluster. After fixing my error, I'm now in the same situation as
described here:

https://stackoverflow.com/questions/38688277/flink-zeppelin-not-responding

I guess it's more a Zeppelin problem than a Flink problem though, as I see
both the interpreter JVM and the main Zeppelin JVM waiting on Thrift input (so it
seems they are waiting for each other).

greetings,
Frank




On Tue, Aug 23, 2016 at 2:09 PM, Frank Dekervel  wrote:

> Hello,
>
> I try to set up apache zeppelin with a flink cluster (one jobmanager, one
> task manager).
>
> What i did was using the dockerfiles in flink-contrib/docker-flink + the
> latest binary release of apache zeppelin with all interpreters:
>
> https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/
> Dockerfile (i changed the flink version to 1.0.3 to match zeppelin's
> flink version)
>
> I built another docker image around the latest binary release of zeppelin
> (with all interpreters), and i reconfigure the flink interpreter:
>
>- connect to existing process
>- host: jobmanager, port: 6123
>- i removed all other properties
>
> when i try to submit a flink job, i get an error state and the following
> exception appears in the log (nothing appears in the jobmanager log)
>
> ERROR [2016-08-23 11:44:57,932] ({Thread-16} JobProgressPoller.java[run]:54)
> - Can not get or update progress
> org.apache.zeppelin.interpreter.InterpreterException:
> org.apache.thrift.transport.TTransportException
> at org.apache.zeppelin.interpreter.remote.
> RemoteInterpreter.getProgress(RemoteInterpreter.java:373)
> at org.apache.zeppelin.interpreter.LazyOpenInterpreter.
> getProgress(LazyOpenInterpreter.java:111)
> at org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.
> java:237)
> at org.apache.zeppelin.scheduler.JobProgressPoller.run(
> JobProgressPoller.java:51)
> Caused by: org.apache.thrift.transport.TTransportException
> at org.apache.thrift.transport.TIOStreamTransport.read(
> TIOStreamTransport.java:132)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.
> java:86)
> at org.apache.thrift.protocol.TBinaryProtocol.readAll(
> TBinaryProtocol.java:429)
> at org.apache.thrift.protocol.TBinaryProtocol.readI32(
> TBinaryProtocol.java:318)
> at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(
> TBinaryProtocol.java:219)
> at org.apache.thrift.TServiceClient.receiveBase(
> TServiceClient.java:69)
> at org.apache.zeppelin.interpreter.thrift.
> RemoteInterpreterService$Client.recv_getProgress(
> RemoteInterpreterService.java:296)
> at org.apache.zeppelin.interpreter.thrift.
> RemoteInterpreterService$Client.getProgress(RemoteInterpreterService.java:
> 281)
> at org.apache.zeppelin.interpreter.remote.
> RemoteInterpreter.getProgress(RemoteInterpreter.java:370)
> ... 3 more
>
> Flink in local mode works fine on zeppelin.
> Could somebody point me to what i'm doing wrong ?
>
> Thanks a lot!
> Frank
>
>
>
>


Re: Dealing with Multiple sinks in Flink

2016-08-24 Thread Maximilian Michels
Hi Vinay,

Does this only happen with the S3 file system or also with your local
file system? Could you share some example code or log output of your
running job?

Best,
Max

On Wed, Aug 24, 2016 at 4:20 AM, Vinay Patil  wrote:
> Hi,
>
> In our flink pipeline we are currently writing the data to multiple S3
> objects/folders based on some conditions, so the issue I am facing is as
> follows :
>
> Consider these S3 folders :
> temp_bucket/processedData/20160823/
> temp_bucket/rawData/20160822/
> temp_bucket/errorData/20160821/
>
> Now when the parallelism is set to 1, the data gets written to all S3
> folders above, but when I set it to larger value the data is written only to
> the first folder and not the others.
>
> I am testing the flink job on EMR with 4 task managers having 16 slots, even
> if I keep parallelism as 4 , I am facing the same issue.
> (running from IDE is resulting in same output, Tested this with Flink 1.0.3
> and 1.1.1)
>
> I am not understanding why this is happening.
>
>
> Regards,
> Vinay Patil


Re: FLINK-4329 fix version

2016-08-24 Thread Maximilian Michels
Added fix versions 1.1.2 and 1.2.0 because a pull request is under way.

On Tue, Aug 23, 2016 at 1:17 PM, Ufuk Celebi  wrote:
> On Tue, Aug 23, 2016 at 12:28 PM, Yassine Marzougui
>  wrote:
>> The fix version of FLINK-4329 in JIRA is set to 1.1.1, but 1.1.1 is already
>> released. Should I expect it to be fixed in the next release? and will a
>> patch be available meanwhile? Thanks.
>
> Hey Yassine! The JIRA fix version tag is incorrect as you say (I just
> removed it). The best thing would be to ask in the pull request
> directly: https://github.com/apache/flink/pull/2350. I would assume
> that it will be included in 1.1.2 too.
>
> – Ufuk


Re: Checking for existence of output directory/files before running a batch job

2016-08-24 Thread Maximilian Michels
Forgot to mention, this is on the master. For Flink < 1.2.x, you will
have to use GlobalConfiguration.get();

On Wed, Aug 24, 2016 at 12:23 PM, Maximilian Michels  wrote:
> Hi Niels,
>
> The problem is that such method only works reliably if the cluster
> configuration, e.g. Flink and Hadoop config files, are present on the
> client machine. Also, the environment variables have to be set
> correctly. This is usually not the case when working from the IDE. But
> seems like your code is really in the jar which you execute against
> /bin/flink, so everything should be configured then. If so, you can
> add the following before your existing code:
>
> Configuration config = GlobalConfiguration.loadConfiguration();
> FileSystem.setDefaultScheme(config);
>
> Then you're good to go. We could think about adding this code to
> ExecutionEnvironment. The main problem, however, is that the location
> of the config file has to be supplied when working from an IDE, where
> the environment variables are usually not set.*
>
> Cheers,
> Max
>
> * You can use 
> GlobalConfiguration.loadConfiguration("/path/to/config/directory")
> from the IDE to load the config. Alternatively, set FLINK_CONF_DIR
> environment variable.
>
> On Mon, Aug 22, 2016 at 10:55 AM, Niels Basjes  wrote:
>> Yes, that did the trick. Thanks.
>> I was using a relative path without any FS specification.
>> So my path was "foo" and on the cluster this resolves to
>> "hdfs:///user/nbasjes/foo"
>> Locally this resolved to "file:///home/nbasjes/foo" and hence the mismatch I
>> was looking at.
>>
>> For now I can work with this fine.
>>
>> Yet I think having a method on the ExecutionEnvironment instance
>> 'getFileSystem()' that would return me the actual filesystem against which
>> my job "is going to be executed" would solve this in an easier way. That way
>> I can use a relative path (i.e. "foo") and run it anywhere (local, Yarn,
>> Mesos, etc.) without any problems.
>>
>> What do you guys think?
>> Is this desirable? Possible?
>>
>> Niels.
>>
>>
>>
>> On Fri, Aug 19, 2016 at 3:22 PM, Robert Metzger  wrote:
>>>
>>> Ooops. Looks like Google Mail / Apache / the internet needs 13 minutes to
>>> deliver an email.
>>> Sorry for double answering.
>>>
>>> On Fri, Aug 19, 2016 at 3:07 PM, Maximilian Michels 
>>> wrote:

 Hi Niels,

 Have you tried specifying the fully-qualified path? The default is the
 local file system.

 For example, hdfs:///path/to/foo

 If that doesn't work, do you have the same Hadoop configuration on the
 machine where you test?

 Cheers,
 Max

 On Thu, Aug 18, 2016 at 2:02 PM, Niels Basjes  wrote:
 > Hi,
 >
 > I have a batch job that I run on yarn that creates files in HDFS.
 > I want to avoid running this job at all if the output already exists.
 >
 > So in my code (before submitting the job into yarn-session) I do this:
 >
 > String directoryName = "foo";
 >
 > Path directory = new Path(directoryName);
 > FileSystem fs = directory.getFileSystem();
 >
 > if (!fs.exists(directory)) {
 >
 > // run the job
 >
 > }
 >
 > What I found is that this code apparently checks the 'wrong' file
 > system. (I
 > always get 'false' even if it exists in hdfs)
 >
 > I checked the API of the execution environment yet I was unable to get
 > the
 > 'correct' filesystem from there.
 >
 > What is the proper way to check this?
 >
 >
 > --
 > Best regards / Met vriendelijke groeten,
 >
 > Niels Basjes
>>>
>>>
>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes


Re: Checking for existence of output directory/files before running a batch job

2016-08-24 Thread Maximilian Michels
Hi Niels,

The problem is that such a method only works reliably if the cluster
configuration, e.g. Flink and Hadoop config files, are present on the
client machine. Also, the environment variables have to be set
correctly. This is usually not the case when working from the IDE. But it
seems like your code really is in the jar which you execute against
/bin/flink, so everything should be configured then. If so, you can
add the following before your existing code:

Configuration config = GlobalConfiguration.loadConfiguration();
FileSystem.setDefaultScheme(config);

Then you're good to go. We could think about adding this code to
ExecutionEnvironment. The main problem, however, is that the location
of the config file has to be supplied when working from an IDE, where
the environment variables are usually not set.*

Cheers,
Max

* You can use GlobalConfiguration.loadConfiguration("/path/to/config/directory")
from the IDE to load the config. Alternatively, set FLINK_CONF_DIR
environment variable.
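
Putting those two lines together with the check from the original question, a
minimal sketch could look like this (the relative path "foo" is a placeholder,
and loadConfiguration() without arguments is the master/1.2 API; older versions
need the config directory or GlobalConfiguration.get(), as mentioned earlier in
the thread):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class OutputExistsCheck {
    public static void main(String[] args) throws Exception {
        // Load flink-conf.yaml (via FLINK_CONF_DIR / the usual config lookup) and
        // make its default filesystem scheme the default for relative paths.
        Configuration config = GlobalConfiguration.loadConfiguration();
        FileSystem.setDefaultScheme(config);

        Path output = new Path("foo");               // placeholder output path
        FileSystem fs = output.getFileSystem();

        if (fs.exists(output)) {
            System.out.println("Output already exists on " + fs.getUri()
                    + ", not submitting the job.");
            return;
        }

        // ... build the ExecutionEnvironment and run the batch job here ...
    }
}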

On Mon, Aug 22, 2016 at 10:55 AM, Niels Basjes  wrote:
> Yes, that did the trick. Thanks.
> I was using a relative path without any FS specification.
> So my path was "foo" and on the cluster this resolves to
> "hdfs:///user/nbasjes/foo"
> Locally this resolved to "file:///home/nbasjes/foo" and hence the mismatch I
> was looking at.
>
> For now I can work with this fine.
>
> Yet I think having a method on the ExecutionEnvironment instance
> 'getFileSystem()' that would return me the actual filesystem against which
> my job "is going to be executed" would solve this in an easier way. That way
> I can use a relative path (i.e. "foo") and run it anywhere (local, Yarn,
> Mesos, etc.) without any problems.
>
> What do you guys think?
> Is this desirable? Possible?
>
> Niels.
>
>
>
> On Fri, Aug 19, 2016 at 3:22 PM, Robert Metzger  wrote:
>>
>> Ooops. Looks like Google Mail / Apache / the internet needs 13 minutes to
>> deliver an email.
>> Sorry for double answering.
>>
>> On Fri, Aug 19, 2016 at 3:07 PM, Maximilian Michels 
>> wrote:
>>>
>>> Hi Niels,
>>>
>>> Have you tried specifying the fully-qualified path? The default is the
>>> local file system.
>>>
>>> For example, hdfs:///path/to/foo
>>>
>>> If that doesn't work, do you have the same Hadoop configuration on the
>>> machine where you test?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Thu, Aug 18, 2016 at 2:02 PM, Niels Basjes  wrote:
>>> > Hi,
>>> >
>>> > I have a batch job that I run on yarn that creates files in HDFS.
>>> > I want to avoid running this job at all if the output already exists.
>>> >
>>> > So in my code (before submitting the job into yarn-session) I do this:
>>> >
>>> > String directoryName = "foo";
>>> >
>>> > Path directory = new Path(directoryName);
>>> > FileSystem fs = directory.getFileSystem();
>>> >
>>> > if (!fs.exists(directory)) {
>>> >
>>> > // run the job
>>> >
>>> > }
>>> >
>>> > What I found is that this code apparently checks the 'wrong' file
>>> > system. (I
>>> > always get 'false' even if it exists in hdfs)
>>> >
>>> > I checked the API of the execution environment yet I was unable to get
>>> > the
>>> > 'correct' filesystem from there.
>>> >
>>> > What is the proper way to check this?
>>> >
>>> >
>>> > --
>>> > Best regards / Met vriendelijke groeten,
>>> >
>>> > Niels Basjes
>>
>>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes


Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-24 Thread Maximilian Michels
Hi Hironori,

That's what I thought. So it won't be an issue for most users who do
not comment out the JobManager url from the config. Still, the
information printed is not correct. The issue has just been fixed.

You will have to wait for the next minor release 1.1.2 or build the
'release-1.1' Git branch.

Best,
Max
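
For reference, until 1.1.2 is out, a flink-conf.yaml along these lines avoids the
client-side error (the ZooKeeper quorum and root path are placeholder values; the
rpc entries only keep the 1.1.1 client happy, the actual leader is still looked up
in ZooKeeper):

# JobManager HA via ZooKeeper
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
recovery.zookeeper.path.root: /flink

# Workaround for FLINK-4454 on 1.1.1: keep the default address/port in place so
# the client does not fail while parsing the config.
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123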

On Wed, Aug 24, 2016 at 11:14 AM, Hironori Ogibayashi
 wrote:
> Ufuk, Max,
>
> Thank you for your answer and opening JIRA.
> I will wait for the fix.
>
> As Max mentioned, I had indeed commented out jobmanager.rpc.address and
> jobmanager.rpc.port. When I tried setting them to localhost and 6123
> respectively, it worked.
>
> Regards,
> Hironori
>
> 2016-08-24 0:54 GMT+09:00 Maximilian Michels :
>> Created an issue and fix should be there soon:
>> https://issues.apache.org/jira/browse/FLINK-4454
>>
>> Thanks,
>> Max
>>
>> On Tue, Aug 23, 2016 at 4:38 PM, Maximilian Michels  wrote:
>>> Hi!
>>>
>>> Yes, this is a bug. However, there seems to be something wrong with
>>> the config directory because Flink fails to load the default value
>>> ("localhost") from the config. If you had a default value for the job
>>> manager in flink-conf.yaml, it wouldn't fail but only display a wrong
>>> job manager url. Note that it still connects to the right job manager
>>> afterwards.
>>>
>>> Sorry for the trouble.
>>>
>>> Thanks,
>>> Max
>>>
>>> On Tue, Aug 23, 2016 at 11:02 AM, Ufuk Celebi  wrote:
 You are right that this config key is not needed in this case.

 The ClusterClient has been refactored between Flink 1.0 and 1.1 and
 the config parsing might be too strict in this case. It expects the
 IPC address to be set, which is not necessary as you say. It should be
 very easy to fix for 1.1.2. Let's confirm that it is actually a bug
 with Max and file an issue afterwards.

 @Max: can you confirm whether this is correct?


 On Tue, Aug 23, 2016 at 7:24 AM, Hironori Ogibayashi
  wrote:
> Hello,
>
> After I upgraded to 1.1.1, I am getting an error when submitting a job with
> "flink run".
>
> The command and result is like this. It has been working with Flink 1.0.3.
>
> ---
>  % FLINK_CONF_DIR=~/opt/flink/conf ~/opt/flink/flink-1.1.1/bin/flink
> run -c MyJob target/my-flink-job.jar
>
> 
>  The program finished with the following exception:
>
> java.lang.RuntimeException: Failed to retrieve JobManager address
> at 
> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:244)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.getClusterIdentifier(StandaloneClusterClient.java:78)
> at 
> org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:887)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:237)
> at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: java.lang.IllegalArgumentException: hostname can't be null
> at 
> java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
> at java.net.InetSocketAddress.(InetSocketAddress.java:216)
> at 
> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:242)
> ... 5 more
> ---
>
> I am using JobManager HA, and I set "recovery.mode: zookeeper",
> recovery.zookeeper.quorum and recovery.zookeeper.path.root in my
> flink-conf.yaml.
> So, the client should be able to get JobManager address from zookeeper.
> If I explicitly specify JobManager address with -m option, it works.
>
> Am I missing something?
>
> Regards,
> Hironori Ogibayashi


Re: sharded state, 2-step operation

2016-08-24 Thread Stephan Ewen
Hi!

The "feedback loop" sounds like a solution, yes. Actually, that works well
with the CoMap / CoFlatMap - one input to the CoMap would be the original
value, the other input the feedback value.

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations


Once Flink's iterations are better hardened, these could be used for
feedback as well, and would be exactly once.

Best,
Stephan
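
Since examples were asked for: a minimal, self-contained sketch of the CoFlatMap
feedback pattern (the tuple layout, the state and the inline test input are
simplified placeholders; in a real pipeline the feedback stream would be read back
from Kafka or a similar queue, as discussed above):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class FeedbackCoFlatMapSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (from, to, amount) placeholder inputs.
        DataStream<Tuple3<String, String, Long>> original =
                env.fromElements(Tuple3.of("Alice", "Bob", 1L));
        DataStream<Tuple3<String, String, Long>> feedback =
                env.fromElements(Tuple3.of("Bob", "Alice", 1L));

        // Keying both inputs on field 0 gives flatMap1 and flatMap2 the same
        // keyed state, so an update made for "Bob" via the feedback side is
        // visible when an original record keyed by "Bob" arrives later.
        original.keyBy(0)
                .connect(feedback.keyBy(0))
                .flatMap(new BalanceUpdater())
                .print();

        env.execute("feedback CoFlatMap sketch");
    }

    public static class BalanceUpdater
            extends RichCoFlatMapFunction<Tuple3<String, String, Long>,
                                          Tuple3<String, String, Long>,
                                          Tuple3<String, String, Long>> {

        private transient ValueState<Long> balance;

        @Override
        public void open(Configuration parameters) {
            balance = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("balance", Long.class, 0L));
        }

        @Override
        public void flatMap1(Tuple3<String, String, Long> value,
                             Collector<Tuple3<String, String, Long>> out) throws Exception {
            // First step: debit the sender's balance and emit the record.
            balance.update(balance.value() - value.f2);
            out.collect(value);
        }

        @Override
        public void flatMap2(Tuple3<String, String, Long> value,
                             Collector<Tuple3<String, String, Long>> out) throws Exception {
            // Second step (fed back): credit the key's balance; no output needed.
            balance.update(balance.value() + value.f2);
        }
    }
}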



On Tue, Aug 23, 2016 at 9:05 PM, Michael Warnock  wrote:

> Another approach I'm considering, which feels pretty kludgy, but I think
> should be acceptable for my current use:
>
> Only one stateful op, keyed on the same field, but with a flag field
> indicating the actual operation to be performed.  The results of this op
> are output to a kafka (or whatever) queue, which is ingested along with the
> first stream.  The two state changes don't have to be atomic for my case,
> but the second one does have to be guaranteed to eventually happen, and be
> idempotent.  I'm not quite sure how to (safely) make that second pass
> idempotent though, at the moment, and I'm not sure if there might be other
> issues with it I'm not seeing - it definitely doesn't _feel_ like a great
> solution.
>
> Any thoughts?
>
> On Tue, Aug 23, 2016 at 11:53 AM, Michael Warnock 
> wrote:
>
>> Thanks for the quick response!
>>
>> I've been wondering about Connected streams and CoFlatMap, but either I
>> don't see all the ways they can be used, or they don't solve my problem.
>> Do you know of any examples outside of the documentation?  My searches for
>> "flink comap example" and similar haven't turned anything up.
>>
>> On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> This is a tricky one. State access and changes are not shared across
>>> operators in Flink.
>>> We chose that design because it makes it possible to work on "local"
>>> state in each operator
>>>   - state automatically shards with the computation
>>>   - no locking / concurrency implications
>>>   - asynchronous persistence
>>>
>>> Sharing state between two operations in the same stage works with the
>>> CoMap / CoFlatMap functions.
>>> Sharing state across successive nodes does not work, because the
>>> functions could be executed on different machines and one would need to do
>>> remote and synchronized state updates that way.
>>>
>>> Do you think you can use the CoMap / CoFlatMap functions for this?
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock 
>>> wrote:
>>>
 I'm trying to do something that seems like it should be possible, but
 my implementation doesn't behave as expected, and I'm not sure how else to
 express it.

 Let's say the stream is composed of tuples like this: (Alice, Bob, 1)
 and I want to keyBy(1), flatMap with state associated with Alice, then
 keyBy(2) with state associated with Bob.  The trick is, when I later get a
 tuple like (Bob, Alice, 1), I want the first operator to see the state that
 was updated in the second op previously.  Is this possible?  I tried
  implementing both operators as one, getting the state by descriptor in the
 flatMap body, and even instantiating the operator only once; the behavior
 is, as you might guess, that the state in stage 1 doesn't include changes
 made previously in stage 2.

 Is there any way to do this without throwing away the parallelism?

 Thanks in advance!
 ~Michael

>>>
>>>
>>
>


Re: JobManager HA without Distributed FileSystem

2016-08-24 Thread Stephan Ewen
Hi!

  - Concerning replication to other JobManagers - this could be an
extension, but it would need to also support additional replacement
JobManagers coming up later, so it would need a replication service in the
JobManagers, not just a "send to all" at program startup.

  - That would work in theory like this, yes, assuming the JobGraph storage
would be solved (the jars can always be added to the "lib" directory in the
first place).
    Right now, Flink does not accept that setting, but one could think
about that configuration, definitely. We are currently abstracting
HighAvailability services for different configurations; this could be one
of them.

Stephan


On Wed, Aug 24, 2016 at 9:30 AM, Konstantin Knauf <
konstantin.kn...@tngtech.com> wrote:

> Hi Stephan,
>
> thanks for the quick response, understood. Is there a reason why JAR
> files and JobGraph are not sent to all JobManagers by the client?
> Accordingly, why don't all TaskManagers send the Checkpoint Metadata to all
> JobManagers?
>
> I did not have any other storage in mind [1]. I am basically interested
> in what is possible with the MemoryStateBackend alone. So, from here on
> let's take JM HA aside.
>
> For a stand-alone Flink Cluster with MemoryStateBackend (default config)
> I can only have 1MB (akka.framesize) of state per handle, correct?
>
> There is one handle per operator, correct?
>
> So, for example, a KafkaConsumer with parallelism 2 consuming from a
> topic with 20 partitions:
>
> Two operators, each with a state of a HashMap
> with 10 entries. A Kafka topic partition has the fields: String, int, int. So
> this should amount to < 1 kbyte, if the name of the partition is of
> reasonable length.
>
> So, if this is the only state in the pipeline, there is no problem using
> the MemoryStateBackend, if one accepts that a JM failure means a loss
> of the state? In case of the KafkaConsumer, the current offsets are also
> stored in Kafka/Zookeeper anyway, so actually there would not be any
> loss of data even in this case, just duplication.
>
> Does this make sense?
>
> Cheers,
>
> Konstantin
>
> [1] We did consider reviving FLINK-3035 (Redis Statebackend), but that's
> a different topic ;)
>
>
>
> On 23.08.2016 20:45, Stephan Ewen wrote:
> > Hi!
> >
> > The state one can store in ZooKeeper is only very small (recommended is
> > smaller than 1MB per handle).
> >
> > For HA, the JobManager needs to persist:
> >   - JobGraph
> >   - JAR files
> >   - Checkpoint Metadata
> >
> > Those are easily too large for ZooKeeper, which is why Flink currently
> > requires a DFS to store those, and only stores "pointers" to the data in
> > the DFS in ZooKeeper.
> >
> > Are you thinking of another highly available storage for larger data
> > (megabytes) that could be used here?
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Aug 23, 2016 at 6:36 PM, Konstantin Knauf wrote:
> >
> > Hi all,
> >
> > the documentation of JobManager HA [1] explains that HA is only possible
> > with the FS state backend as Job Manager metadata is saved there.
> >
> > What are the particular problems using JobManager HA with the
> > MemoryStatebackend?
> >
> > As I understand it, the state is checkpointed to all JobManagers
> > (leaders + standby) when using the MemoryStateBackend, or am I wrong here?
> >
> > Follow-up question: Is it generally possible to set up a highly
> > available, at-least-once (source: Kafka) pipeline without a distributed
> > filesystem (only local FS and ZooKeeper) for the checkpoints?
> >
> > Cheers,
> >
> > Konstantin
> >
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html
> >
> > --
> > Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> > Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >
> >
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-24 Thread Hironori Ogibayashi
Ufuk, Max,

Thank you for your answer and opening JIRA.
I will wait for the fix.

As Max mentioned, I had indeed commented out jobmanager.rpc.address and
jobmanager.rpc.port. When I tried setting them to localhost and 6123
respectively, it worked.

Regards,
Hironori

2016-08-24 0:54 GMT+09:00 Maximilian Michels :
> Created an issue and fix should be there soon:
> https://issues.apache.org/jira/browse/FLINK-4454
>
> Thanks,
> Max
>
> On Tue, Aug 23, 2016 at 4:38 PM, Maximilian Michels  wrote:
>> Hi!
>>
>> Yes, this is a bug. However, there seems to be something wrong with
>> the config directory because Flink fails to load the default value
>> ("localhost") from the config. If you had a default value for the job
>> manager in flink-conf.yaml, it wouldn't fail but only display a wrong
>> job manager url. Note that it still connects to the right job manager
>> afterwards.
>>
>> Sorry for the trouble.
>>
>> Thanks,
>> Max
>>
>> On Tue, Aug 23, 2016 at 11:02 AM, Ufuk Celebi  wrote:
>>> You are right that this config key is not needed in this case.
>>>
>>> The ClusterClient has been refactored between Flink 1.0 and 1.1 and
>>> the config parsing might be too strict in this case. It expects the
>>> IPC address to be set, which is not necessary as you say. It should be
>>> very easy to fix for 1.1.2. Let's confirm that it is actually a bug
>>> with Max and file an issue afterwards.
>>>
>>> @Max: can you confirm whether this is correct?
>>>
>>>
>>> On Tue, Aug 23, 2016 at 7:24 AM, Hironori Ogibayashi
>>>  wrote:
 Hello,

 After I upgraded to 1.1.1, I am getting an error when submitting a job with
 "flink run".

 The command and result is like this. It has been working with Flink 1.0.3.

 ---
  % FLINK_CONF_DIR=~/opt/flink/conf ~/opt/flink/flink-1.1.1/bin/flink
 run -c MyJob target/my-flink-job.jar

 
  The program finished with the following exception:

 java.lang.RuntimeException: Failed to retrieve JobManager address
 at 
 org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:244)
 at 
 org.apache.flink.client.program.StandaloneClusterClient.getClusterIdentifier(StandaloneClusterClient.java:78)
 at 
 org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:887)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:237)
 at 
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
 Caused by: java.lang.IllegalArgumentException: hostname can't be null
 at java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
 at java.net.InetSocketAddress.(InetSocketAddress.java:216)
 at 
 org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:242)
 ... 5 more
 ---

 I am using JobManager HA, and I set "recovery.mode: zookeeper",
 recovery.zookeeper.quorum and recovery.zookeeper.path.root in my
 flink-conf.yaml.
 So, the client should be able to get JobManager address from zookeeper.
 If I explicitly specify JobManager address with -m option, it works.

 Am I missing something?

 Regards,
 Hironori Ogibayashi


Re: JobManager HA without Distributed FileSystem

2016-08-24 Thread Konstantin Knauf
Hi Stephan,

thanks for the quick response, understood. Is there a reason why JAR
files and JobGraph are not sent to all JobManagers by the client?
Accordingly, why don't all TaskManagers send the Checkpoint Metadata to all
JobManagers?

I did not have any other storage in mind [1]. I am basically interested
in what is possible with the MemoryStateBackend alone. So, from here on
let's take JM HA aside.

For a stand-alone Flink Cluster with MemoryStateBackend (default config)
I can only have 1MB (akka.framesize) of state per handle, correct?

There is one handle per operator, correct?

So, for example, a KafkaConsumer with parallelism 2 consuming from a
topic with 20 partitions:

Two operators, each with a state of a HashMap
with 10 entries. A Kafka topic partition has the fields: String, int, int. So
this should amount to < 1 kbyte, if the name of the partition is of
reasonable length.

So, if this is the only state in the pipeline, there is no problem using
the MemoryStateBackend, if one accepts that a JM failure means a loss
of the state? In case of the KafkaConsumer, the current offsets are also
stored in Kafka/Zookeeper anyway, so actually there would not be any
loss of data even in this case, just duplication.

Does this make sense?

Cheers,

Konstantin

[1] We did consider reviving FLINK-3035 (Redis Statebackend), but that's
a different topic ;)
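
As a side note, a minimal sketch of pinning this down in code (the size argument
and the checkpoint interval are purely illustrative values, not recommendations):

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep checkpoint state in JobManager memory; the constructor argument
        // caps the size of the serialized state in bytes (illustrative value;
        // the transmitted state must also fit into akka.framesize).
        env.setStateBackend(new MemoryStateBackend(1024 * 1024));
        env.enableCheckpointing(10000); // checkpoint every 10 s (illustrative)

        // Placeholder pipeline; the real job would use the Kafka consumer here.
        env.fromElements(1, 2, 3).print();

        env.execute("memory state backend sketch");
    }
}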



On 23.08.2016 20:45, Stephan Ewen wrote:
> Hi!
> 
> The state one can store in ZooKeeper is only very small (recommended is
> smaller than 1MB per handle).
> 
> For HA, the JobManager needs to persist:
>   - JobGraph
>   - JAR files
>   - Checkpoint Metadata
> 
> Those are easily too large for ZooKeeper, which is why Flink currently
> requires a DFS to store those, and only stores "pointers" to the data in
> the DFS in ZooKeeper.
> 
> Are you thinking of another highly available storage for larger data
> (megabytes) that could be used here?
> 
> Greetings,
> Stephan
> 
> 
> On Tue, Aug 23, 2016 at 6:36 PM, Konstantin Knauf wrote:
> 
> Hi all,
> 
> the documentation of JobManager HA [1] explains that HA is only possible
> with the FS state backend as Job Manager metadata is saved there.
> 
> What are the particular problems using JobManager HA with the
> MemoryStatebackend?
> 
> As I understand it, the state is checkpointed to all JobManagers
> (leaders + standby) when using the MemoryStateBackend, or am I wrong here?
> 
> Follow-up question: Is it generally possible to set up a highly
> available, at-least-once (source: Kafka) pipeline without a distributed
> filesystem (only local FS and ZooKeeper) for the checkpoints?
> 
> Cheers,
> 
> Konstantin
> 
> 
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html
> 
> 
> 
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> 
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: flink-shaded-hadoop

2016-08-24 Thread Aljoscha Krettek
Hi,
this might be due to a bug in the Flink 1.1.0 maven dependencies. Can you
try updating to Flink 1.1.1?

Cheers,
Aljoscha

On Mon, 22 Aug 2016 at 07:48  wrote:

> Hi everyone,
> when I use Scala version 2.10 and set up the sbt project (adding
> flink-core, flink-scala, flink-streaming-scala, flink-kafka and
> flink-streaming-connectors), the build downloads flink-shaded-hadoop1_2.10.jar,
> but with Scala version 2.11 I get both flink-shaded-hadoop1_2.10.jar and
> flink-shaded-hadoop2_2.11.jar.
> Why? Can someone tell me why this happens?