Re: writeAsCSV with partitionBy

2016-05-23 Thread KirstiLaurila
Yeah, created this one  https://issues.apache.org/jira/browse/FLINK-3961
  




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/writeAsCSV-with-partitionBy-tp4893p7118.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: HDFS namenode and Flink

2016-05-23 Thread thomas
Ok, we have all this configuration set up, so it will be fine :-)
Thanks for the response!

Thomas

From: Stefano Baghino
Sent: Monday, 23 May 2016 12:58
To: user@flink.apache.org
Reply-To: user@flink.apache.org
Subject: Re: HDFS namenode and Flink

One last quick note: if you're going to run individual jobs on YARN instead of
a long-running session, make sure you provide each job with a separate set of
directories for (surely) ZK storage and (possibly*) state backend, otherwise
the state of the jobs will end up entangled and you may experience some
undefined behavior.

* I'm not really sure about this last one, perhaps some more experienced ML
user can help me out on this.

On Mon, May 23, 2016 at 12:54 PM, Stefano Baghino wrote:
I think the only keys of interest for your needs (highly available with HDFS
state backend) are

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///path/to/checkpoints # fill in according to your needs
recovery.zookeeper.storageDir: /path/to/znode # again, fill in according to your needs
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk-ensemble-1:2181,zk-ensemble-2:2181,zk-ensemble-3:2181 # put your zk ensemble here

If these keys are set you should be good to go. I hope I've been of some help. :)

On Mon, May 23, 2016 at 12:37 PM, wrote:
Hello flinkers,

We will activate namenode HDFS high availability in our cluster, and I want to know if there is any additional configuration needed for Flink?
We actually use YARN for launching our Flink application, and the HDFS filesystem to store the state backend.

Thanks

Thomas
--
BR,
Stefano Baghino
Software Engineer @ Radicalbit

--
BR,
Stefano Baghino
Software Engineer @ Radicalbit




Re: Import Configuration File in Flink Cluster

2016-05-23 Thread Bajaj, Abhinav
I was gonna post the exact same question and noticed this thread.

It will be great if we can have a method in parameter tool to load from 
resources.

Thanks Simon :)

Abhinav Bajaj
Senior Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516
HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps

On 5/23/16, 8:54 AM, "simon peyer"  wrote:

>Hi Max 
>
>Thanks a lot for your helpful answer.
>It now works on the cluster.
>It would be great to have a method for loading from resources.
>
>-Cheers
>Simon
>
>
>> On 23 May 2016, at 17:52, Maximilian Michels  wrote:
>> 
>> Hi Simon,
>> 
>> AFAIK this is the way to go. We could add a method to the
>> ParameterTool which loads from a resource to make it more convenient.
>> 
>> Cheers,
>> Max
>> 
>> On Mon, May 23, 2016 at 4:42 PM, simon peyer  wrote:
>>> Hi
>>> 
>>> @Max
>>> So for each file in the src/main/resources folder, I first have to create a
>>> new file, copy the file from the resources folder to this new file and then
>>> I'm able to parse it?
>>> 
>>> @Stefano
>>> I think the files in src/main/resources  are integrated automatically right?
>>> Or am I missing something?
>>> 
>>> Cheers
>>> Simon
>>> 
>>> 
>>> On 23 May 2016, at 16:30, Stefano Baghino 
>>> wrote:
>>> 
>>> Are you using Maven to package your project? I believe the resources
>>> plugin[1] can suit your needs.
>>> 
>>> [1]:
>>> http://maven.apache.org/plugins/maven-resources-plugin/examples/include-exclude.html
>>> 
>>> 
>>> On 23 May 2016, at 16:28, Maximilian Michels  wrote:
>>> 
>>> Hi Simon,
>>> 
>>> You'll have to write the property file to disk first to load it using
>>> the ParameterTool.fromPropertiesFile method.
>>> 
>>> For example:
>>> 
>>> // copy config from Java resource to a file
>>> File configOnDisk = new File("/path/to/config.properties");
>>> Files.copy(getClass.getClassLoader.getResourceAsStream("config.properties"),
>>> configOnDisk.toPath());
>>> // load the new file
>>> ParameterTool.fromPropertiesFile(configOnDisk);
>>> 
>>> 
>>> Cheers,
>>> Max
>>> 
>>> 
>>> 
>>> On Mon, May 23, 2016 at 3:56 PM, simon peyer  wrote:
 
 Hi together
 
 Currently I'm using flink on a docker cluster on AWS.
 I would like to use property files with the integrated
 ParameterTool.fromPropertiesFile function of Flink.
 
 Locally this version works absolutely fine:
 val configuration =
 ParameterTool.fromPropertiesFile("src/main/resources/config.properties")
 
 But on the cluster this didn't work, so we introduced this version, which
 also doesn't work:
 
 val baseParameters =
 ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource("config.properties").getFile)
 
 gives
 
 java.io.FileNotFoundException: Properties file
 file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
 does not exist
 
 The property file is located in src/main/resources.
 Do you have any idea of how to integrate such property files into the jar
 package?
 
 -Thanks
 Simon
>>> 
>>> 
>>> 
>>> 
>>> --
>>> BR,
>>> Stefano Baghino
>>> 
>>> Software Engineer @ Radicalbit
>>> 
>>> 
>
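
A minimal sketch of the copy-to-a-temp-file workaround Max describes above, assuming the properties file is bundled on the job jar's classpath; the ResourceConfigLoader class and fromResource method are illustrative helpers, not part of the Flink API.

import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

import org.apache.flink.api.java.utils.ParameterTool;

public class ResourceConfigLoader {

    // Copies a classpath resource to a temporary file so that
    // ParameterTool.fromPropertiesFile can read it from disk.
    public static ParameterTool fromResource(String resourceName) throws Exception {
        try (InputStream in = ResourceConfigLoader.class
                .getClassLoader().getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalArgumentException("Resource not found: " + resourceName);
            }
            File tmp = File.createTempFile("flink-config-", ".properties");
            tmp.deleteOnExit();
            Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return ParameterTool.fromPropertiesFile(tmp.getAbsolutePath());
        }
    }
}

Usage would then be a single call, e.g. ParameterTool params = ResourceConfigLoader.fromResource("config.properties");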


Re: Logging Exceptions

2016-05-23 Thread David Kim
Hi Max!

Unfortunately, that's not the behavior I'm seeing.

I verified my log4j.properties is configured properly because I do see
messages in the /log directory.

However, for this stack trace (grabbed from the web dashboard), I do not
see it in my log file:

java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
at 
org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)


Here's my sample program:


val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List(1))
  .filter(element => {
    throw new RuntimeException("Throwing a runtime to test exception logging :)")
  })
env.execute("MyTestJobName")


The error is thrown in Task.java[1] and it was hard to track where
flink would actually log to the log file.


[1] 
https://github.com/apache/flink/blob/release-1.0.2-rc3/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L559


Thanks!

David


On Mon, May 23, 2016 at 12:01 PM Maximilian Michels  wrote:

> Hi David,
>
> I'm afraid Flink logs all exceptions. You'll find the exceptions in the
> /log directory.
>
> Cheers,
> Max
>
> On Mon, May 23, 2016 at 6:18 PM, David Kim <
> david@braintreepayments.com> wrote:
>
>> Hello!
>>
>> Just wanted to check up on this. :)
>>
>> I grepped around for `log.error` and it *seems* that currently the only
>> events for logging out exceptions are for non-application related errors.
>>
>> Thanks!
>> David
>>
>> On Fri, May 20, 2016 at 12:35 PM David Kim <
>> david@braintreepayments.com> wrote:
>>
>>> Hello!
>>>
>>> Using flink 1.0.2, I noticed that exceptions thrown during a flink
>>> program would show up on the flink dashboard in the 'Exceptions' tab.
>>> That's great!
>>>
>>> However, I don't think flink currently logs this same exception. I was
>>> hoping there would be an equivalent `log.error` call so that third party
>>> logging frameworks can also act upon such errors.
>>>
>>> If this currently the known behavior, would it be troublesome to also
>>> make a `log.error` call around the code that is responsible for sending the
>>> exception to the dashboard?
>>>
>>> If there's a misconfiguration on my end, let me know!
>>>
>>> Thanks!
>>> David
>>>
>>
>
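
A minimal sketch of a client-side workaround for the time being, assuming the job is submitted in attached mode so that env.execute() rethrows the job failure to the driver; this logs on the client only (not in the TaskManager logs), and the SLF4J logger setup and placeholder pipeline are illustrative.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDriver {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingDriver.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print(); // placeholder pipeline

        try {
            env.execute("MyTestJobName");
        } catch (Exception e) {
            // Hand the failure to the third-party logging framework before rethrowing.
            LOG.error("Job failed", e);
            throw e;
        }
    }
}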


Re: problem of sharing TCP connection when transferring data

2016-05-23 Thread Deepak Sharma
Thanks Ufuk.
Sure I would pick some starter issues and get to these low level issues.

-Deepak
On 23 May 2016 11:21 pm, "Ufuk Celebi"  wrote:

> On Mon, May 23, 2016 at 7:30 PM, Deepak Sharma 
> wrote:
> > Would it be possible to get involved on this issues and start
> contributing
> > to Flink community?
>
> Hey Deepak!
>
> Nice to see that you are also interested in this. If you are new to
> Flink I would recommend to start contributing by looking into one of
> the starter issues:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20status%20%3D%20Open%20AND%20labels%20%3D%20starter
>
> The lower layers are quite tricky. If it's OK with you, I would rather
> post any follow ups like JIRA issues or design docs in this thread, at
> which point it will be possible to chime in here, too. In the mean
> time the issues I've linked are a better place to start I think.
>
> – Ufuk
>


Re: problem of sharing TCP connection when transferring data

2016-05-23 Thread Ufuk Celebi
On Mon, May 23, 2016 at 7:30 PM, Deepak Sharma  wrote:
> Would it be possible to get involved on this issues and start contributing
> to Flink community?

Hey Deepak!

Nice to see that you are also interested in this. If you are new to
Flink I would recommend to start contributing by looking into one of
the starter issues:
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20status%20%3D%20Open%20AND%20labels%20%3D%20starter

The lower layers are quite tricky. If it's OK with you, I would rather
post any follow ups like JIRA issues or design docs in this thread, at
which point it will be possible to chime in here, too. In the mean
time the issues I've linked are a better place to start I think.

– Ufuk


Re: Combining streams with static data and using REST API as a sink

2016-05-23 Thread Josh
Hi Max,

Thanks, that's very helpful re the REST API sink. For now I don't need
exactly once guarantees for the sink, so I'll just write a simple HTTP sink
implementation. But may need to move to the idempotent version in future!

For 1), that sounds like a simple/easy solution, but how would I handle
occasional updates in that case, since I guess the open() function is only
called once? Do I need to periodically restart the job, or periodically
trigger tasks to restart and refresh their data? Ideally I would want this
job to be running constantly.

Josh

On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels  wrote:

> Hi Josh,
>
> 1) Use a RichFunction which has an `open()` method to load data (e.g. from
> a database) at runtime before the processing starts.
>
> 2) No that's fine. If you want your Rest API Sink to interplay with
> checkpointing (for fault-tolerance), this is a bit tricky though depending
> on the guarantees you want to have. Typically, you would have "at least
> once" or "exactly once" semantics on the state. In Flink, this is easy to
> achieve, it's a bit harder for outside systems.
>
> "At Least Once"
>
> For example, if you increment a counter in a database, this count will be
> off if you recover your job in the case of a failure. You can checkpoint
> the current value of the counter and restore this value on a failure (using
> the Checkpointed interface). However, your counter might decrease
> temporarily when you resume from a checkpoint (until the counter has caught
> up again).
>
> "Exactly Once"
>
> If you want "exactly once" semantics on outside systems (e.g. Rest API),
> you'll need idempotent updates. An idempotent variant of this would be a
> count with a checkpoint id (cid) in your database.
>
> | cid | count |
> |-----+-------|
> |   0 |     3 |
> |   1 |    11 |
> |   2 |    20 |
> |   3 |   120 |
> |   4 |   137 |
> |   5 |   158 |
>
> You would then always read the newest cid value for presentation. You
> would only write to the database once you know you have completed the
> checkpoint (CheckpointListener). You can still fail while doing that, so
> you need to keep the confirmation around in the checkpoint such that you
> can confirm again after restore. It is important that confirmation can be
> done multiple times without affecting the result (idempotent). On recovery
> from a checkpoint, you want to delete all rows higher with a cid higher
> than the one you resume from. For example, if you fail after checkpoint 3
> has been created, you'll confirm 3 (because you might have failed before
> you could confirm) and then delete 4 and 5 before starting the computation
> again.
>
> You see, that strong consistency guarantees can be a bit tricky. If you
> don't need strong guarantees and undercounting is ok for you, implement a
> simple checkpointing for "at least once" using the Checkpointed interface
> or the KeyValue state if your counter is scoped by key.
>
> Cheers,
> Max
>
>
> On Mon, May 23, 2016 at 3:22 PM, Josh  wrote:
> > Hi all,
> >
> > I am new to Flink and have a couple of questions which I've had trouble
> > finding answers to online. Any advice would be much appreciated!
> >
> > What's a typical way of handling the scenario where you want to join
> > streaming data with a (relatively) static data source? For example, if I
> > have a stream 'orders' where each order has an 'item_id', and I want to
> join
> > this stream with my database of 'items'. The database of items is mostly
> > static (with perhaps a few new items added every day). The database can
> be
> > retrieved either directly from a standard SQL database (postgres) or via
> a
> > REST call. I guess one way to handle this would be to distribute the
> > database of items with the Flink tasks, and to redeploy the entire job if
> > the items database changes. But I think there's probably a better way to
> do
> > it?
> > I'd like my Flink job to output state to a REST API. (i.e. using the REST
> > API as a sink). Updates would be incremental, e.g. the job would output
> > tumbling window counts which need to be added to some property on a REST
> > resource, so I'd probably implement this as a PATCH. I haven't found much
> > evidence that anyone else has used a REST API as a Flink sink - is there
> a
> > reason why this might be a bad idea?
> >
> > Thanks for any advice on these,
> >
> > Josh
>
>
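
A minimal sketch of option 1 as discussed above, assuming the items table is small enough to keep in memory on every parallel task and is loaded once in open(); periodic refreshing (Josh's follow-up question) would need extra handling, e.g. reloading inside the function on a timer. The ItemEnricher name, the String-based types and the loadItemsFromDatabase() stub are illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Enriches each order (represented here just by its item id) with the item
// name from a snapshot of the items table loaded at task start-up.
public class ItemEnricher extends RichMapFunction<String, String> {

    private transient Map<String, String> itemsById;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel task instance before processing starts.
        itemsById = loadItemsFromDatabase();
    }

    @Override
    public String map(String itemId) throws Exception {
        String name = itemsById.get(itemId);
        return itemId + " -> " + (name != null ? name : "unknown");
    }

    // Stub: in a real job this would query Postgres or the REST endpoint.
    private Map<String, String> loadItemsFromDatabase() {
        Map<String, String> items = new HashMap<>();
        items.put("item-1", "example item");
        return items;
    }
}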


Re: writeAsCSV with partitionBy

2016-05-23 Thread Fabian Hueske
Hi Kirsti,

I'm not aware of anybody working on this issue.
Would you like to create a JIRA issue for it?

Best, Fabian

2016-05-23 16:56 GMT+02:00 KirstiLaurila :

> Is there any plans to implement this kind of feature (possibility to write
> to
> data specified partitions) in the near future?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/writeAsCSV-with-partitionBy-tp4893p7099.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: keyBy on a collection of Pojos

2016-05-23 Thread Fabian Hueske
Actually, the program works correctly (according to the DataStream API).
Let me explain what happens:

1) You do not initialize the count variable, so it will be 0 (summing 0s
results in 0).
2) DataStreams are considered to be unbounded (to have an infinite size). KeyBy
does not group the records, because it would have to wait forever to close
the group due to the infinite input. Instead, keyBy basically partitions the
data.
3) By calling sum() on a KeyedStream you compute a running aggregate, which
emits one record for each incoming record, summing the declared field (this
stays 0 because 0 + 0 = 0).

You will need to
1) initialize count to 1, and
2) define a window to discretize the stream into finite sets (windows) of
records (see the sketch below).

Cheers, Fabian
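
A minimal sketch of the two fixes, assuming the PojoExample from the gist (with count initialized to 1) and an arbitrary 10-second processing-time window; note that with a tiny finite source like fromElements the job may finish before the window fires, so this is meant to illustrate the pattern for a continuously running stream.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PojoWindowSum {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new PojoExample("productA"), new PojoExample("productB"),
                         new PojoExample("productA"))
           .keyBy("productId")                // partition by key
           .timeWindow(Time.seconds(10))      // discretize the unbounded stream
           .sum("count")                      // one summed record per key and window
           .print();

        env.execute("pojo window sum");
    }

    public static class PojoExample {
        public int count = 1;                 // fix 1: initialize count to 1
        public String productId;

        public PojoExample() {}

        public PojoExample(String productId) {
            this.productId = productId;
        }
    }
}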

2016-05-23 17:16 GMT+02:00 Deepak Sharma :

> Can you try serializing your POJO ?
>
> Thanks
> Deepak
>
> On Mon, May 23, 2016 at 8:42 PM, Flavio Pompermaier 
> wrote:
>
>> Sorry Rami, you're right :)
>> Unfortunattely I've never used Flink streaming so I cannot be helpful
>> there..
>> Myabe is it something related to the default triggering policy of the
>> streaming environment?
>>
>>
>> On Mon, May 23, 2016 at 5:06 PM, Al-Isawi Rami > > wrote:
>>
>>> Thanks, setters and getters for public fields have no purpose. Also per
>>> the conditions you have mentioned:
>>> "All fields *either* have to be public *or* there must be getters and
>>> setters for all non-public fields.”
>>> Since my fields are declared public there are no impact on adding
>>> getters and setters. ( I have also testing after adding the setters and
>>> getters and as expected that has no effect).
>>>
>>> Could you spot anything else? this should be really easy basic case. I
>>> am really wondering why it is not working.
>>>
>>> For the people who are lazy to open the gist code snippet, this is what
>>> I am trying to do:
>>>
>>> pojoExampleDataStream.
>>> keyBy("productId").
>>> sum("count").
>>> print();
>>>
>>>
>>>
>>> Regards,
>>> -Rami
>>>
>>>
>>> On 23 May 2016, at 17:11, Flavio Pompermaier 
>>> wrote:
>>>
>>> You don't have getters and setters for count and productId.
>>>
>>> Your class should be
>>>
>>> public class PojoExample {
>>> public int count;
>>> public String productId;
>>>
>>> public PojoExample() {}
>>>
>>> public int getCount() {
>>> return count;
>>> }
>>>
>>> public void setCount(int count) {
>>> this.count = count;
>>> }
>>>
>>> public String getProductId() {
>>> return productId;
>>> }
>>>
>>> public void setProductId(String productId) {
>>> this.productId = productId;
>>> }
>>> }
>>>
>>>
>>>
>>> On Mon, May 23, 2016 at 3:40 PM, Al-Isawi Rami <
>>> rami.al-is...@comptel.com> wrote:
>>>
 Thanks Flavio, but as you can see in my code I have already declared my
 pojo to achieve those conditions:
 public class PojoExample {
 public int count;
 public String productId;
 public PojoExample() {
 }
 }

 So it cannot be that.

 -Rami

 On 23 May 2016, at 16:30, Flavio Pompermaier 
 wrote:

 *Conditions* for a class to be treated as a POJO by Flink:

- The class must be public
- It must have a public constructor without arguments
- All fields either have to be public or there must be getters and
setters for all non-public fields. If the field name is foo the
getter and setters must be called getFoo() and setFoo().

 I don't know whether you need to implement also hashCode() and equals()
 actually
 Best,
 Flavio

 On Mon, May 23, 2016 at 3:24 PM, Al-Isawi Rami <
 rami.al-is...@comptel.com> wrote:

> Hi,
>
> I was trying to test some specific issue, but now I cannot seem to get
> the very basic case working. It is most likely that I am blind to
> something, would anyone have quick look at it?
> https://gist.github.com/rami-alisawi/d6ff33ae2d4d6e7bb1f8b329e3e5fa77
>
> It is just a collection of pojos where I am just trying to keyBy one
> field and sum into the other, but I am getting:
> 5> PojoExample{count=0, productId='productA'}
> 8> PojoExample{count=0, productId='productB'}
> 5> PojoExample{count=0, productId='productA'}
> 8> PojoExample{count=0, productId='productB'}
> 5> PojoExample{count=0, productId='productA'}
> 5> PojoExample{count=0, productId='productA'}
> 5> PojoExample{count=0, productId='productA’}
>
> Regards,
> -Rami
>
> Disclaimer: This message and any attachments thereto are intended
> solely for the addressed recipient(s) and may contain confidential
> information. If you are not the intended recipient, please notify the
> sender by reply e-mail and delete the e-mail (including any attachments
> thereto) without producing, distributing or retaining any copies thereof.
> Any review, dissemination or other use of, or 

Re: problem of sharing TCP connection when transferring data

2016-05-23 Thread Deepak Sharma
I am not a Flink master or a regular user of Flink, but I would like to start
contributing to Flink.
Would it be possible to get involved in these issues and start contributing
to the Flink community?

Thanks
Deepak

On Mon, May 23, 2016 at 10:49 PM, Ufuk Celebi  wrote:

> On Mon, May 23, 2016 at 6:55 PM, wangzhijiang999
>  wrote:
> >In summary, if one task set autoread as false, and when it notify the
> > available buffer, there are some messages during this time to be
> processed
> > first, if one message belongs to another failed task, the autoread for
> this
> > channel would not be set true anymore. The only way is to cancel all the
> > tasks in this channel to release the channel. Is it right?
>
> Yes, very good observation. In this sense the failure model of Flink
> is baked in into the way the channels are multiplexed, which is a bad
> thing (as you already noticed with your improved failover strategy).
>
> If you want, let me look into this issue on a high level and let's fix
> this together as a first step. Let's have a chat about this by the end
> of the week. Does this work for you?
>
> After that, we can continue with the flow control issue, which is
> definitely a bigger task.
>
> – Ufuk
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: [RichFlattMapfunction] Configuration File

2016-05-23 Thread Maximilian Michels
Hi Simon,

Great! I think this is only available in the DataSet API.

Cheers,
Max

On Mon, May 23, 2016 at 2:23 PM, simon peyer  wrote:
> Hi Max
>
> Thanks a lot.
> I found now this solution:
>
> Passing it as a Configuration object to single functions
>
> The example below shows how to pass the parameters as a Configuration object
> to a user defined function.
>
> ParameterTool parameters = ParameterTool.fromArgs(args);
> DataSet<Tuple2<String, Integer>> counts = text.flatMap(new
> Tokenizer()).withParameters(parameters.getConfiguration())
>
> In the Tokenizer, the object is now accessible in the open(Configuration
> conf) method:
>
> public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
>   @Override
>   public void open(Configuration parameters) throws Exception {
>   parameters.getInteger("myInt", -1);
>   // .. do
>
>
>
> Cheers
> Simon
>
> On 23 May 2016, at 14:01, Maximilian Michels  wrote:
>
> Hi Simon,
>
> As Aljoscha said, the best way is to supply the configuration as class
> fields. Alternatively, if you overload the open(..) method, it should
> also show up in the Properties/Configuration tab on the Web interface.
>
> Cheers,
> Max
>
> On Mon, May 23, 2016 at 11:43 AM, simon peyer 
> wrote:
>
> Hi Aljoscha
>
> Thanks for your reply.
>
> Regarding question 2, the web dashboard does provide a properties section
> among its tabs (Plan, Timeline, Exceptions, Properties, Configuration).
>
>
> Whats the most common way to handle properties in flink?
> Is there a general way to go and any kind of integration in flink?
>
> --Simon
>
>
> On 21 May 2016, at 10:44, Aljoscha Krettek  wrote:
>
> Hi Simon,
> regarding 1. yes, the value that you get from state_item.value() and that
> you set using state_item.update() is scoped to the key of the incoming
> element.
>
> regarding 2. the open(conf: Configuration) signature is legacy from how
> Functions used to work quite a while back. In the streaming API this
> Configuration is always empty. If you want to configure your user function
> you can have the values as fields in your class and pass them in the
> constructor.
>
> Cheers,
> Aljoscha
>
> On Fri, 20 May 2016 at 17:49 simon peyer  wrote:
>
>
> Hi folks
>
> I'm extending a RichFlatMapFunction in order to use states on a keyed
> stream.
> Concerning this i have two questions:
>
> 1. I have a  var state_item: ValueState[Option[String]] as a local
> variable in this class. Initialized with state_item =
> getRuntimeContext.getState(new ValueStateDescriptor. in the open
> function.
> Is the field state_item for every key different?
>
> In other words if I have a key with val1 and val2 will these get two
> different states?
>
>
> 2. The open function contains a  override def open(conf: Configuration)
> configuration.
> Is there a way to input a custom configuration in there?
>
> Thanks Simon
>
>
>
>
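
A minimal Java sketch combining the two answers in this thread: configuration values go through the user function's constructor (since the open(Configuration) argument is always empty in the streaming API), and the ValueState obtained in open() is scoped to the current key, so every distinct key sees its own value. The function has to be applied on a keyed stream; all names are illustrative.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ConfiguredFlatMap extends RichFlatMapFunction<String, String> {

    private final int threshold;                      // passed in via the constructor
    private transient ValueState<Integer> seenCount;  // value is scoped to the current key

    public ConfiguredFlatMap(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // One descriptor, but state.value()/update() operate per key.
        seenCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seenCount", Integer.class, 0));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        int count = seenCount.value() + 1;
        seenCount.update(count);
        if (count <= threshold) {
            out.collect(value);
        }
    }
}

It would be used as keyedStream.flatMap(new ConfiguredFlatMap(3)), so the configured value travels with the serialized function instead of a Configuration object.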


Re: problem of sharing TCP connection when transferring data

2016-05-23 Thread Ufuk Celebi
On Mon, May 23, 2016 at 6:55 PM, wangzhijiang999
 wrote:
>In summary, if one task set autoread as false, and when it notify the
> available buffer, there are some messages during this time to be processed
> first, if one message belongs to another failed task, the autoread for this
> channel would not be set true anymore. The only way is to cancel all the
> tasks in this channel to release the channel. Is it right?

Yes, very good observation. In this sense the failure model of Flink
is baked in into the way the channels are multiplexed, which is a bad
thing (as you already noticed with your improved failover strategy).

If you want, let me look into this issue on a high level and let's fix
this together as a first step. Let's have a chat about this by the end
of the week. Does this work for you?

After that, we can continue with the flow control issue, which is
definitely a bigger task.

– Ufuk


Re: Logging Exceptions

2016-05-23 Thread Maximilian Michels
Hi David,

I'm afraid Flink logs all exceptions. You'll find the exceptions in the
/log directory.

Cheers,
Max

On Mon, May 23, 2016 at 6:18 PM, David Kim 
wrote:

> Hello!
>
> Just wanted to check up on this. :)
>
> I grepped around for `log.error` and it *seems* that currently the only
> events for logging out exceptions are for non-application related errors.
>
> Thanks!
> David
>
> On Fri, May 20, 2016 at 12:35 PM David Kim <
> david@braintreepayments.com> wrote:
>
>> Hello!
>>
>> Using flink 1.0.2, I noticed that exceptions thrown during a flink
>> program would show up on the flink dashboard in the 'Exceptions' tab.
>> That's great!
>>
>> However, I don't think flink currently logs this same exception. I was
>> hoping there would be an equivalent `log.error` call so that third party
>> logging frameworks can also act upon such errors.
>>
>> If this currently the known behavior, would it be troublesome to also
>> make a `log.error` call around the code that is responsible for sending the
>> exception to the dashboard?
>>
>> If there's a misconfiguration on my end, let me know!
>>
>> Thanks!
>> David
>>
>


Re: Combining streams with static data and using REST API as a sink

2016-05-23 Thread Maximilian Michels
Hi Josh,

1) Use a RichFunction which has an `open()` method to load data (e.g. from
a database) at runtime before the processing starts.

2) No that's fine. If you want your Rest API Sink to interplay with
checkpointing (for fault-tolerance), this is a bit tricky though depending
on the guarantees you want to have. Typically, you would have "at least
once" or "exactly once" semantics on the state. In Flink, this is easy to
achieve, it's a bit harder for outside systems.

"At Least Once"

For example, if you increment a counter in a database, this count will be
off if you recover your job in the case of a failure. You can checkpoint
the current value of the counter and restore this value on a failure (using
the Checkpointed interface). However, your counter might decrease
temporarily when you resume from a checkpoint (until the counter has caught
up again).

"Exactly Once"

If you want "exactly once" semantics on outside systems (e.g. Rest API),
you'll need idempotent updates. An idempotent variant of this would be a
count with a checkpoint id (cid) in your database.

| cid | count |
|-----+-------|
|   0 |     3 |
|   1 |    11 |
|   2 |    20 |
|   3 |   120 |
|   4 |   137 |
|   5 |   158 |

You would then always read the newest cid value for presentation. You would
only write to the database once you know you have completed the
checkpoint (CheckpointListener). You can still fail while doing that, so
you need to keep the confirmation around in the checkpoint such that you
can confirm again after restore. It is important that confirmation can be
done multiple times without affecting the result (idempotent). On recovery
from a checkpoint, you want to delete all rows higher with a cid higher
than the one you resume from. For example, if you fail after checkpoint 3
has been created, you'll confirm 3 (because you might have failed before
you could confirm) and then delete 4 and 5 before starting the computation
again.

You see, that strong consistency guarantees can be a bit tricky. If you
don't need strong guarantees and undercounting is ok for you, implement a
simple checkpointing for "at least once" using the Checkpointed interface
or the KeyValue state if your counter is scoped by key.

Cheers,
Max

On Mon, May 23, 2016 at 3:22 PM, Josh  wrote:
> Hi all,
>
> I am new to Flink and have a couple of questions which I've had trouble
> finding answers to online. Any advice would be much appreciated!
>
> What's a typical way of handling the scenario where you want to join
> streaming data with a (relatively) static data source? For example, if I
> have a stream 'orders' where each order has an 'item_id', and I want to
join
> this stream with my database of 'items'. The database of items is mostly
> static (with perhaps a few new items added every day). The database can be
> retrieved either directly from a standard SQL database (postgres) or via a
> REST call. I guess one way to handle this would be to distribute the
> database of items with the Flink tasks, and to redeploy the entire job if
> the items database changes. But I think there's probably a better way to
do
> it?
> I'd like my Flink job to output state to a REST API. (i.e. using the REST
> API as a sink). Updates would be incremental, e.g. the job would output
> tumbling window counts which need to be added to some property on a REST
> resource, so I'd probably implement this as a PATCH. I haven't found much
> evidence that anyone else has used a REST API as a Flink sink - is there a
> reason why this might be a bad idea?
>
> Thanks for any advice on these,
>
> Josh
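
A minimal sketch of the "no strong guarantees" HTTP sink Josh mentions writing, assuming a JSON payload and an endpoint URL passed in the constructor; java.net.HttpURLConnection does not support the PATCH verb, so the sketch uses POST, and a production version would likely use a proper HTTP client and combine it with the Checkpointed/CheckpointListener approach described above for stronger guarantees.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sends each record to a REST endpoint. There is no coordination with
// checkpoints, so results may be duplicated or lost around failures.
public class SimpleHttpSink extends RichSinkFunction<String> {

    private final String endpointUrl; // e.g. "http://localhost:8080/customers" (illustrative)

    public SimpleHttpSink(String endpointUrl) {
        this.endpointUrl = endpointUrl;
    }

    @Override
    public void invoke(String jsonPayload) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpointUrl).openConnection();
        try {
            conn.setRequestMethod("POST"); // HttpURLConnection has no native PATCH support
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
            }
            int status = conn.getResponseCode();
            if (status >= 300) {
                throw new RuntimeException("REST sink received HTTP status " + status);
            }
        } finally {
            conn.disconnect();
        }
    }
}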


Re: problem of sharing TCP connection when transferring data

2016-05-23 Thread wangzhijiang999
 Hi Ufuk,

Thank you for the detailed explanation! As we confirmed, the task will set
autoread to false for the shared channel when no segment buffer is available.
Further, when this task has an available buffer again, it will send the
notification event to set autoread back to true. But in some scenarios there is
a probability that autoread for this shared channel will never be set back to
true. That is, when the available-buffer notification arrives and there are
currently some messages staged in the queue, these messages are processed
first; normally a message would be put into the input channel's buffer, but if
that task has failed and its buffer pool has been released, processing the
message returns false, so the channel is never set back to autoread=true, and
all the other tasks sharing this channel are affected.

In summary: if one task sets autoread to false, and at the time it notifies the
available buffer there are some staged messages to be processed first, and one
of those messages belongs to another, failed task, then autoread for this
channel will never be set to true again. The only way out is to cancel all the
tasks on this channel to release it. Is that right?

In the past I improved the failover strategy based on Flink for our
application and noticed this issue. I am also very interested and glad to do
some related work on this Flink improvement, as you mentioned. Actually, I am
working on improving Flink in many ways for our application, and would like to
stay in further contact with you for professional advice. Thank you again!

Zhijiang Wang

------------------------------------------------------------------
From: Ufuk Celebi
Sent: Monday, 23 May 2016, 19:49
To: user; wangzhijiang999
Subject: Re: problem of sharing TCP connection when transferring data

Yes, that is a correct
description of the state of things.

A way to improve this is to introduce flow control in the application
layer, where consumers only receive buffers when they have buffers
available. They could announce on the channel how many buffers they
have before they receive anything. This way there will be no blocking
of the channel and we could actually multiplex more consumers over the
same channel.

The implementation is probably a little tricky, but if you want to
work on this and have time to actually do it, we can think about the
details. :-) Would you be interested? If yes, let's schedule a Hangout
where we brainstorm about the solution and how to implement it.
Ideally, we would come up with a design document, which we share on
the mailing list and then we continue implementing it. I currently
only have time to act as a guide/mentor and you would have to do most
of the implementation.

– Ufuk



On Mon, May 23, 2016 at 5:40 AM, wangzhijiang999
 wrote:
> Hi,
>
>  I am confused with sharing tcp connection for the same connectionID, if
> two tasks share the same connection, and there is no available buffer in the
> local buffer pool of the first task  , then it will set autoread as false
> for the channel, but it will effect the second task if it still has
> available buffer. So if one of the tasks no available buffer , all the other
> tasks can not read data from channel because of this. My understanding is
> right? If so, are there any improvements for it?  Thank you for any help!
>
>
>
>
>


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-23 Thread Flavio Pompermaier
You can try with this:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.joda.time.DateTime;

import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;

public class DateTimeError {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        env.fromElements(DateTime.now(), DateTime.now()).print();
    }
}

Without the commented row you get:

Exception in thread "main" java.lang.NullPointerException
at
org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
at
org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
at
org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:722)
at
org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:535)
at
org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:671)
at org.joda.time.base.AbstractInstant.toString(AbstractInstant.java:424)
at
org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:314)
at java.lang.String.valueOf(String.java:2994)
at java.io.PrintStream.println(PrintStream.java:821)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1607)

Thanks for the support,
Flavio

On Mon, May 23, 2016 at 4:17 PM, Maximilian Michels  wrote:

> What error do you get when you don't register the Kryo serializer?
>
> On Mon, May 23, 2016 at 11:57 AM, Flavio Pompermaier
>  wrote:
> > With this last settings I was able to terminate the job the second time I
> > retried to run it, without restarting the cluster..
> > If I don't register the serializer for DateTime the job doesn't start at
> all
> > (from Flink 1.x you have to register it [1]).
> > I can't understand what's wrong :(
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
> >
> > Best,
> > Flavio
>



-- 

Flavio Pompermaier

Development Department
OKKAM Srl - www.okkam.it

Phone: +(39) 0461 283 702
Fax: +(39) 0461 186 6433
Email: pomperma...@okkam.it
Headquarters: Trento (Italy), via G.B. Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentiality notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(S). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.


Re: Logging Exceptions

2016-05-23 Thread David Kim
Hello!

Just wanted to check up on this. :)

I grepped around for `log.error` and it *seems* that currently the only
events for logging out exceptions are for non-application related errors.

Thanks!
David

On Fri, May 20, 2016 at 12:35 PM David Kim 
wrote:

> Hello!
>
> Using flink 1.0.2, I noticed that exceptions thrown during a flink program
> would show up on the flink dashboard in the 'Exceptions' tab. That's great!
>
> However, I don't think flink currently logs this same exception. I was
> hoping there would be an equivalent `log.error` call so that third party
> logging frameworks can also act upon such errors.
>
> If this currently the known behavior, would it be troublesome to also make
> a `log.error` call around the code that is responsible for sending the
> exception to the dashboard?
>
> If there's a misconfiguration on my end, let me know!
>
> Thanks!
> David
>


Re: Import Configuration File in Flink Cluster

2016-05-23 Thread simon peyer
Hi Max 

Thanks a lot for your helpful answer.
It now works on the cluster.
It would be great to have a method for loading from resources.

-Cheers
Simon


> On 23 May 2016, at 17:52, Maximilian Michels  wrote:
> 
> Hi Simon,
> 
> AFAIK this is the way to go. We could add a method to the
> ParameterTool which loads from a resource to make it more convenient.
> 
> Cheers,
> Max
> 
> On Mon, May 23, 2016 at 4:42 PM, simon peyer  wrote:
>> Hi
>> 
>> @Max
>> So for each file in the src/main/resources folder, I first have to create a
>> new file, copy the file from the resources folder to this new file and then
>> I'm able to parse it?
>> 
>> @Stefano
>> I think the files in src/main/resources  are integrated automatically right?
>> Or am I missing something?
>> 
>> Cheers
>> Simon
>> 
>> 
>> On 23 May 2016, at 16:30, Stefano Baghino 
>> wrote:
>> 
>> Are you using Maven to package your project? I believe the resources
>> plugin[1] can suit your needs.
>> 
>> [1]:
>> http://maven.apache.org/plugins/maven-resources-plugin/examples/include-exclude.html
>> 
>> 
>> On 23 May 2016, at 16:28, Maximilian Michels  wrote:
>> 
>> Hi Simon,
>> 
>> You'll have to write the property file to disk first to load it using
>> the ParameterTool.fromPropertiesFile method.
>> 
>> For example:
>> 
>> // copy config from Java resource to a file
>> File configOnDisk = new File("/path/to/config.properties");
>> Files.copy(getClass.getClassLoader.getResourceAsStream("config.properties"),
>> configOnDisk.toPath());
>> // load the new file
>> ParameterTool.fromPropertiesFile(configOnDisk);
>> 
>> 
>> Cheers,
>> Max
>> 
>> 
>> 
>> On Mon, May 23, 2016 at 3:56 PM, simon peyer  wrote:
>>> 
>>> Hi together
>>> 
>>> Currently I'm using flink on a docker cluster on AWS.
>>> I would like to use property files with the integrated
>>> ParameterTool.fromPropertiesFile function of Flink.
>>> 
>>> Locally this version works absolutely fine:
>>> val configuration =
>>> ParameterTool.fromPropertiesFile("src/main/resources/config.properties")
>>> 
>>> But on the cluster this didn't work, so we introduced this version, which
>>> also doesn't work:
>>> 
>>> val baseParameters =
>>> ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource("config.properties").getFile)
>>> 
>>> gives
>>> 
>>> java.io.FileNotFoundException: Properties file
>>> file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
>>> does not exist
>>> 
>>> The property file is located in src/main/resources.
>>> Do you have any idea of how to integrate such property files into the jar
>>> package?
>>> 
>>> -Thanks
>>> Simon
>> 
>> 
>> 
>> 
>> --
>> BR,
>> Stefano Baghino
>> 
>> Software Engineer @ Radicalbit
>> 
>> 



Re: Import Configuration File in Flink Cluster

2016-05-23 Thread Maximilian Michels
Hi Simon,

AFAIK this is the way to go. We could add a method to the
ParameterTool which loads from a resource to make it more convenient.

Cheers,
Max

On Mon, May 23, 2016 at 4:42 PM, simon peyer  wrote:
> Hi
>
> @Max
> So for each file in the src/main/resources folder, I first have to create a
> new file, copy the file from the resources folder to this new file and then
> I'm able to parse it?
>
> @Stefano
> I think the files in src/main/resources  are integrated automatically right?
> Or am I missing something?
>
> Cheers
> Simon
>
>
> On 23 May 2016, at 16:30, Stefano Baghino 
> wrote:
>
> Are you using Maven to package your project? I believe the resources
> plugin[1] can suit your needs.
>
> [1]:
> http://maven.apache.org/plugins/maven-resources-plugin/examples/include-exclude.html
>
>
> On 23 May 2016, at 16:28, Maximilian Michels  wrote:
>
> Hi Simon,
>
> You'll have to write the property file to disk first to load it using
> the ParameterTool.fromPropertiesFile method.
>
> For example:
>
> // copy config from Java resource to a file
> File configOnDisk = new File("/path/to/config.properties");
> Files.copy(getClass.getClassLoader.getResourceAsStream("config.properties"),
> configOnDisk.toPath());
> // load the new file
> ParameterTool.fromPropertiesFile(configOnDisk);
>
>
> Cheers,
> Max
>
>
>
> On Mon, May 23, 2016 at 3:56 PM, simon peyer  wrote:
>>
>> Hi together
>>
>> Currently I'm using flink on a docker cluster on AWS.
>> I would like to use property files with the integrated
>> ParameterTool.fromPropertiesFile function of Flink.
>>
>> Locally this version works absolutely fine:
>> val configuration =
>> ParameterTool.fromPropertiesFile("src/main/resources/config.properties")
>>
>> But on the cluster this didn't work, so we introduced this version, which
>> also doesn't work:
>>
>> val baseParameters =
>> ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource("config.properties").getFile)
>>
>> gives
>>
>> java.io.FileNotFoundException: Properties file
>> file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
>> does not exist
>>
>> The property file is located in src/main/resources.
>> Do you have any idea of how to integrate such property files into the jar
>> package?
>>
>> -Thanks
>> Simon
>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>
>


Re: writeAsCSV with partitionBy

2016-05-23 Thread KirstiLaurila
Are there any plans to implement this kind of feature (the possibility to write
data to specified partitions) in the near future?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/writeAsCSV-with-partitionBy-tp4893p7099.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: keyBy on a collection of Pojos

2016-05-23 Thread Al-Isawi Rami
Thanks, setters and getters for public fields have no purpose. Also per the 
conditions you have mentioned:
"All fields either have to be public or there must be getters and setters for 
all non-public fields.”
Since my fields are declared public, there is no impact from adding getters and
setters. (I have also tested after adding the setters and getters and, as
expected, it has no effect.)

Could you spot anything else? This should be a really easy, basic case. I am
really wondering why it is not working.

For the people who are lazy to open the gist code snippet, this is what I am 
trying to do:

pojoExampleDataStream.
keyBy("productId").
sum("count").
print();




Regards,
-Rami


On 23 May 2016, at 17:11, Flavio Pompermaier 
> wrote:

You don't have getters and setters for count and productId.

Your class should be

public class PojoExample {
public int count;
public String productId;

public PojoExample() {}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public String getProductId() {
return productId;
}

public void setProductId(String productId) {
this.productId = productId;
}
}



On Mon, May 23, 2016 at 3:40 PM, Al-Isawi Rami 
> wrote:
Thanks Flavio, but as you can see in my code I have already declared my pojo to 
achieve those conditions:
public class PojoExample {
public int count;
public String productId;
public PojoExample() {
}

}

So it cannot be that.

-Rami

On 23 May 2016, at 16:30, Flavio Pompermaier 
> wrote:


Conditions for a class to be treated as a POJO by Flink:

  *   The class must be public
  *   It must have a public constructor without arguments
  *   All fields either have to be public or there must be getters and setters 
for all non-public fields. If the field name is foo the getter and setters must 
be called getFoo() and setFoo().

I don't know whether you need to implement also hashCode() and equals() actually

Best,
Flavio

On Mon, May 23, 2016 at 3:24 PM, Al-Isawi Rami 
> wrote:
Hi,

I was trying to test some specific issue, but now I cannot seem to get the very 
basic case working. It is most likely that I am blind to something, would 
anyone have quick look at it?
https://gist.github.com/rami-alisawi/d6ff33ae2d4d6e7bb1f8b329e3e5fa77

It is just a collection of pojos where I am just trying to keyBy one field and 
sum into the other, but I am getting:
5> PojoExample{count=0, productId='productA'}
8> PojoExample{count=0, productId='productB'}
5> PojoExample{count=0, productId='productA'}
8> PojoExample{count=0, productId='productB'}
5> PojoExample{count=0, productId='productA'}
5> PojoExample{count=0, productId='productA'}
5> PojoExample{count=0, productId='productA’}

Regards,
-Rami

Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.


Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.







Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.


Re: Import Configuration File in Flink Cluster

2016-05-23 Thread simon peyer
Hi 

@Max
So for each file in the src/main/resources folder, I first have to create a new 
file, copy the file from the resources folder to this new file and then I'm 
able to parse it?

@Stefano
I think the files in src/main/resources  are integrated automatically right? Or 
am I missing something?

Cheers
Simon


> On 23 May 2016, at 16:30, Stefano Baghino  
> wrote:
> 
> Are you using Maven to package your project? I believe the resources 
> plugin[1] can suit your needs.
> 
> [1]: 
> http://maven.apache.org/plugins/maven-resources-plugin/examples/include-exclude.html
>  
> 

On 23 May 2016, at 16:28, Maximilian Michels  wrote:

Hi Simon,

You'll have to write the property file to disk first to load it using
the ParameterTool.fromPropertiesFile method.

For example:

// copy config from Java resource to a file
File configOnDisk = new File("/path/to/config.properties");
Files.copy(getClass.getClassLoader.getResourceAsStream("config.properties"),
configOnDisk.toPath());
// load the new file
ParameterTool.fromPropertiesFile(configOnDisk);


Cheers,
Max


> 
> On Mon, May 23, 2016 at 3:56 PM, simon peyer  > wrote:
> Hi together
> 
> Currently I'm using flink on a docker cluster on AWS.
> I would like to use property files with the integrated 
> ParameterTool.fromPropertiesFile function of Flink.
> 
> Locally this version works absolutely fine:
> val configuration = 
> ParameterTool.fromPropertiesFile("src/main/resources/config.properties")
> 
> But on the cluster this didn't work, so we introduced this version, which 
> also doesn't work:
> 
> val baseParameters = 
> ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource("config.properties").getFile)
> 
> gives 
> java.io.FileNotFoundException: Properties file 
> file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
>  does not exist
> The property file is located in src/main/resources.
> Do you have any idea of how to integrate such property files into the jar 
> package?
> 
> -Thanks
> Simon
> 
> 
> 
> -- 
> BR,
> Stefano Baghino
> 
> Software Engineer @ Radicalbit



Re: Import Configuration File in Flink Cluster

2016-05-23 Thread Stefano Baghino
Are you using Maven to package your project? I believe the resources
plugin[1] can suit your needs.

[1]:
http://maven.apache.org/plugins/maven-resources-plugin/examples/include-exclude.html

On Mon, May 23, 2016 at 3:56 PM, simon peyer  wrote:

> Hi together
>
> Currently I'm using flink on a docker cluster on AWS.
> I would like to use property files with the integrated
> ParameterTool.fromPropertiesFile function of Flink.
>
> Locally this version works absolutely fine:
> val configuration = ParameterTool.fromPropertiesFile(
> "src/main/resources/config.properties")
>
> But on the cluster this didn't work, so we introduced this version, which
> also doesn't work:
>
> val baseParameters =
> ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource(
> "config.properties").getFile)
>
> gives
>
> java.io.FileNotFoundException: Properties file 
> file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
>  does not exist
>
> The property file is located in src/main/resources.
> Do you have any idea of how to integrate such property files into the jar
> package?
>
> -Thanks
> Simon
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Import Configuration File in Flink Cluster

2016-05-23 Thread Maximilian Michels
Hi Simon,

You'll have to write the property file to disk first to load it using
the ParameterTool.fromPropertiesFile method.

For example:

// copy config from Java resource to a file
File configOnDisk = new File("/path/to/config.properties");
Files.copy(getClass.getClassLoader.getResourceAsStream("config.properties"),
configOnDisk.toPath());
// load the new file
ParameterTool.fromPropertiesFile(configOnDisk);


Cheers,
Max

On Mon, May 23, 2016 at 3:56 PM, simon peyer  wrote:
> Hi together
>
> Currently I'm using flink on a docker cluster on AWS.
> I would like to use property files with the integrated
> ParameterTool.fromPropertiesFile function of Flink.
>
> Locally this version works absolutely fine:
> val configuration =
> ParameterTool.fromPropertiesFile("src/main/resources/config.properties")
>
> But on the cluster this didn't work, so we introduced this version, which
> also doesn't work:
>
> val baseParameters =
> ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource("config.properties").getFile)
>
> gives
>
> java.io.FileNotFoundException: Properties file
> file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
> does not exist
>
> The property file is located in src/main/resources.
> Do you have any idea of how to integrate such property files into the jar
> package?
>
> -Thanks
> Simon


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-23 Thread Maximilian Michels
What error do you get when you don't register the Kryo serializer?

On Mon, May 23, 2016 at 11:57 AM, Flavio Pompermaier
 wrote:
> With this last settings I was able to terminate the job the second time I
> retried to run it, without restarting the cluster..
> If I don't register the serializer for DateTime the job doesn't start at all
> (from Flink 1.x you have to register it [1]).
> I can't understand what's wrong :(
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
>
> Best,
> Flavio


Re: keyBy on a collection of Pojos

2016-05-23 Thread Flavio Pompermaier
You don't have getters and setters for count and productId.

Your class should be

public class PojoExample {
public int count;
public String productId;

public PojoExample() {}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public String getProductId() {
return productId;
}

public void setProductId(String productId) {
this.productId = productId;
}
}



On Mon, May 23, 2016 at 3:40 PM, Al-Isawi Rami 
wrote:

> Thanks Flavio, but as you can see in my code I have already declared my
> pojo to achieve those conditions:
> public class PojoExample {
> public int count;
> public String productId;
> public PojoExample() {
> }
> }
>
> So it cannot be that.
>
> -Rami
>
> On 23 May 2016, at 16:30, Flavio Pompermaier  wrote:
>
> *Conditions* for a class to be treated as a POJO by Flink:
>
>- The class must be public
>- It must have a public constructor without arguments
>- All fields either have to be public or there must be getters and
>setters for all non-public fields. If the field name is foo the getter
>and setters must be called getFoo() and setFoo().
>
> I don't know whether you need to implement also hashCode() and equals()
> actually
> Best,
> Flavio
>
> On Mon, May 23, 2016 at 3:24 PM, Al-Isawi Rami 
> wrote:
>
>> Hi,
>>
>> I was trying to test some specific issue, but now I cannot seem to get
>> the very basic case working. It is most likely that I am blind to
>> something, would anyone have quick look at it?
>> https://gist.github.com/rami-alisawi/d6ff33ae2d4d6e7bb1f8b329e3e5fa77
>>
>> It is just a collection of pojos where I am just trying to keyBy one
>> field and sum into the other, but I am getting:
>> 5> PojoExample{count=0, productId='productA'}
>> 8> PojoExample{count=0, productId='productB'}
>> 5> PojoExample{count=0, productId='productA'}
>> 8> PojoExample{count=0, productId='productB'}
>> 5> PojoExample{count=0, productId='productA'}
>> 5> PojoExample{count=0, productId='productA'}
>> 5> PojoExample{count=0, productId='productA’}
>>
>> Regards,
>> -Rami
>>
>> Disclaimer: This message and any attachments thereto are intended solely
>> for the addressed recipient(s) and may contain confidential information. If
>> you are not the intended recipient, please notify the sender by reply
>> e-mail and delete the e-mail (including any attachments thereto) without
>> producing, distributing or retaining any copies thereof. Any review,
>> dissemination or other use of, or taking of any action in reliance upon,
>> this information by persons or entities other than the intended
>> recipient(s) is prohibited. Thank you.
>>
>
>
> Disclaimer: This message and any attachments thereto are intended solely
> for the addressed recipient(s) and may contain confidential information. If
> you are not the intended recipient, please notify the sender by reply
> e-mail and delete the e-mail (including any attachments thereto) without
> producing, distributing or retaining any copies thereof. Any review,
> dissemination or other use of, or taking of any action in reliance upon,
> this information by persons or entities other than the intended
> recipient(s) is prohibited. Thank you.
>


Re: Combining streams with static data and using REST API as a sink

2016-05-23 Thread Josh
Hi Rami,

Thanks for the fast reply.

   1. In your solution, would I need to create a new stream for 'item
   updates', and add it as a source of my Flink job? Then I would need to
   ensure item updates get broadcast to all nodes that are running my job and
   use them to update the in-memory items database? This sounds like it might
   be a good solution, but I'm not sure how the broadcast would work - it
   sounds like I'd need Flink broadcast variables, but it looks like there's
   no support for changing datasets at the moment:
   https://issues.apache.org/jira/browse/FLINK-3514
   2. I don't understand why an HTTP sink isn't possible. Say the output of
   my job is 'number of items ordered per customer', then for each output I
   want to update a 'customer' in my database, incrementing their
   'item_order_count'. What's wrong with doing that update in the Flink job
   via an HTTP REST call (updating the customer resource), rather than writing
   directly to a database? The reason I'd like to do it this way is to
   decouple the underlying database from Flink.

Josh
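
For what it's worth, a minimal sketch of option 1 could look like the fragment
below. This is only a sketch: "Order", "ItemUpdate" and "EnrichedOrder" are
hypothetical types, and the two source streams are assumed to exist elsewhere
in the job.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public static DataStream<EnrichedOrder> enrich(DataStream<Order> orders,
                                               DataStream<ItemUpdate> itemUpdates) {
    return orders
        .connect(itemUpdates.broadcast())   // every update reaches every parallel instance
        .flatMap(new CoFlatMapFunction<Order, ItemUpdate, EnrichedOrder>() {

            // per-parallel-instance copy of the (small, mostly static) items table
            private final Map<String, ItemUpdate> items = new HashMap<>();

            @Override
            public void flatMap1(Order order, Collector<EnrichedOrder> out) {
                ItemUpdate item = items.get(order.itemId);
                if (item != null) {
                    out.collect(new EnrichedOrder(order, item));
                }
            }

            @Override
            public void flatMap2(ItemUpdate update, Collector<EnrichedOrder> out) {
                items.put(update.itemId, update);   // keep the local lookup table current
            }
        });
}

Note that the local map is not checkpointed, so after a failure it would have
to be re-populated, e.g. by re-reading the items table in open() of a
RichCoFlatMapFunction.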

On Mon, May 23, 2016 at 2:35 PM, Al-Isawi Rami 
wrote:

> Hi Josh,
>
> I am no expert in Flink yet, but here are my thoughts on this:
>
> 1. what about you stream an event to flink everytime the DB of items have
> an update? then in some background thread you get the new data from the DB
> let it be through REST (if it is only few updates a day) then load the
> results in memory and there is your updated static data.
>
> 2. REST API are over HTTP, how that is possible to be a sink? does not
> sound like flink job at all to serve http requests. simply sink the results
> to some DB and have some component to read from DB and serve it as REST API.
>
> -Rami
>
> On 23 May 2016, at 16:22, Josh  wrote:
>
> Hi all,
>
> I am new to Flink and have a couple of questions which I've had trouble
> finding answers to online. Any advice would be much appreciated!
>
>1. What's a typical way of handling the scenario where you want to
>join streaming data with a (relatively) static data source? For example, if
>I have a stream 'orders' where each order has an 'item_id', and I want to
>join this stream with my database of 'items'. The database of items is
>mostly static (with perhaps a few new items added every day). The database
>can be retrieved either directly from a standard SQL database (postgres) or
>via a REST call. I guess one way to handle this would be to distribute the
>database of items with the Flink tasks, and to redeploy the entire job if
>the items database changes. But I think there's probably a better way to do
>it?
>2. I'd like my Flink job to output state to a REST API. (i.e. using
>the REST API as a sink). Updates would be incremental, e.g. the job would
>output tumbling window counts which need to be added to some property on a
>REST resource, so I'd probably implement this as a PATCH. I haven't found
>much evidence that anyone else has used a REST API as a Flink sink - is
>there a reason why this might be a bad idea?
>
> Thanks for any advice on these,
>
> Josh
>
>


Import Configuration File in Flink Cluster

2016-05-23 Thread simon peyer
Hi together

Currently I'm using flink on a docker cluster on AWS.
I would like to use property files with the integrated 
ParameterTool.fromPropertiesFile function of Flink.

Locally this version works absolutely fine:
val configuration = 
ParameterTool.fromPropertiesFile("src/main/resources/config.properties")

But on the cluster this didn't work, so we introduced this version, which also 
doesn't work:

val baseParameters = 
ParameterTool.fromPropertiesFile(getClass.getClassLoader.getResource("config.properties").getFile)

gives 
java.io.FileNotFoundException: Properties file 
file:/tmp/flink-web-upload-57bcc912-bc98-4c89-b5ee-c5176155abd5/992186c1-b3c3-4342-a5c8-67af133155e4pipeline-0.1.0-all.jar!/config.properties
 does not exist
The property file is located in src/main/resources.
Do you have any idea of how to integrate such property files into the jar 
package?

-Thanks
Simon
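
One workaround that tends to work for files packaged inside the job jar is to
read the resource as a stream instead of as a file and hand the values to the
ParameterTool. A minimal sketch in Java (hedged: it assumes
ParameterTool.fromMap(Map<String, String>) is available in the Flink version
in use; the class name is made up):

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;

public final class ClasspathParameters {

    // Loads a properties file from the classpath (also when it sits inside the jar)
    public static ParameterTool fromClasspath(String resource) throws Exception {
        Properties props = new Properties();
        try (InputStream in = ClasspathParameters.class.getClassLoader()
                .getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalArgumentException("Resource not found on classpath: " + resource);
            }
            props.load(in);
        }
        Map<String, String> map = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            map.put(name, props.getProperty(name));
        }
        return ParameterTool.fromMap(map);   // assumed available in this Flink version
    }
}

Usage: ParameterTool configuration = ClasspathParameters.fromClasspath("config.properties");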

Re: keyBy on a collection of Pojos

2016-05-23 Thread Al-Isawi Rami
Thanks Flavio, but as you can see in my code I have already declared my pojo to 
achieve those conditions:
public class PojoExample {
    public int count;
    public String productId;
    public PojoExample() {
    }
}

So it cannot be that.

-Rami

On 23 May 2016, at 16:30, Flavio Pompermaier 
> wrote:


Conditions for a class to be treated as a POJO by Flink:

  *   The class must be public
  *   It must have a public constructor without arguments
  *   All fields either have to be public or there must be getters and setters 
for all non-public fields. If the field name is foo the getter and setters must 
be called getFoo() and setFoo().

I don't know whether you need to implement also hashCode() and equals() actually

Best,
Flavio

On Mon, May 23, 2016 at 3:24 PM, Al-Isawi Rami 
> wrote:
Hi,

I was trying to test some specific issue, but now I cannot seem to get the very 
basic case working. It is most likely that I am blind to something, would 
anyone have quick look at it?
https://gist.github.com/rami-alisawi/d6ff33ae2d4d6e7bb1f8b329e3e5fa77

It is just a collection of pojos where I am just trying to keyBy one field and 
sum into the other, but I am getting:
5> PojoExample{count=0, productId='productA'}
8> PojoExample{count=0, productId='productB'}
5> PojoExample{count=0, productId='productA'}
8> PojoExample{count=0, productId='productB'}
5> PojoExample{count=0, productId='productA'}
5> PojoExample{count=0, productId='productA'}
5> PojoExample{count=0, productId='productA'}

Regards,
-Rami



Combining streams with static data and using REST API as a sink

2016-05-23 Thread Josh
Hi all,

I am new to Flink and have a couple of questions which I've had trouble
finding answers to online. Any advice would be much appreciated!

   1. What's a typical way of handling the scenario where you want to join
   streaming data with a (relatively) static data source? For example, if I
   have a stream 'orders' where each order has an 'item_id', and I want to
   join this stream with my database of 'items'. The database of items is
   mostly static (with perhaps a few new items added every day). The database
   can be retrieved either directly from a standard SQL database (postgres) or
   via a REST call. I guess one way to handle this would be to distribute the
   database of items with the Flink tasks, and to redeploy the entire job if
   the items database changes. But I think there's probably a better way to do
   it?
   2. I'd like my Flink job to output state to a REST API. (i.e. using the
   REST API as a sink). Updates would be incremental, e.g. the job would
   output tumbling window counts which need to be added to some property on a
   REST resource, so I'd probably implement this as a PATCH. I haven't found
   much evidence that anyone else has used a REST API as a Flink sink - is
   there a reason why this might be a bad idea?

Thanks for any advice on these,

Josh
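
Regarding point 2, nothing Flink-specific prevents an HTTP sink; a custom
SinkFunction can issue the calls itself. A minimal, non-authoritative sketch
(assumptions: the job emits Tuple2<customerId, count>, the endpoint path and
payload are invented, and POST is used because HttpURLConnection has no native
PATCH support):

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RestIncrementSink extends RichSinkFunction<Tuple2<String, Long>> {

    private final String baseUrl;   // e.g. "http://api.example.com" (hypothetical)

    public RestIncrementSink(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    @Override
    public void invoke(Tuple2<String, Long> value) throws Exception {
        URL url = new URL(baseUrl + "/customers/" + value.f0);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");   // HttpURLConnection does not support PATCH natively
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        byte[] body = ("{\"item_order_count_delta\":" + value.f1 + "}")
                .getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);             // send the increment for this customer
        }
        if (conn.getResponseCode() >= 300) {
            throw new IOException("REST sink got HTTP " + conn.getResponseCode());
        }
        conn.disconnect();
    }
}

// usage: counts.addSink(new RestIncrementSink("http://api.example.com"));

The main caveat is delivery semantics: after a recovery the same window result
may be written again, so the endpoint or the payload needs to tolerate retries,
for example by making the update idempotent.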


Re: [RichFlattMapfunction] Configuration File

2016-05-23 Thread simon peyer
Hi Max

Thanks a lot.
I have now found this solution:

Passing it as a Configuration object to single functions 

The example below shows how to pass the parameters as a Configuration object to 
a user defined function.

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
                                              .withParameters(parameters.getConfiguration());

In the Tokenizer, the object is now accessible in the open(Configuration conf)
method:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void open(Configuration parameters) throws Exception {
        parameters.getInteger("myInt", -1);
        // .. do


Cheers
Simon

> On 23 May 2016, at 14:01, Maximilian Michels  wrote:
> 
> Hi Simon,
> 
> As Aljoscha said, the best way is to supply the configuration as class
> fields. Alternatively, if you overload the open(..) method, it should
> also show up in the Properties/Configuration tab on the Web interface.
> 
> Cheers,
> Max
> 
> On Mon, May 23, 2016 at 11:43 AM, simon peyer  wrote:
>> Hi Aljoscha
>> 
>> Thanks for your reply.
>> 
>> Regarding question 2, the web dashboard does provide a properties section,
>> besides (Plan, Timeline, Exceptions, Properties, Configuration).
>> 
>> 
>> What's the most common way to handle properties in Flink?
>> Is there a general way to go and any kind of integration in flink?
>> 
>> --Simon
>> 
>> 
>> On 21 May 2016, at 10:44, Aljoscha Krettek  wrote:
>> 
>> Hi Simon,
>> regarding 1. yes, the value that you get from state_item.value() and that
>> you set using state_item.update() is scoped to the key of the incoming
>> element.
>> 
>> regarding 2. the open(conf: Configuration) signature is legacy from how
>> Functions used to work quite a while back. In the streaming API this
>> Configuration is always empty. If you want to configure your user function
>> you can have the values as fields in your class and pass them in the
>> constructor.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Fri, 20 May 2016 at 17:49 simon peyer  wrote:
>>> 
>>> Hi folks
>>> 
>>> I'm extending a RichFlatMapFunction in order to use states on a keyed
>>> stream.
>>> Concerning this i have two questions:
>>> 
>>> 1. I have a  var state_item: ValueState[Option[String]] as a local
>>> variable in this class. Initialized with state_item =
>>> getRuntimeContext.getState(new ValueStateDescriptor. in the open
>>> function.
>>> Is the field state_item for every key different?
>>> 
>>> In other words if I have a key with val1 and val2 will these get two
>>> different states?
>>> 
>>> 
>>> 2. The open function contains a  override def open(conf: Configuration)
>>> configuration.
>>> Is there a way to input a custom configuration in there?
>>> 
>>> Thanks Simon
>> 
>> 



Re: [RichFlattMapfunction] Configuration File

2016-05-23 Thread Maximilian Michels
Hi Simon,

As Aljoscha said, the best way is to supply the configuration as class
fields. Alternatively, if you overload the open(..) method, it should
also show up in the Properties/Configuration tab on the Web interface.

Cheers,
Max
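
In code, the class-fields approach could look roughly like the sketch below
(hedged: the "threshold" setting, the ThresholdFilter name and the surrounding
"input" stream are made up for the example):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class ThresholdFilter extends RichFlatMapFunction<String, String> {

    private final int threshold;   // configured once on the client, shipped with the function

    public ThresholdFilter(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (value.length() > threshold) {
            out.collect(value);
        }
    }
}

// usage: input.flatMap(new ThresholdFilter(parameters.getInt("threshold", 10)));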

On Mon, May 23, 2016 at 11:43 AM, simon peyer  wrote:
> Hi Aljoscha
>
> Thanks for your reply.
>
> Regarding question 2, the web dashboard does provide a properties section,
> besides (Plan, Timeline, Exceptions, Properties, Configuration).
>
>
> What's the most common way to handle properties in Flink?
> Is there a general way to go and any kind of integration in flink?
>
> --Simon
>
>
> On 21 May 2016, at 10:44, Aljoscha Krettek  wrote:
>
> Hi Simon,
> regarding 1. yes, the value that you get from state_item.value() and that
> you set using state_item.update() is scoped to the key of the incoming
> element.
>
> regarding 2. the open(conf: Configuration) signature is legacy from how
> Functions used to work quite a while back. In the streaming API this
> Configuration is always empty. If you want to configure your user function
> you can have the values as fields in your class and pass them in the
> constructor.
>
> Cheers,
> Aljoscha
>
> On Fri, 20 May 2016 at 17:49 simon peyer  wrote:
>>
>> Hi folks
>>
>> I'm extending a RichFlatMapFunction in order to use states on a keyed
>> stream.
>> Concerning this i have two questions:
>>
>> 1. I have a  var state_item: ValueState[Option[String]] as a local
>> variable in this class. Initialized with state_item =
>> getRuntimeContext.getState(new ValueStateDescriptor. in the open
>> function.
>> Is the field state_item for every key different?
>>
>> In other words if I have a key with val1 and val2 will these get two
>> different states?
>>
>>
>> 2. The open function contains a  override def open(conf: Configuration)
>> configuration.
>> Is there a way to input a custom configuration in there?
>>
>> Thanks Simon
>
>


Re: problem of sharing TCP connection when transferring data

2016-05-23 Thread Ufuk Celebi
Yes, that is a correct description of the state of things.

A way to improve this is to introduce flow control in the application
layer, where consumers only receive buffers when they have buffers
available. They could announce on the channel how many buffers they
have before they receive anything. This way there will be no blocking
of the channel and we could actually multiplex more consumers over the
same channel.

The implementation is probably a little tricky, but if you want to
work on this and have time to actually do it, we can think about the
details. :-) Would you be interested? If yes, let's schedule a Hangout
where we brainstorm about the solution and how to implement it.
Ideally, we would come up with a design document, which we share on
the mailing list and then we continue implementing it. I currently
only have time to act as a guide/mentor and you would have to do most
of the implementation.

– Ufuk
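
To make the idea a bit more concrete, here is a purely illustrative sketch --
none of these classes or names exist in Flink, they are invented only to show
the shape of credit-based flow control: the consumer announces how many buffers
("credits") it can take, and the sender only puts that many buffers for that
logical channel on the shared connection, so a starved consumer no longer
forces autoread=false for everybody.

import java.util.ArrayDeque;
import java.util.Queue;

class LogicalChannel {

    private final Queue<Object> pending = new ArrayDeque<>();  // buffers waiting for credit
    private int credits;                                       // announced by the consumer

    synchronized void announceCredits(int n) {
        credits += n;
        flush();
    }

    synchronized void enqueue(Object buffer) {
        pending.add(buffer);
        flush();
    }

    private void flush() {
        // only data for consumers that still have credit hits the shared TCP
        // connection; other logical channels multiplexed on it are unaffected
        while (credits > 0 && !pending.isEmpty()) {
            writeToSharedConnection(pending.poll());
            credits--;
        }
    }

    private void writeToSharedConnection(Object buffer) {
        // hand the buffer off to the multiplexed connection (omitted here)
    }
}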



On Mon, May 23, 2016 at 5:40 AM, wangzhijiang999
 wrote:
> Hi,
>
>  I am confused with sharing tcp connection for the same connectionID, if
> two tasks share the same connection, and there is no available buffer in the
> local buffer pool of the first task  , then it will set autoread as false
> for the channel, but it will affect the second task if it still has
> available buffer. So if one of the tasks no available buffer , all the other
> tasks can not read data from channel because of this. My understanding is
> right? If so, are there any improvements for it?  Thank you for any help!
>
>
>
>
>


Re: HDFS namenode and Flink

2016-05-23 Thread Stefano Baghino
One last quick note: if you're going to run individual jobs on YARN instead
of a long running session, make sure you provide each job with a separate
set of directories for (surely) ZK storage and (possibly*) state backend,
otherwise the state of the jobs will end up entangled and you may
experience some undefined behavior.

* I'm not really sure about this last one, perhaps some more experienced ML
user can help me out on this.

On Mon, May 23, 2016 at 12:54 PM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> I think the only keys of interest for your needs (highly available with
> HDFS state backend) are
>
> state.backend: filesystem
> state.backend.fs.checkpointdir: hdfs:///path/to/checkpoints # fill in
> according to your needs
> recovery.zookeeper.storageDir: /path/to/znode # again, fill in according
> to your needs
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:
> zk-ensemble-1:2181,zk-ensemble-2:2181,zk-ensemble-3:2181 # put your zk
> ensemble here
>
> If these keys are set you should be good to go. I hope I've been of some
> help. :)
>
> On Mon, May 23, 2016 at 12:37 PM,  wrote:
>
>> Hello flinkers,
>>
>> We will activate namenode HDFS high availability in our cluster, and I
>> want to know if there is additional configuration for flink ?
>> We actually use YARN for launching our flink application, and hdfs
>> filesystem to store the state backend
>>
>> Thanks
>>
>> Thomas
>>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: HDFS namenode and Flink

2016-05-23 Thread Stefano Baghino
I think the only keys of interest for your needs (highly available with
HDFS state backend) are

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///path/to/checkpoints # fill in
according to your needs
recovery.zookeeper.storageDir: /path/to/znode # again, fill in according to
your needs
recovery.mode: zookeeper
recovery.zookeeper.quorum:
zk-ensemble-1:2181,zk-ensemble-2:2181,zk-ensemble-3:2181 # put your zk
ensemble here

If these keys are set you should be good to go. I hope I've been of some
help. :)

On Mon, May 23, 2016 at 12:37 PM,  wrote:

> Hello flinkers,
>
> We will activate namenode HDFS high availability in our cluster, and I
> want to know if there is additional configuration for flink ?
> We actually use YARN for launching our flink application, and hdfs
> filesystem to store the state backend
>
> Thanks
>
> Thomas
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Fwd: HDFS namenode and Flink

2016-05-23 Thread thomas

Hello flinkers,

We will activate namenode HDFS high availability in our cluster, and I 
want to know if there is additional configuration for flink ?
We actually use YARN for launching our flink application, and hdfs 
filesystem to store the state backend


Thanks

Thomas


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-23 Thread Maximilian Michels
Hi Flavio,

These error messages are quite odd. Looks like an off by one error in the
serializer/deserializer. Must be somehow related to the Kryo serialization
stack because it doesn't seem to occur with Flink's serialization system.

Does the job run fine if you don't register the custom Kryo serializer?
After all, the DateTime class is Serializable and doesn't necessarily need
a custom Kryo serializer.

Cheers,
Max
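
For reference, the registration being discussed presumably looks something like
the lines below (an assumption based on the JodaDateTimeSerializer frames in
the stack traces; dropping it makes Flink fall back to its own handling of
DateTime, which is what is being suggested here):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(
        org.joda.time.DateTime.class,
        de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.class);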

On Mon, May 23, 2016 at 10:32 AM, Flavio Pompermaier 
wrote:

> Changing
>
>- taskmanager.memory.fraction, from 0.7 to 0.9
>- taskmanager.memory.off-heap, from true to false
>- decreasing the slots of each tm from 2 to 1
>
> I had this Exception:
> java.lang.Exception: The data preparation for task 'GroupReduce
> (GroupReduce at main(AciDataInference.java:331))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: -2
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception: -2
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: -2
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
>
> On Mon, May 23, 2016 at 10:04 AM, Flavio Pompermaier  > wrote:
>
>> Changing
>>
>>- taskmanager.memory.fraction, from 0.9 to 0.7
>>- taskmanager.memory.off-heap, from false to true
>>- decreasing the slots of each tm from 3 to 2
>>
>> I had this error:
>>
>> 2016-05-23 09:55:42,534 ERROR
>> org.apache.flink.runtime.operators.BatchTask  - Error in
>> task code:  CHAIN FlatMap (FlatMap at main(MyApp.java:246)) -> Map (Key
>> Extractor) (7/14)
>> java.io.IOException: Received an event in channel 0 while still having
>> data from a record. This indicates broken serialization logic. If you are
>> using custom serialization code (Writable or Value types), check their
>> serialization routines. In the case of Kryo, check the respective Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:90)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> at
>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Mon, May 

Re: [RichFlattMapfunction] Configuration File

2016-05-23 Thread simon peyer
Hi Aljoscha

Thanks for your reply.

Regarding question 2, the web dashboard does provide a properties section, 
besides (Plan, Timeline, Exceptions, Properties, Configuration).


What's the most common way to handle properties in Flink?
Is there a general way to go and any kind of integration in flink?

--Simon


> On 21 May 2016, at 10:44, Aljoscha Krettek  wrote:
> 
> Hi Simon,
> regarding 1. yes, the value that you get from state_item.value() and that you 
> set using state_item.update() is scoped to the key of the incoming element. 
> 
> regarding 2. the open(conf: Configuration) signature is legacy from how 
> Functions used to work quite a while back. In the streaming API this 
> Configuration is always empty. If you want to configure your user function 
> you can have the values as fields in your class and pass them in the 
> constructor.
> 
> Cheers,
> Aljoscha
> 
> On Fri, 20 May 2016 at 17:49 simon peyer  > wrote:
> Hi folks
> 
> I'm extending a RichFlatMapFunction in order to use states on a keyed stream.
> Concerning this i have two questions:
> 
> 1. I have a  var state_item: ValueState[Option[String]] as a local variable 
> in this class. Initialized with state_item = getRuntimeContext.getState(new 
> ValueStateDescriptor. in the open function.
> Is the field state_item for every key different?
> 
> In other words if I have a key with val1 and val2 will these get two 
> different states?
> 
> 
> 2. The open function contains a  override def open(conf: Configuration) 
> configuration.
> Is there a way to input a custom configuration in there?
> 
> Thanks Simon



Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-23 Thread Flavio Pompermaier
Changing

   - taskmanager.memory.fraction, from 0.7 to 0.9
   - taskmanager.memory.off-heap, from true to false
   - decreasing the slots of each tm from 2 to 1

I had this Exception:
java.lang.Exception: The data preparation for task 'GroupReduce
(GroupReduce at main(AciDataInference.java:331))' , caused an error: Error
obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
due to an exception: -2
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger spilling thread' terminated due to an exception: -2
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: -2
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
at
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


On Mon, May 23, 2016 at 10:04 AM, Flavio Pompermaier 
wrote:

> Changing
>
>- taskmanager.memory.fraction, from 0.9 to 0.7
>- taskmanager.memory.off-heap, from false to true
>- decreasing the slots of each tm from 3 to 2
>
> I had this error:
>
> 2016-05-23 09:55:42,534 ERROR
> org.apache.flink.runtime.operators.BatchTask  - Error in
> task code:  CHAIN FlatMap (FlatMap at main(MyApp.java:246)) -> Map (Key
> Extractor) (7/14)
> java.io.IOException: Received an event in channel 0 while still having
> data from a record. This indicates broken serialization logic. If you are
> using custom serialization code (Writable or Value types), check their
> serialization routines. In the case of Kryo, check the respective Kryo
> serializer.
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:90)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
>
> On Mon, May 23, 2016 at 9:31 AM, Flavio Pompermaier 
> wrote:
>
>> I've slightly modified the program to shorten the length of the entire
>> job and this time I had this Exception:
>>
>> 2016-05-23 09:26:51,438 ERROR
>> org.apache.flink.runtime.io.disk.iomanager.IOManager  - IO Thread
>> 'IOManager writer thread #1' terminated due to an exception. Shutting down
>> I/O Manager.
>> java.lang.ClassCastException: java.nio.DirectByteBuffer$Deallocator
>> cannot be cast to org.apache.flink.runtime.io.disk.iomanager.WriteRequest
>> at
>> 

Re: Flink's WordCount at scale of 1BLN of unique words

2016-05-23 Thread Matthias J. Sax
Are you talking about a streaming or a batch job?

You are mentioning a "text stream" but also say you want to stream 100TB
-- indicating you have a finite data set using DataSet API.

-Matthias

On 05/22/2016 09:50 PM, Xtra Coder wrote:
> Hello, 
> 
> Question from newbie about how Flink's WordCount will actually work at
> scale. 
> 
> I've read/seen rather many high-level presentations and do not see
> more-or-less clear answers for following …
> 
> Use-case: 
> --
> there is huuuge text stream with very variable set of words – let's say
> 1BLN of unique words. Storing them just as raw text, without
> supplementary data, will take roughly 16TB of RAM. How Flink is
> approaching this internally. 
> 
> Here I'm more interested in following:
> 1.  How individual words are spread in cluster of Flink nodes? 
> Will each word appear exactly in one node and will be counted there or
> ... I'm not sure about the variants
> 
> 2.  As far as I understand – while job is running all its intermediate
> aggregation results are stored in-memory across cluster nodes (which may
> be partially written to local drive). 
> Wild guess - what size of cluster is required to run above mentioned
> tasks efficiently?
> 
> And two functional question on top of this  ...
> 
> 1. Since intermediate results are in memory – I guess it should be
> possible to get “current” counter for any word being processed. 
> Is this possible?
> 
> 2. After I've streamed 100TB of text – what will be the right way to
> save result to HDFS. For example I want to save list of words ordered by
> key with portions of 10mln per file compressed with bzip2. 
> What APIs I should use? 
> Since Flink uses intermediate snapshots for falt-tolerance - is it
> possible to save whole "current" state without stopping the stream?
> 
> Thanks.
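
Regarding the first question in the quoted mail, a minimal streaming sketch may
help illustrate the distribution (hedged: "words" is an assumed
DataStream<String>; imports are org.apache.flink.api.common.functions.MapFunction,
org.apache.flink.api.java.tuple.Tuple2 and
org.apache.flink.streaming.api.datastream.DataStream). keyBy hash-partitions on
the word, so every occurrence of a given word is routed to exactly one parallel
instance, which keeps the running count in its state:

DataStream<Tuple2<String, Long>> counts = words
    .map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String word) {
            return new Tuple2<>(word, 1L);
        }
    })
    .keyBy(0)    // hash(word) % parallelism picks the owning instance
    .sum(1);     // running count per word, held in that instance's state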





Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-23 Thread Flavio Pompermaier
Changing

   - taskmanager.memory.fraction, from 0.9 to 0.7
   - taskmanager.memory.off-heap, from false to true
   - decreasing the slots of each tm from 3 to 2

I had this error:

2016-05-23 09:55:42,534 ERROR
org.apache.flink.runtime.operators.BatchTask  - Error in
task code:  CHAIN FlatMap (FlatMap at main(MyApp.java:246)) -> Map (Key
Extractor) (7/14)
java.io.IOException: Received an event in channel 0 while still having data
from a record. This indicates broken serialization logic. If you are using
custom serialization code (Writable or Value types), check their
serialization routines. In the case of Kryo, check the respective Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:90)
at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)


On Mon, May 23, 2016 at 9:31 AM, Flavio Pompermaier 
wrote:

> I've slightly modified the program to shorten the length of the entire job
> and this time I had this Exception:
>
> 2016-05-23 09:26:51,438 ERROR
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - IO Thread
> 'IOManager writer thread #1' terminated due to an exception. Shutting down
> I/O Manager.
> java.lang.ClassCastException: java.nio.DirectByteBuffer$Deallocator cannot
> be cast to org.apache.flink.runtime.io.disk.iomanager.WriteRequest
> at
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)
>
>
> I don't know whether this is related to the others or not.
>
>
> On Sat, May 21, 2016 at 11:00 AM, Flavio Pompermaier  > wrote:
>
>> I think this bug comes from something in
>> SpillingAdaptiveSpanningRecordDeserializer..I've tried to find a common
>> point of failure in all those messages and I found that it contains also
>> this error message that I got once:
>>
>> private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
>> "Serializer consumed more bytes than the record
>> had. " +
>> "This indicates broken serialization. If you are
>> using custom serialization types " +
>> "(Value or Writable), check their serialization
>> methods. If you are using a " +
>> "Kryo-serialized type, check the corresponding
>> Kryo serializer.";
>>
>> Any clue about how to find what is causing this?
>>
>>
>>
>>
>> On Sat, May 21, 2016 at 10:53 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I tried to move flink tmp dir from hdd disks to ssd ones (in order to
>>>  exclude faulty disks) and I had another of those Exception:
>>>
>>> java.lang.IllegalArgumentException: The datetime zone id 'Europe/Romd'
>>> is not recognised
>>> at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>> at
>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>> at
>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>> at
>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:441)
>>> at
>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> at
>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>> at
>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>> at
>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>> at
>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>> at
>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>> at 

Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-23 Thread Flavio Pompermaier
I've slightly modified the program to shorten the length of the entire job
and this time I had this Exception:

2016-05-23 09:26:51,438 ERROR
org.apache.flink.runtime.io.disk.iomanager.IOManager  - IO Thread
'IOManager writer thread #1' terminated due to an exception. Shutting down
I/O Manager.
java.lang.ClassCastException: java.nio.DirectByteBuffer$Deallocator cannot
be cast to org.apache.flink.runtime.io.disk.iomanager.WriteRequest
at
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)


I don't know whether this is related to the others or not.

On Sat, May 21, 2016 at 11:00 AM, Flavio Pompermaier 
wrote:

> I think this bug comes from something in
> SpillingAdaptiveSpanningRecordDeserializer..I've tried to find a common
> point of failure in all those messages and I found that it contains also
> this error message that I got once:
>
> private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
> "Serializer consumed more bytes than the record
> had. " +
> "This indicates broken serialization. If you are
> using custom serialization types " +
> "(Value or Writable), check their serialization
> methods. If you are using a " +
> "Kryo-serialized type, check the corresponding
> Kryo serializer.";
>
> Any clue about how to find what is causing this?
>
>
>
>
> On Sat, May 21, 2016 at 10:53 AM, Flavio Pompermaier  > wrote:
>
>> I tried to move flink tmp dir from hdd disks to ssd ones (in order to
>>  exclude faulty disks) and I had another of those Exception:
>>
>> java.lang.IllegalArgumentException: The datetime zone id 'Europe/Romd' is
>> not recognised
>> at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>> at
>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>> at
>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>> at
>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:441)
>> at
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> at
>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Fri, May 20, 2016 at 8:34 PM, Flavio Pompermaier > > wrote:
>>
>>> Right now I'm using Flink 1.0.2...to which version should I downgrade?
>>> The hardware seems to be ok..how could I detect a faulty hardware?
>>> These errors appeared in every run of my job after I moved the temporary
>>> directory from ssd to hdd and I extended my pipeline with a dataset that
>>> grows as the pipeline goes on, accumulating data from intermediate datasets.
>>> On 20 May 2016 18:31, "Fabian Hueske"  wrote:
>>>
 The problem seems to occur quite often.
 Did you update your Flink version recently? If so, could you try to
 downgrade and see if the problem disappears.

 Is it otherwise possible that it is cause by faulty hardware?

 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier :

> This time (Europed instead of Europe):
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map 
> (Key Extractor)' , caused an error: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: The 
> datetime zone id 'Europd/Rome' is not recognised
>   at