Re: Graph Analytics on HBase With HGraphDB and Apache Flink Gelly

2017-07-27 Thread Robert Yokota
Also Google Cloud Bigtable has such a page at
https://cloud.google.com/bigtable/docs/integrations

On Thu, Jul 27, 2017 at 6:57 PM, Robert Yokota  wrote:

>
> One thing I really appreciate about HBase is its flexibility.  It doesn't
> enforce a schema, but also doesn't prevent you from building a schema layer
> on top.  It is very customizable, allowing you to push arbitrary code to
> the server in the form of filters and coprocessors.
>
> Not having such higher-layer features built into HBase allows it to remain
> flexible, but it does have a downside.  One complaint is that for a new
> user coming to HBase, who perhaps does want to work with things like query
> languages, schemas, secondary indices, transactions, and so forth, it can
> be daunting to research and understand what other projects in the HBase
> ecosystem can help him/her, how others have used such projects, and under
> what use cases each project might be successful or not.
>
> Perhaps a good start would be something like an "HBase ecosystem" page at
> the website that would list projects like Phoenix, Tephra, and others in
> the HBase ecosystem.  The Apache TinkerPop site has a listing of projects
> in its ecosystem at http://tinkerpop.apache.org.   I think new users
> coming to HBase aren't even aware of the larger ecosystem, and sometimes
> end up selecting alternative data stores as a result.
>
> P.S.  I'm using HBase 1.1.2
>
> On Thu, Jul 27, 2017 at 5:42 PM, Ted Yu  wrote:
>
>> Interesting blog.
>>
>> From your experience, is there anything on the HBase side where you see room
>> for improvement?
>>
>> Which HBase release are you using?
>>
>> Cheers
>>
>> On Thu, Jul 27, 2017 at 3:11 PM, Robert Yokota 
>> wrote:
>>
>>> In case anyone is interested, I wrote a blog on how to analyze graphs
>>> stored in HBase with Apache Flink Gelly:
>>>
>>> https://yokota.blog/2017/07/27/graph-analytics-on-hbase-with
>>> -hgraphdb-and-apache-flink-gelly/
>>>
>>
>>
>


Re: Graph Analytics on HBase With HGraphDB and Apache Flink Gelly

2017-07-27 Thread Robert Yokota
One thing I really appreciate about HBase is its flexibility.  It doesn't
enforce a schema, but also doesn't prevent you from building a schema layer
on top.  It is very customizable, allowing you to push arbitrary code to
the server in the form of filters and coprocessors.

Not having such higher-layer features built into HBase allows it to remain
flexible, but it does have a downside.  One complaint is that for a new
user coming to HBase, who perhaps does want to work with things like query
languages, schemas, secondary indices, transactions, and so forth, it can
be daunting to research and understand what other projects in the HBase
ecosystem can help him/her, how others have used such projects, and under
what use cases each project might be successful or not.

Perhaps a good start would be something like an "HBase ecosystem" page at
the website that would list projects like Phoenix, Tephra, and others in
the HBase ecosystem.  The Apache TinkerPop site has a listing of projects
in its ecosystem at http://tinkerpop.apache.org.   I think new users coming
to HBase aren't even aware of the larger ecosystem, and sometimes end up
selecting alternative data stores as a result.

P.S.  I'm using HBase 1.1.2

On Thu, Jul 27, 2017 at 5:42 PM, Ted Yu  wrote:

> Interesting blog.
>
> From your experience, is there anything on the HBase side where you see room
> for improvement?
>
> Which HBase release are you using?
>
> Cheers
>
> On Thu, Jul 27, 2017 at 3:11 PM, Robert Yokota  wrote:
>
>> In case anyone is interested, I wrote a blog on how to analyze graphs
>> stored in HBase with Apache Flink Gelly:
>>
>> https://yokota.blog/2017/07/27/graph-analytics-on-hbase-with
>> -hgraphdb-and-apache-flink-gelly/
>>
>
>


Re: Graph Analytics on HBase With HGraphDB and Apache Flink Gelly

2017-07-27 Thread Ted Yu
Interesting blog.

From your experience, is there anything on the HBase side where you see room
for improvement?

Which HBase release are you using?

Cheers

On Thu, Jul 27, 2017 at 3:11 PM, Robert Yokota  wrote:

> In case anyone is interested, I wrote a blog on how to analyze graphs
> stored in HBase with Apache Flink Gelly:
>
> https://yokota.blog/2017/07/27/graph-analytics-on-hbase-with
> -hgraphdb-and-apache-flink-gelly/
>


Graph Analytics on HBase With HGraphDB and Apache Flink Gelly

2017-07-27 Thread Robert Yokota
In case anyone is interested, I wrote a blog on how to analyze graphs
stored in HBase with Apache Flink Gelly:

https://yokota.blog/2017/07/27/graph-analytics-on-hbase-
with-hgraphdb-and-apache-flink-gelly/


Flink CLI cannot submit job to Flink on Mesos

2017-07-27 Thread Francisco Gonzalez Barea
Hello,

We're having a lot of issues while trying to submit a job remotely using the
Flink CLI command-line tool. We have tried different configurations, but in all
of them we get errors from Akka while trying to connect. I will try to
summarise the configurations we've tried.

- Flink 1.3.0 deployed within a Docker container on a Mesos cluster (using
Marathon)
- This Flink has the property jobmanager.rpc.address set to a hostname (i.e.
something like ip-X.eu-west-1.compute.internal)
- The same Flink version is used for the remote client (e.g. on my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort
(the same values as in jobmanager.rpc.address and jobmanager.rpc.port), after some
time waiting I get the trace at the end of this email. On the Flink side we get
this error from Akka:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: 
/10.203.23.24:24469]
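
For reference, this is roughly the setup described above, with placeholders in place of the real host name, port and job jar:

```
# flink-conf.yaml inside the Docker container (JobManager); values are placeholders
jobmanager.rpc.address: ip-X.eu-west-1.compute.internal
jobmanager.rpc.port: 6123

# submitted from the remote Flink client (same Flink version)
flink run -m ip-X.eu-west-1.compute.internal:6123 my-job.jar
```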

After reading a bit, it seems there are some problems related to Akka resolving
hostnames to IPs, so we decided to start up the same Flink but with
jobmanager.rpc.address changed to the direct IP (i.e. something like XX.XXX.XX.XX).
In this case I'm getting the same trace (at the end of the email) from the client
side and this one from the Flink server:

Discard message
LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
 b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader
session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received
leader session ID 00000000-0000-0000-0000-000000000000.

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: JobManager did not respond within 60000 milliseconds
at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did
not respond within 60000 milliseconds
at 
org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:426)
at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:423)
... 16 more






Re: Memory Leak - Flink / RocksDB ?

2017-07-27 Thread Kien Truong

Hello,

This is the system log, not the task manager's application log, which is 
what I was referring to.


If you're using the standalone cluster, then the task manager log should
be in the log directory inside your Flink installation.


Regards,
Kien

On 7/27/2017 3:49 PM, Shashwat Rastogi wrote:

Hi Kien,

Sorry it took me some time to fetch the logs. I am attaching logs of
the machine which died due to lack of free memory.





I have set only
`taskmanager.heap.mb: 35840`
`taskmanager.numberOfTaskSlots: 8`
and the rest are just default properties in flink-conf.yaml.

Thank you in advance.

Regards
Shashwat

On 26-Jul-2017, at 12:10 PM, Kien Truong wrote:


Hi,

What is your task manager memory configuration? Can you post the
TaskManager's log?


Regards,

Kien


On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:

Hi,

We have several Flink jobs, all of which read data from Kafka, do
some aggregations (over sliding windows of (1d, 1h)) and write data
to Cassandra. Something like:


```
DataStream<String> lines = env.addSource(new FlinkKafkaConsumer010<>( … ));

DataStream<Event> events = lines.map(line -> parse(line));
DataStream<Stats> stats = events
    .keyBy("id")
    .timeWindow(1d, 1h)
    .sum(new MyAggregateFunction());
writeToCassandra(stats);
```

We recently made a switch to the RocksDBStateBackend for its
suitability for large states/long windows. However, after making the
switch a memory issue has come up: the memory utilisation on the
TaskManager gradually increases from 50 GB to ~63 GB until the
container is killed. We are unable to figure out what is causing
this behaviour; is there some memory leak in RocksDB?


How much memory should we allocate to the Flink TaskManager? Since
RocksDB is a native application and does not use the JVM, how much
of the memory should we allocate/leave for RocksDB (out of 64 GB of
total memory)?
Is there a way to set the maximum amount of memory that will be used
by RocksDB so that it doesn't overwhelm the system? Are there some
recommended optimal settings for RocksDB for larger states (for a
1-day window the average state size is 3 GB)?
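
For illustration, this is the kind of tuning hook we have been looking at: the RocksDB state backend accepts an OptionsFactory, which lets you cap its main native memory consumers (memtables and block cache). The checkpoint URI and the sizes below are placeholders we have not validated, not settings we actually run:

```
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint URI.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // RocksDB applies these options per column family, i.e. roughly per registered
        // state per operator instance, so the effective total memory is a multiple of
        // the sizes configured here.
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions;
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions
                        .setWriteBufferSize(64 * 1024 * 1024)      // 64 MB per memtable (placeholder)
                        .setMaxWriteBufferNumber(2)                // at most 2 memtables in memory
                        .setTableFormatConfig(new BlockBasedTableConfig()
                                .setBlockCacheSize(256 * 1024 * 1024)); // 256 MB block cache (placeholder)
            }
        });

        env.setStateBackend(backend);
        // ... rest of the job as sketched above
    }
}
```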


Any help would be greatly appreciated. I am using Flink v1.2.1.
Thanks in advance.

Best,
Shashwat








Re: Is joined stream WindowedStream?

2017-07-27 Thread Nico Kruber
Hi Wei,
what do you mean by "windowedStream"?

The result of
dataStream.join(otherStream).where().equalTo()
expects a window to be specified. In each window, based on the time and window 
characteristics you defined, both sources will collect elements that fit into 
the window and, at its end, will complete the join operation by calling your 
user function "for each combination of elements with the same key in a window" 
(see JoinedStreams.java). The result is a (non-keyed) data stream again.

Of course, you can always apply additional window operations afterwards if 
that makes sense.
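
As a small illustration (the element types, keys and window sizes are made up for the example, not taken from your job):

```
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class JoinThenWindow {

    public static DataStream<Tuple2<String, Integer>> joinAndReWindow(
            DataStream<Tuple2<String, Integer>> first,
            DataStream<Tuple2<String, Integer>> second) {

        // The join itself needs a window; its result is a plain (non-keyed) DataStream again.
        DataStream<Tuple2<String, Integer>> joined = first
                .join(second)
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
                        Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> join(Tuple2<String, Integer> a,
                                                        Tuple2<String, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                });

        // The joined stream can then be keyed and windowed again, with a
        // different window definition than the one used for the join.
        return joined
                .keyBy(0)
                .timeWindow(Time.minutes(1))
                .sum(1);
    }
}
```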


Nico

On Tuesday, 25 July 2017 09:15:31 CEST xie wei wrote:
> Hello Flink,
> 
> 
> 
> one question about join operator:
> 
> The Flink documentation says "Join two data streams on a given key and a common
> window". Does "common window" mean that the joined stream is a
> windowedStream?
> 
> If it is a normal datastream (not windowed), could the stream be windowed
> again with a different window definition than the window specified by the join?
> 
> 
> 
> 
> 
> Thank you and best regards
> 
> Wei





Re: write into hdfs using avro

2017-07-27 Thread Rinat
Hi Gordon, thanks for your reply, already implemented ;)

> On 27 Jul 2017, at 12:57, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi!
> 
> Yes, you can provide a custom writer for the BucketingSink via 
> BucketingSink#setWriter(…).
> The AvroKeyValueSinkWriter is a simple example of a writer that uses Avro for 
> serialization, and takes as input KV 2-tuples.
> If you want to have a writer that takes as input your own event types, AFAIK 
> you’ll need to implement your own Writer.
> 
> Cheers,
> Gordon
> 
> On 21 July 2017 at 7:31:21 PM, Rinat (r.shari...@cleverdata.ru) wrote:
> 
>> Hi, folks !
>> 
>> I’ve got a little question: I’m trying to save a stream of events from Kafka
>> into HDFS using
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink with Avro
>> serialization.
>> If I properly understood, I should use some implementation of
>> org.apache.flink.streaming.connectors.fs.Writer for this purpose.
>>
>> I found an existing implementation of an Avro writer,
>> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter, but
>> my stream contains only values.
>> What do I need to do if I want to write values from a stream using a
>> BucketingSink in Avro format?
>> 
>> Thx.



Re: write into hdfs using avro

2017-07-27 Thread Tzu-Li (Gordon) Tai
Hi!

Yes, you can provide a custom writer for the BucketingSink via 
BucketingSink#setWriter(…).
The AvroKeyValueSinkWriter is a simple example of a writer that uses Avro for 
serialization, and takes as input KV 2-tuples.
If you want to have a writer that takes as input your own event types, AFAIK 
you’ll need to implement your own Writer.
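
As a sketch of an alternative that avoids writing a new Writer: wrap each value in a 2-tuple with a constant dummy key and keep using the AvroKeyValueSinkWriter. "MyEvent" stands for an Avro-generated (SpecificRecord) event class and the HDFS path is a placeholder:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class AvroBucketingSketch {

    public static void attachSink(DataStream<MyEvent> events) {

        // The writer needs Avro schemas for both the key and the value.
        Map<String, String> writerProperties = new HashMap<>();
        writerProperties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA,
                Schema.create(Schema.Type.STRING).toString());
        writerProperties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA,
                MyEvent.getClassSchema().toString());

        BucketingSink<Tuple2<String, MyEvent>> sink =
                new BucketingSink<>("hdfs:///data/events");   // placeholder base path
        sink.setWriter(new AvroKeyValueSinkWriter<String, MyEvent>(writerProperties));

        events
                // wrap each value with a constant dummy key, since the writer expects KV 2-tuples
                .map(new MapFunction<MyEvent, Tuple2<String, MyEvent>>() {
                    @Override
                    public Tuple2<String, MyEvent> map(MyEvent event) {
                        return Tuple2.of("", event);
                    }
                })
                .addSink(sink);
    }
}
```

This keeps the existing writer at the cost of an extra, constant key field in every output record.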

Cheers,
Gordon

On 21 July 2017 at 7:31:21 PM, Rinat (r.shari...@cleverdata.ru) wrote:

Hi, folks !

I’ve got a little question: I’m trying to save a stream of events from Kafka into
HDFS using org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
with Avro serialization.
If I properly understood, I should use some implementation of
org.apache.flink.streaming.connectors.fs.Writer for this purpose.

I found an existing implementation of an Avro writer,
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter, but my
stream contains only values.
What do I need to do if I want to write values from a stream using a BucketingSink
in Avro format?

Thx.

Re: Memory Leak - Flink / RocksDB ?

2017-07-27 Thread Shashwat Rastogi
Hi Kien,

Sorry it took me some time to fetch the logs. I am attaching logs of the machine
which died due to lack of free memory.

Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Created slice
user-0.slice.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Started Session 47284 
of user root.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47284 of user root.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Started Session 47285 
of user root.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47285 of user root.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Started Session 47286 
of user root.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47286 of user root.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 newrelic-infra: 
time="2017-06-26T13:01:01Z" level=error msg="can't get sample from 
ProcessSampler" error="open /proc/32131: no such file or directory"
Jun 26 13:02:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:02:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:02:01 staging-east-dataplatform-02-c01 systemd: Started Session 47287 
of user root.
Jun 26 13:02:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47287 of user root.
Jun 26 13:02:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:02:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:03:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:03:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:03:01 staging-east-dataplatform-02-c01 systemd: Started Session 47288 
of user root.
Jun 26 13:03:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47288 of user root.
Jun 26 13:03:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:03:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:04:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:04:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:04:01 staging-east-dataplatform-02-c01 systemd: Started Session 47289 
of user root.
Jun 26 13:04:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47289 of user root.
Jun 26 13:04:02 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:04:02 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:05:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:05:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:05:01 staging-east-dataplatform-02-c01 systemd: Started Session 47290 
of user root.
Jun 26 13:05:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47290 of user root.
Jun 26 13:05:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:05:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:06:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:06:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:06:01 staging-east-dataplatform-02-c01 systemd: Started Session 47291 
of user root.
Jun 26 13:06:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47291 of user root.
Jun 26 13:06:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:06:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:07:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:07:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:07:01 staging-east-dataplatform-02-c01 systemd: Started Session 47292 
of user root.
Jun 26 13:07:01 staging-east-dataplatform-02-c01 systemd: Starting Session 
47292 of user root.
Jun 26 13:07:01 staging-east-dataplatform-02-c01 systemd: Removed slice 
user-0.slice.
Jun 26 13:07:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:08:01 staging-east-dataplatform-02-c01 systemd: Created slice 
user-0.slice.
Jun 26 13:08:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:08:01 staging-east-dataplatform-02-c01 systemd: Started Session 47293 
of user root.

Re: Questions

2017-07-27 Thread Fabian Hueske
Hi Egor,

There is the Row type, which is not strongly typed (unlike the TupleX types) but
supports an arbitrary number of fields as well as null-valued fields.

The DataSet API does not have a split operator and implementing this would
be much more difficult than one would expect. The problem is in the
optimizer which assumes that all outputs of an operator receive the same
data. So we would have to change the plan enumeration logic.
However, there is a workaround for this. I would convert the String into an
Either<Double, String> (Flink features a Java Either type), emit the
dataset to two filters where the first filters on Either.isLeft and the
second on Either.isRight (or you use a FlatMap to directly extract the
value from the Either):

DataSet<String> input = ...
DataSet<Either<Double, String>> parsed = input.map(// string -> either);
DataSet<Double> doubles = parsed.flatMap(// if Either.isLeft -> Either.left);
DataSet<String> failed = parsed.flatMap(// if Either.isRight -> Either.right);
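
A slightly more complete sketch of this approach (the sample input and class names are only for illustration):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

public class SplitByParseResult {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> input = env.fromElements("1.5", "not-a-number", "42.0");

        // Left = successfully parsed double, Right = the original string that failed to parse.
        DataSet<Either<Double, String>> parsed = input.map(
                new MapFunction<String, Either<Double, String>>() {
                    @Override
                    public Either<Double, String> map(String value) {
                        try {
                            return Either.Left(Double.parseDouble(value));
                        } catch (NumberFormatException e) {
                            return Either.Right(value);
                        }
                    }
                });

        DataSet<Double> doubles = parsed.flatMap(
                new FlatMapFunction<Either<Double, String>, Double>() {
                    @Override
                    public void flatMap(Either<Double, String> value, Collector<Double> out) {
                        if (value.isLeft()) {
                            out.collect(value.left());
                        }
                    }
                });

        DataSet<String> failed = parsed.flatMap(
                new FlatMapFunction<Either<Double, String>, String>() {
                    @Override
                    public void flatMap(Either<Double, String> value, Collector<String> out) {
                        if (value.isRight()) {
                            out.collect(value.right());
                        }
                    }
                });

        doubles.print();   // write to the first table here instead
        failed.print();    // write to the second table here instead
    }
}
```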

Best, Fabian


2017-07-27 8:46 GMT+02:00 Егор Литвиненко :

> Hi
>
> Is there a way to process mapping errors in Flink?
> For example, when a string is a valid double, write it to one table, otherwise to
> another?
> If not, what problems do you see with this idea, and if I make a
> PR, where should I start to implement this feature?
>
> I saw Tuple1, Tuple2, etc., and many methods for different tuples to define the
> types of a DataSet.
> But I don't see a Tuple with a custom size, I mean something like new
> Tuple(List types).
> Did I miss something?
>
> Best regards, Egor Litvinenko
>


Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

2017-07-27 Thread Fabian Hueske
Hi,

it depends on the file format whether a file can be read in parallel or
not. Basically, you have to be able to identify valid offsets from which
you can start reading.
There are a few techniques like fixed sized blocks with padding or a footer
section with split offsets, but if the file is already written and does not
offer these features, there is no way to read it in parallel.

To read a file without splitting it, you can implement a custom
FileInputFormat and set the "unsplittable" member field to "true".
This will create one input split for each file. In nextRecord(), you could
parse the file record by record.
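
A rough sketch of such a format ("MyEvent" stands for the protobuf-generated class, the S3 path is a placeholder, and it is assumed that an S3 file system is configured for Flink):

```
import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileInputSplit;

public class ProtoFileInputFormat extends FileInputFormat<MyEvent> {

    private boolean endReached;

    public ProtoFileInputFormat() {
        // Never split individual files: each file becomes exactly one input split,
        // processed as a whole by one parallel task instance.
        this.unsplittable = true;
    }

    @Override
    public void open(FileInputSplit fileSplit) throws IOException {
        super.open(fileSplit);   // opens this.stream on the file
        endReached = false;
    }

    @Override
    public boolean reachedEnd() {
        return endReached;
    }

    @Override
    public MyEvent nextRecord(MyEvent reuse) throws IOException {
        // parseDelimitedFrom returns null once the end of the stream is reached.
        MyEvent next = MyEvent.parseDelimitedFrom(stream);
        if (next == null) {
            endReached = true;
        }
        return next;
    }

    // Usage sketch:
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<MyEvent> events =
                env.readFile(new ProtoFileInputFormat(), "s3://my-bucket/events/");  // placeholder path
        events.first(10).print();
    }
}
```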

Hope this helps,
Fabian

2017-07-26 20:47 GMT+02:00 ShB :

> I'm working with Apache Flink on reading, parsing and processing data from
> S3. I'm using the DataSet API, as my data is bounded and doesn't need
> streaming semantics.
>
> My data is on S3 in binary protobuf format, in the form of a large number of
> timestamped files. Each of these files has to be read, parsed (using
> Parser#parseDelimitedFrom(java.io.InputStream)) into their custom protobuf
> Java classes and then processed.
>
> I’m currently using the aws-java-sdk to read these files as I couldn’t
> figure out how to read binary protobufs via Flink semantics (env.readFile).
> But I'm getting OOM errors as the number/size of files is too large.
>
> So I'm looking to do distributed/parallel reading and parsing of the files
> in Flink. How can these custom binary files be read from S3 using the Flink
> DataSet API (like env.readFile)? How can these custom binary files be read
> from S3 in a distributed manner?
>
>
>
>


Re: Class not found when deserializing

2017-07-27 Thread Fabian Hueske
Hi Paolo,

do you get the ClassNotFoundException for TheGlobalModel or for another
class?
Did you maybe forget to include SerializationUtils in the classpath?

Best, Fabian

2017-07-26 16:14 GMT+02:00 Paolo Cristofanelli <
cristofanelli.pa...@gmail.com>:

> Hi,
>
> I am trying to write to and read from a Kafka topic a user-defined class (that
> implements Serializable, and all of whose fields are serializable).
> Everything works fine when I am executing the program in the IDE or with
> the mvn exec command.
> When I try to execute the program in standalone mode I get the
> ClassNotFoundException.
>
> More specifically I get the exception only during the deserialization
> part:
>
>> @Override
>> public TheGlobalModel deserialize(byte[] message) throws IOException {
>> outlierDetection.network.TheGlobalModel model;
>> model = (outlierDetection.network.TheGlobalModel)
>> SerializationUtils.deserialize(message);
>> return model;
>> }
>
> It seems that the problem lies in the deserialize method. If I remove it
> and simply return "new TheGlobalModel()" the exception is not thrown. I
> don't understand why in this case the program seems to be aware of the
> existence of the class; I guess the problem is in the deserialize function.
>
> I only know this method for sending a serializable class through Kafka; I
> would be glad to hear about other ways.
>
> Thanks in advance for your time.
> Cheers
> Paolo
>


Re: CEP condition expression and its event consuming strategy

2017-07-27 Thread Dawid Wysakowicz
Hi Chao,

Ad. 1: You could implement it with an IterativeCondition. Something like this:

Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new
SimpleCondition<Event>() {
   @Override
   public boolean filter(Event value) throws Exception {
      return value.equals("A") || value.equals("B");
   }
}).followedBy("second").where(new IterativeCondition<Event>() {
   @Override
   public boolean filter(Event value, Context<Event> ctx) throws Exception {
      // accept A or B, but only if it differs from the event matched as "first"
      return (value.equals("A") || value.equals("B")) &&
         !value.equals(ctx.getEventsForPattern("first").iterator().next());
   }
});

Ad. 2: Unfortunately, right now, as you said, the pattern matching restarts for every
event and it is not possible to change that strategy. There is ongoing work to
introduce AfterMatchSkipStrategy [1], but at best it will be merged in 1.4.0. I have
not given it much thought, but I would try to implement some discarding logic.

Regards,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-7169

> On 26 Jul 2017, at 22:45, Chao Wang  wrote:
> 
> Hi,
> 
> I have two questions regarding the use of the Flink CEP library 
> (flink-cep_2.11:1.3.1), as follows:
> 
> 1. I'd like to know how to use the API to express "emit event C in the 
> presence of events A and B, with no restriction on the arriving order of A 
> and B"? I've tried by creating two patterns, one for "A and then B" and the 
> other for "B and then A", and consequently using two patternStreams to handle 
> each case, which emits C. It worked but to me this approach seems redundant.
> 
> 2. Given the above objective expression, how to consume the accepted events 
> so that they will not be used for future matchings? For example, with the 
> arriving sequence {A, B, A}, the CEP should only emit one C (due to the 
> matching of {A,B}), not two Cs (due to {A,B} and {B,A}). Similarly, with the 
> arriving sequence {B, A, B, A}, the CEP should only emit two Cs, not three.
> 
> 
> Thanks,
> 
> Chao
> 





Questions

2017-07-27 Thread Егор Литвиненко
Hi

Is there a way to process mapping errors in Flink?
For example, when a string is a valid double, write it to one table, otherwise to
another?
If not, what problems do you see with this idea, and if I make a
PR, where should I start to implement this feature?

I saw Tuple1, Tuple2, etc., and many methods for different tuples to define the
types of a DataSet.
But I don't see a Tuple with a custom size, I mean something like new
Tuple(List types).
Did I miss something?

Best regards, Egor Litvinenko