Re: ResourceManager not using correct akka URI in standalone cluster (?)

2016-09-28 Thread AJ Heller
Thank you Till. I was in a time crunch and rebuilt my cluster from the
ground up with Hadoop installed. All works fine now; `netstat -pn | grep
6123` shows Flink's PID. Hadoop may be irrelevant; I can't rule out PEBKAC
yet :-). Sorry, when I have time I'll attempt to reproduce the scenario, on
the off chance there's a bug in there that I can help dig up.

Best,
aj


Create stream from multiple files using "env.readFile(format, input1, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter())" ?

2016-09-28 Thread Anchit Jatana
Hi All,

I have a use case where I need to create multiple source streams from
multiple files and monitor the files for any changes using
"FileProcessingMode.PROCESS_CONTINUOUSLY".

The intention is to have a monitored stream for each of the multiple files,
something like this:

DataStream stream1 = env.readFile(format, input1,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000,
FilePathFilter.createDefaultFilter());

DataStream stream2 = env.readFile(format, input2,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000,
FilePathFilter.createDefaultFilter());

DataStream stream3 = env.readFile(format, input3,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000,
FilePathFilter.createDefaultFilter());

...

DataStream streamN = env.readFile(format, inputN,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000,
FilePathFilter.createDefaultFilter());

Since this implementation doesn't work, can someone suggest how this
can be achieved?


PS: The main intention is to *monitor* all the files and stream the updated
content whenever any change is made to them.
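
For illustration, a compact sketch of this pattern (the paths and the plain
TextInputFormat with String elements are assumptions on my part; the question
only refers to a generic `format`), building one continuously monitored stream
per input in a loop. The resulting streams could also be union()-ed if a single
stream is wanted:

List<DataStream<String>> streams = new ArrayList<>();
for (String input : new String[]{input1, input2, input3 /* ..., inputN */}) {
    // a fresh input format per path, since readFile binds the format to the path
    TextInputFormat fmt = new TextInputFormat(new Path(input));
    streams.add(env.readFile(fmt, input,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 1000,
            FilePathFilter.createDefaultFilter()));
}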


Thank you!

Regards,

Anchit


Re: Error while adding data to RocksDB: No more bytes left

2016-09-28 Thread Stephan Ewen
Hi Shannon!

The stack trace you pasted is independent of checkpointing - it seems to be
from the regular processing. Does this only happen when checkpoints are
activated?

Can you also share which checkpoint method you use?
  - FullyAsynchronous
  - SemiAsynchronous
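
For reference, a minimal sketch of where that choice is made (hypothetical
checkpoint URI, assuming the 1.1.x RocksDB backend API); semi-asynchronous is
the default, and fully asynchronous has to be enabled explicitly:

RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints");
backend.enableFullyAsyncSnapshots(); // omit this call to stay on semi-async
env.setStateBackend(backend);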

I think there are two possibilities for what can happen:
  - There is a serialization inconsistency in the Serializers. If that is
the case, this error should occur in an almost deterministic fashion. To
debug that, it would be good to know which data types you are using.
  - There is a bug in RocksDB (or Flink's wrapping of it) where data gets
corrupted when using the snapshot feature. That would explain why this only
occurs when checkpoints are happening.

Greetings,
Stephan


On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey  wrote:

> It appears that when one of my jobs tries to checkpoint, the following
> exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB
> checkpoints are being saved to S3.
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.
> RocksDBFoldingState.add(RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask.run(OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: No more bytes left.
> at org.apache.flink.api.java.typeutils.runtime.
> NoFetchingInput.require(NoFetchingInput.java:77)
> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
> at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$
> UnsafeLongField.read(UnsafeCacheFields.java:160)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.
> read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.
> java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.contrib.streaming.state.
> RocksDBFoldingState.add(RocksDBFoldingState.java:118)
> ... 6 more
>
> Thanks for any help!
>
> Shannon
>


Error while adding data to RocksDB: No more bytes left

2016-09-28 Thread Shannon Carey
It appears that when one of my jobs tries to checkpoint, the following 
exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB 
checkpoints are being saved to S3.

java.lang.RuntimeException: Error while adding data to RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: No more bytes left.
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
... 6 more

Thanks for any help!

Shannon


Re: Best way to trigger dataset sampling

2016-09-28 Thread Flavio Pompermaier
I think I'll probably end up submitting the job through YARN in order to
have a more standard approach :)

Thanks,
Flavio

On Wed, Sep 28, 2016 at 5:19 PM, Maximilian Michels  wrote:

> I meant that you simply keep the sampling jar on the machine where you
> want to sample. However, you mentioned that it is a requirement for it
> to be on the cluster.
>
> Cheers,
> Max
>
> On Tue, Sep 27, 2016 at 3:18 PM, Flavio Pompermaier
>  wrote:
> > Hi Max,
> > that's exactly what I was looking for. What do you mean by 'the best thing
> > is if you keep a local copy of your sampling jars and work directly with
> > them'?
> >
> > Best,
> > Flavio
> >
> > On Tue, Sep 27, 2016 at 2:35 PM, Maximilian Michels 
> wrote:
> >>
> >> Hi Flavio,
> >>
> >> This is not really possible at the moment. Though there is a workaround.
> >> You can create a dummy jar file (may be empty). Then you can use
> >>
> >> ./flink run -C hdfs:///path/to/cluster.jar -c org.package.SampleClass
> >> /path/to/dummy.jar
> >>
> >> That way Flink will include your cluster jar and you can load all
> classes
> >> necessary.
> >>
> >> Alternatively, using the Remote Environment, this looks like this:
> >>
> >> public static void main(String[] args) throws Exception {
> >>
> >>final RemoteEnvironment env = new RemoteEnvironment(
> >>   "remoteHost",
> >>   6123,
> >>   new Configuration(),
> >>   new String[0],
> >>   new URL[]{
> >>  new URL("file:///path/to/sample.jar"),
> >>  new
> >> URL("file:///Users/max/Dev/flink/build-target/lib/flink-
> dist_2.10-1.2-SNAPSHOT.jar")});
> >>URLClassLoader classLoader = new
> >> URLClassLoader(env.globalClasspaths.toArray(new URL[0]));
> >>
> >>Class clazz =
> >> classLoader.loadClass("org.package.sample.SampleClass");
> >>
> >>Method main = clazz.getDeclaredMethod("sampleMethod",
> >> ExecutionEnvironment.class);
> >>
> >>// pass environment as an argument to your sample method
> >>// the method should return the results of the execution
> >>Object sampleResult = main.invoke(null, env);
> >> }
> >>
> >>
> >> Beware, this is extremely hacky. We should have a better way to invoke
> jar
> >> files remotely. Honestly, the best thing is if you keep a local copy of
> your
> >> sampling jars and work directly with them.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Tue, Sep 27, 2016 at 12:25 PM, Flavio Pompermaier
> >>  wrote:
> >>>
> >>> Hi Max,
> >>> actually I have a jar containing sampling jobs and I need to collect
> >>> results from a client.
> >>> I've tried to use ExecutionEnvironment.createRemoteEnvironment but I
> fear
> >>> that it's not the right way to do that because
> >>> I just need to tell the cluster the main class and the parameters to
> run
> >>> the job (and where the jar file is on HDFS).
> >>>
> >>> Best,
> >>> Flavio
> >>>
> >>> On Tue, Sep 27, 2016 at 12:06 PM, Maximilian Michels 
> >>> wrote:
> 
>  Hi Flavio,
> 
>  Do you want to sample from a running batch job? That would be like
>  Queryable State in streaming jobs but it is not supported in batch
>  mode.
> 
>  Cheers,
>  Max
> 
> 
>  On Mon, Sep 26, 2016 at 6:13 PM, Flavio Pompermaier
>   wrote:
>  > Hi to all,
>  >
>  > I have a use case where I need to tell a Flink cluster to give me a
>  > sample
>  > of X records using parametrizable sampling functions. Is there any
>  > best
>  > practice or advice to do that?
>  >
>  > Should I create a Remote ExecutionEnvironment or should I use the
>  > Flink
>  > client (I don't know if it uses REST services or RPC or whatever)?
>  > Is there any java snippet for that?
>  >
>  > Best,
>  > Flavio
>  >
> >>>
> >>>
> >>>
> >>>
> >>
> >
> >
>


Re: Merge the states of different partition in streaming

2016-09-28 Thread Ufuk Celebi
Great to hear!

On Wed, Sep 28, 2016 at 5:18 PM, Simone Robutti
 wrote:
> Solved. Probably there was an error in the way I was testing. Also I
> simplified the job and it works now.
>
> 2016-09-27 16:01 GMT+02:00 Simone Robutti :
>>
>> Hello,
>>
>> I'm dealing with an analytical job in streaming and I don't know how to
>> write the last part.
>>
>> Actually I want to count all the elements in a window with a given status,
>> so I keep a state with a Map[Status,Long]. This state is updated starting
>> from tuples containing the oldStatus and the newStatus. So every event
>> generates a +1 for the new status and a -1 for the old status. Then I want
>> to reduce all these counts and move from a local and partial state to a
>> global state that will be written in output.
>>
>> Right now my code looks like:
>>
>> filteredLatestOrders.keyBy(x =>
>> x._1.getStatus).mapWithState(updateState).keyBy(x=>"0").reduce((orderA,orderB)=>orderA.sum(orderB))
>>
>> where "filteredLatestOrder" is a DataStream containing informations about
>> the elements, the new state and the old state.
>>
>> This produces in output:
>>
>> 2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
>> 2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
>> 2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
>> 3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
>> 4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)
>>
>> I thought that keying with a fixed value would collect all the elements in
>> a single node so that I could finally compute the final result, but they are
>> left on different nodes and are never summed.
>>
>> Is this the correct approach? In that case, how can I do what I need? Is
>> there a smarter way to count distinct evolving elements by their status in a
>> streaming job? Mind that the original source of events is updates to the status
>> of an element, and the requirement is that I want to count only the latest
>> status available.
>>
>> Thank you in advance,
>>
>> Simone
>
>


Re: Best way to trigger dataset sampling

2016-09-28 Thread Maximilian Michels
I meant that you simply keep the sampling jar on the machine where you
want to sample. However, you mentioned that it is a requirement for it
to be on the cluster.

Cheers,
Max

On Tue, Sep 27, 2016 at 3:18 PM, Flavio Pompermaier
 wrote:
> Hi Max,
> that's exactly what I was looking for. What do you mean by 'the best thing
> is if you keep a local copy of your sampling jars and work directly with
> them'?
>
> Best,
> Flavio
>
> On Tue, Sep 27, 2016 at 2:35 PM, Maximilian Michels  wrote:
>>
>> Hi Flavio,
>>
>> This is not really possible at the moment. Though there is a workaround.
>> You can create a dummy jar file (may be empty). Then you can use
>>
>> ./flink run -C hdfs:///path/to/cluster.jar -c org.package.SampleClass
>> /path/to/dummy.jar
>>
>> That way Flink will include your cluster jar and you can load all classes
>> necessary.
>>
>> Alternatively, using the Remote Environment, this looks like this:
>>
>> public static void main(String[] args) throws Exception {
>>
>>final RemoteEnvironment env = new RemoteEnvironment(
>>   "remoteHost",
>>   6123,
>>   new Configuration(),
>>   new String[0],
>>   new URL[]{
>>  new URL("file:///path/to/sample.jar"),
>>  new
>> URL("file:///Users/max/Dev/flink/build-target/lib/flink-dist_2.10-1.2-SNAPSHOT.jar")});
>>URLClassLoader classLoader = new
>> URLClassLoader(env.globalClasspaths.toArray(new URL[0]));
>>
>>Class clazz =
>> classLoader.loadClass("org.package.sample.SampleClass");
>>
>>Method main = clazz.getDeclaredMethod("sampleMethod",
>> ExecutionEnvironment.class);
>>
>>// pass environment as an argument to your sample method
>>// the method should return the results of the execution
>>Object sampleResult = main.invoke(null, env);
>> }
>>
>>
>> Beware, this is extremely hacky. We should have a better way to invoke jar
>> files remotely. Honestly, the best thing is if you keep a local copy of your
>> sampling jars and work directly with them.
>>
>> Cheers,
>> Max
>>
>> On Tue, Sep 27, 2016 at 12:25 PM, Flavio Pompermaier
>>  wrote:
>>>
>>> Hi Max,
>>> actually I have a jar containing sampling jobs and I need to collect
>>> results from a client.
>>> I've tried to use ExecutionEnvironment.createRemoteEnvironment but I fear
>>> that it's not the right way to do that because
>>> I just need to tell the cluster the main class and the parameters to run
>>> the job (and where the jar file is on HDFS).
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Sep 27, 2016 at 12:06 PM, Maximilian Michels 
>>> wrote:

 Hi Flavio,

 Do you want to sample from a running batch job? That would be like
 Queryable State in streaming jobs but it is not supported in batch
 mode.

 Cheers,
 Max


 On Mon, Sep 26, 2016 at 6:13 PM, Flavio Pompermaier
  wrote:
 > Hi to all,
 >
 > I have a use case where I need to tell a Flink cluster to give me a
 > sample
 > of X records using parametrizable sampling functions. Is there any
 > best
 > practice or advice to do that?
 >
 > Should I create a Remote ExecutionEnvironment or should I use the
 > Flink
 > client (I don't know if it uses REST services or RPC or whatever)?
 > Is there any java snippet for that?
 >
 > Best,
 > Flavio
 >
>>>
>>>
>>>
>>>
>>
>
>


Re: Merge the states of different partition in streaming

2016-09-28 Thread Simone Robutti
Solved. Probably there was an error in the way I was testing. Also I
simplified the job and it works now.

2016-09-27 16:01 GMT+02:00 Simone Robutti :

> Hello,
>
> I'm dealing with an analytical job in streaming and I don't know how to
> write the last part.
>
> Actually I want to count all the elements in a window with a given status,
> so I keep a state with a Map[Status,Long]. This state is updated starting
> from tuples containing the oldStatus and the newStatus. So every event
> generates a +1 for the new status and a -1 for the old status. Then I want
> to reduce all these counts and move from a local and partial state to a
> global state that will be written in output.
>
> Right now my code looks like:
>
> filteredLatestOrders.keyBy(x => x._1.getStatus).mapWithState(
> updateState).keyBy(x=>"0").reduce((orderA,orderB)=>orderA.sum(orderB))
>
> where "filteredLatestOrder" is a DataStream containing informations about
> the elements, the new state and the old state.
>
> This produces in output:
>
> 2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
> 3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
> 4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)
>
> I thought that keying with a fixed value would collect all the elements in
> a single node so that I could finally compute the final result, but they
> are left on different nodes and are never summed.
>
> Is this the correct approach? In that case, how can I do what I need? Is
> there a smarter way to count distinct evolving elements by their status in
> a streaming job? Mind that the original source of events is updates to the
> status of an element, and the requirement is that I want to count only the
> latest status available.
>
> Thank you in advance,
>
> Simone
>


Re: Flink 1.2-SNAPSHOT fails to initialize keyed state backend

2016-09-28 Thread Stephan Ewen
Hi!

This was a temporary regression in the snapshot that was fixed a few
days ago. It should be in the snapshot repositories by now.

Can you check whether the problem persists after you force an update of your
snapshot dependencies?
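
With Maven, for example, forcing a snapshot update is usually just (assuming a
Maven-based build):

mvn clean package -U

where -U tells Maven to re-check remote repositories for updated -SNAPSHOT
artifacts.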

Greetings,
Stephan



On Tue, Sep 27, 2016 at 5:04 PM, Timo Walther  wrote:

> Hi Seth,
>
> the 1.2-SNAPSHOT is very fragile at the moment because of multiple big
> changes for dynamic scaling.
> Maybe Stefan (in CC) has an idea what is happening with the keyed state
> backend here?
>
> Timo
>
>
> Am 27/09/16 um 16:14 schrieb swiesman:
>
> Hi all,
>>
>> I am working on an analytics project and am developing against flink
>> 1.2-SNAPSHOT. The pipeline that I have built works; ie I can ingest data,
>> perform operations, and output the expected result. I can also see
>> checkpoints being written to RocksDB using Amazon S3 as the state backend.
>> Whenever the application runs for more than approximately 5-10 minutes I
>> get a RuntimeException: "Could not initialize keyed state backend", caused
>> by a class not found exception; the full stack trace is posted below. This
>> confuses me because, for the program to have run and output data for the
>> first 5-10 minutes as it does, that class must have been loaded successfully
>> in the past.
>> As well, I am submitting a single fat jar to flink that I am certain
>> contains the class in question.
>>
>> Thank you in advance for any assistance
>>
>> Seth Wiesman
>>
>> java.lang.RuntimeException: Could not initialize keyed state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor.open(AbstractStreamOperator.java:148)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:91)
>> at
>> org.apache.flink.streaming.api.operators.StreamFlatMap.open(
>> StreamFlatMap.java:41)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:407)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:283)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassNotFoundException:
>> com.mediamath.reporting.lib.streaming.UpsertTrigger
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.
>> java:628)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream
>> .java:1620)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.
>> java:1521)
>> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:
>> 1486)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java
>> :1336)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>> m.java:2018)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>> java:1942)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java
>> :1353)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>> m.java:2018)
>> at java.io.ObjectInputStream.defaultReadObject(ObjectInputStrea
>> m.java:503)
>> at
>> org.apache.flink.api.common.state.StateDescriptor.readObject
>> (StateDescriptor.java:311)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>> .java:1058)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>> java:1909)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java
>> :1353)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:
>> 373)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa
>> ckend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDB
>> KeyedStateBackend.java:653)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa
>> ckend$RocksDBRestoreOperation.restoreKeyGroupsInStateHandle(
>> RocksDBKeyedStateBackend.java:626)
>> at
>> 

Re: Flink Checkpoint runs slow for low load stream

2016-09-28 Thread Chakravarthy varaga
Hi Stephan,

 That should be great. Let me know once the fix is done and which
snapshot version to use; I'll check and revert then.
 Can you also share the JIRA that tracks the issue?

 With regards to the offset commit issue, I'm not sure how to proceed
here. Probably I'll use your fix first and see if the problem reoccurs.
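
For context, here is a minimal sketch (hypothetical names, String elements) of
the kind of keyed ValueState referred to as custom state (d) in the list
further down; together with the Kafka offsets, this is what actually ends up in
a checkpoint for a source -> map -> keyBy -> map -> sink pipeline:

public class CountingMap extends RichFlatMapFunction<String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class, 0L));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        count.update(count.value() + 1); // snapshotted on every checkpoint
        out.collect(value);
    }
}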

Thanks much
Varaga

On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen  wrote:

> @CVP
>
> Flink stores in checkpoints in your case only the Kafka offsets (few
> bytes) and the custom state (e).
>
> Here is an illustration of the checkpoint and what is stored (from the
> Flink docs).
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_
> checkpointing.html
>
>
> I am quite puzzled why the offset committing problem occurs only for one
> input, and not for the other.
> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
> Could you try out a snapshot version to see if that fixes your problem?
>
> Greetings,
> Stephan
>
>
>
> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>>  Thanks a million for your detailed explanation. I appreciate it.
>>
>>  -  The *zookeeper bundled with kafka 0.9.0.1* was used to start
>> zookeeper. There is only 1 instance (standalone) of zookeeper running on my
>> localhost (ubuntu 14.04)
>>  -  There is only 1 Kafka broker (*version: 0.9.0.1* )
>>
>>  With regards to Flink cluster there's only 1 JM & 2 TMs started with
>> no HA. I presume this does not use zookeeper anyways as it runs as
>> standalone cluster.
>>
>>
>>  BTW., The kafka connector version that I use is as suggested in the
>> flink connectors page
>>
>>
>>
>>
>> *. org.apache.flink
>> flink-connector-kafka-0.9_2.10
>> 1.1.1*
>>
>>  Do you see any issues with versions?
>>
>>  1) Do you have benchmarks wrt., to checkpointing in flink?
>>
>>  2) There isn't detailed explanation on what states are stored as
>> part of the checkpointing process. For ex.,  If I have pipeline like
>> *source -> map -> keyBy -> map -> sink, my assumption on what's stored
>> is:*
>>
>> * a) The source stream's custom watermarked records*
>>
>> * b) Intermediate states of each of the transformations in the
>> pipeline*
>>
>> * c) Delta of Records stored from the previous sink*
>>
>> * d) Custom States (SayValueState as in my case) - Essentially
>> this is what I bother about storing.*
>> * e) All of my operators*
>>
>>   Is my understanding right?
>>
>>  3) Is there a way in Flink to checkpoint only d) as stated above
>>
>>  4) Can you apply checkpointing to only streams and certain operators
>> (say I wish to store aggregated values part of the transformation)
>>
>> Best Regards
>> CVP
>>
>>
>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen  wrote:
>>
>>> Thanks, the logs were very helpful!
>>>
>>> TL:DR - The offset committing to ZooKeeper is very slow and prevents
>>> proper starting of checkpoints.
>>>
>>> Here is what is happening in detail:
>>>
>>>   - Between the point when the TaskManager receives the "trigger
>>> checkpoint" message and the point when the KafkaSource actually starts
>>> the checkpoint is a long time (many seconds) - for one of the Kafka Inputs
>>> (the other is fine).
>>>   - The only way this delay can be introduced is if another checkpoint
>>> related operation (such as trigger() or notifyComplete()) is still in
>>> progress when the checkpoint is started. Flink does not perform concurrent
>>> checkpoint operations on a single operator, to ease the concurrency model
>>> for users.
>>>   - The operation that is still in progress must be the committing of
>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>> happens once one side receives the first record. Before that, there is
>>> nothing to commit.
>>>
>>>
>>> What Flink should fix:
>>>   - The KafkaConsumer should run the commit operations asynchronously,
>>> to not block the "notifyCheckpointComplete()" method.
>>>
>>> What you can fix:
>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>>> well, the other does not. Do they go against different sets of brokers, or
>>> different ZooKeepers? Is the metadata for one input bad?
>>>   - In the next Flink version, you may opt-out of committing offsets to
>>> Kafka/ZooKeeper all together. It is not important for Flink's checkpoints
>>> anyways.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
 Hi Stefan,

 Please find my responses below.

 - What source are you using for the slow input?
 * [CVP] - Both stream as pointed out in my first mail, are Kafka
 Streams*
   - How large is the state that you are checkpointing?

 *[CVP] - I have enabled 

Re: checkpoints not removed on hdfs.

2016-09-28 Thread Ufuk Celebi
Hey! Any update on this?

On Mon, Sep 5, 2016 at 11:29 AM, Aljoscha Krettek  wrote:
> Hi,
> which version of Flink are you using? Are the checkpoints being reported as
> successful in the Web Frontend, i.e. in the "checkpoints" tab of the running
> job?
>
> Cheers,
> Aljoscha
>
> On Fri, 2 Sep 2016 at 12:17 Dong-iL, Kim  wrote:
>>
>> Hi,
>>
>> I’m using HDFS as state backend.
>> The checkpoints folder grows bigger every moment.
>> What shall I do?
>>
>> Regards.


Re: RemoteEnv connect failed

2016-09-28 Thread Ufuk Celebi
Hey Dayong,
can you check the logs of the Flink cluster on the virtual machine?
The client side (what you posted) looks ok.
– Ufuk

On Wed, Sep 14, 2016 at 3:52 PM, Dayong  wrote:
> Hi folks,
> I need to run a Java app to submit a job to a remote Flink cluster. I am 
> testing with the code at 
> https://gist.github.com/datafibers/4b842ebc5b3c9e754ceaf78695e7567e
> and my comments.
>
>
> Thanks,
> Will


Re: How to interact with a running flink application?

2016-09-28 Thread Ufuk Celebi
Hey Anchit,

the usual recommendation for this is to use a CoMap/CoFlatMap
operator, where the second input carries the lookup content changes. You
can then use this input to update the lookup.

Search for CoMap/CoFlatMap here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#datastream-transformations
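
A minimal sketch of that approach (hypothetical types and names, not tied to
your actual job), where the first input is the Kafka content and the second
input carries the refreshed lookup content:

public class LookupEnricher implements CoFlatMapFunction<String, Map<String, String>, String> {

    private Map<String, String> lookup = new HashMap<>();

    @Override
    public void flatMap1(String record, Collector<String> out) {
        // process a Kafka record against the current lookup content
        String hit = lookup.get(record);
        out.collect(record + " -> " + (hit != null ? hit : "unknown"));
    }

    @Override
    public void flatMap2(Map<String, String> newLookup, Collector<String> out) {
        // second input: replace the in-memory lookup without stopping the job
        lookup = newLookup;
    }
}

// usage sketch: kafkaStream.connect(lookupUpdateStream).flatMap(new LookupEnricher())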

Best,

Ufuk

On Wed, Sep 28, 2016 at 9:37 AM, Anchit Jatana
 wrote:
> Hi All,
>
> Brief: I have a use case where I need to interact with a running flink
> application.
>
> Detail:
>
> My Flink application has a Kafka source and an operator processing the
> content received from the Kafka stream (this operator uses a lookup from
> an external source file to accomplish the processing of the Kafka content).
> If the content of the file kept at the same source location changes, I need
> to notify the operator to update its lookup content loaded in memory and
> continue processing the Kafka content with the newly loaded lookup content
> without stopping the Flink application.
>
> Is there a way where I can "interact with the running Flink Application"
> through some event or something to notify the application to make some
> changes in its operation without stopping the application.
>
> Thank you!
>
> Regards,
> Anchit


Re: Failures on DataSet programs

2016-09-28 Thread Ufuk Celebi
Hey Paulo! I think it's not possible out of the box at the moment, but
you can try the following as a workaround:

1) Create a custom OutputFormat that extends TextOutputFormat and
overrides the cleanup method:

public class NoCleanupTextOutputFormat<T> extends TextOutputFormat<T> {

    public NoCleanupTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void tryCleanupOnError() {
        // ignore cleanup on error so partial output is kept
    }
}

2) writeAsFormattedText is actually a map + writeAsText (if you look
into DataSet.java). Instead of that you should manually do:

dataSet.map(new FormattingMapper<>(formatter)).output(new
NoCleanupTextOutputFormat<>(..))


This should work as expected. You can furthermore open an issue with a
feature request to allow configuring Flink's TextOutputFormat to
ignore cleanup.

Best,

Ufuk


On Tue, Sep 27, 2016 at 10:42 PM, Paulo Cezar  wrote:
> Hi Folks,
>
> I was wondering if it's possible to keep partial outputs from dataset
> programs.
> I have a batch pipeline that writes its output on HDFS using
> writeAsFormattedText. When it fails the output file is deleted but I would
> like to keep it so that I can generate new inputs for the pipeline to avoid
> reprocessing.
>
> []'s
> Paulo Cezar