Re: Kinesis Connector

2016-01-08 Thread Tzu-Li (Gordon) Tai
Hi Giancarlo,

Since it has been a while since the last post and no JIRA ticket has been
opened for the Kinesis connector yet, I'm wondering how you are doing on the
Kinesis connector and would like to help out with this feature :)

I've opened a JIRA (https://issues.apache.org/jira/browse/FLINK-3211),
finished the Kinesis sink, and am halfway through the Kinesis consumer. Would
you like to merge our current efforts so that we can complete this feature
ASAP for the AWS user community?

Thanks,
Gordon





Re: Checkpoint for exactly-once processing

2016-01-13 Thread Tzu-Li (Gordon) Tai
Hi Francis,

Part of every complete snapshot is the record position associated with the
barrier that triggered the checkpointing of this snapshot. The snapshot is
completed only when all the records within the checkpoint have reached the
sink. When a topology fails, all the operators' state will fall back to the
latest complete snapshot (incomplete snapshots are ignored). The data source
will also fall back to the position recorded with this snapshot, so even if
some records are re-read after the restore, the restored operators' state is
clean of any effects those records had before the failure. This way, Flink
guarantees exactly-once effects of each record on every operator's state. The
user functions in operators do not need to be implemented idempotently.

Hope this helps answer your question!

Cheers,
Gordon





Re: S3 as streaming source

2016-06-03 Thread Tzu-Li (Gordon) Tai
Hi Soumya,

No, there is currently no standard Flink-supported S3 streaming source. As
far as I know, there isn't one out in the public yet either. The community
is open to submissions for new connectors, so if you happen to be working on
one for S3, you can file a JIRA to let us know.

Also, are you looking for an S3 streaming source that fetches S3 event
notifications (ref:
http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html), or
one that streams files / objects from S3 for a data stream program? I assume
the former, since otherwise Flink batch jobs would suit you better (the
batch DataSet API already supports this).





Re: API request to submit job takes over 1hr

2016-06-13 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Thanks for your investigation of the issue and the JIRA. There's actually a
previous JIRA on this problem already:
https://issues.apache.org/jira/browse/FLINK-4023. Would you be ok with
tracking this issue on FLINK-4023 and closing FLINK-4069 as a duplicate? I've
also added a link to FLINK-4069 on FLINK-4023 so that your additional info on
the problem is referenced there.

To help answer your last questions:
1. We're doing the partition distribution across consumers ourselves: the
Kafka consumer connector creates a Kafka client on each subtask, and each
subtask independently determines which partitions it should be in charge of
(see the sketch after this list). There's more information in the last FAQ
section of this blog post: http://data-artisans.com/kafka-flink-a-practical-how-to/.
As Robert has mentioned, the consumer currently depends on a fixed, ordered
list of partitions being sent to all subtasks, so that each of them always
determines the same set of partitions to fetch from across restarts.
2. Following the above description, the consumer currently only subscribes to
the fixed partition list queried in the constructor. So at the moment the
Flink Kafka consumer doesn't handle repartitioning of topics, but it's
definitely on the todo list for the Kafka connector and won't be too hard to
implement once partition querying in the consumer is resolved (perhaps Robert
can clarify this a bit more).

Best,
Gordon





Re: Flink with Yarn

2016-01-11 Thread Tzu-Li (Gordon) Tai
Hi Sourav,

Some further clarification on your last comment.

In terms of "where" the driver program is executed, then yes, the Flink
driver program runs in a mode similar to Spark's YARN-client.

However, the "role" of the driver program and the work it is responsible for
are quite different between Flink and Spark. In Spark, the driver program is
in charge of coordinating Spark workers (executors) and must listen for and
accept incoming connections from the workers throughout the job's lifetime.
Therefore, in Spark's YARN-client mode, you must keep the driver program
process alive, otherwise the job will be shut down.

In Flink, however, the coordination of Flink TaskManagers to complete a job
is handled by Flink's JobManager once the client in the driver program
submits the job to the JobManager. The driver program is used solely for the
job submission and can disconnect afterwards.

As Stephan explained, if the user-defined dataflow defines any intermediate
results to be retrieved via collect() or print(), the results are transmitted
through the JobManager. Only then does the driver program need to stay
connected. Note that even then, the driver does not need any connections to
the workers (Flink TaskManagers), only to the JobManager.

Cheers,
Gordon





Re: Flink DataStream and KeyBy

2016-01-13 Thread Tzu-Li (Gordon) Tai
Hi Saiph,

In Flink, the key for keyBy() can be provided in different ways:
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#specifying-keys
(the doc is for DataSet API, but specifying keys is basically the same for
DataStream and DataSet).

As described in the documentation, calls like keyBy(0) are meant for Tuples,
so they only work for DataStream[Tuple]. Other ways of defining the key, like
keyBy(new KeySelector() {...}), can basically take a DataStream of any
arbitrary data type. Flink finds out at runtime whether or not there is a
conflict between the type of the data in the DataStream and the way the key
is defined.
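
For example, a rough sketch in Java (the event type and its field are made up
for illustration):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KeyByExamples {

    public static class MyEvent {
        public String userId;   // hypothetical event type
    }

    public static void examples(DataStream<Tuple2<String, Integer>> tupleStream,
                                DataStream<MyEvent> pojoStream) {
        // Positional keys only work on Tuple streams: key by the first tuple field.
        tupleStream.keyBy(0);

        // A KeySelector works for any data type: key the stream by one of its fields.
        pojoStream.keyBy(new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent event) {
                return event.userId;
            }
        });
    }
}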

Hope this helps!

Cheers,
Gordon







Re: Local Cluster have problem with connect to elasticsearch

2016-05-11 Thread Tzu-Li (Gordon) Tai
Hi Rafal,

From your description, it seems like Flink is complaining because it cannot
access the Elasticsearch API related dependencies either. You'd also have
to include the following into your Maven build, under <artifactItems>:

<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/elasticsearch/**</includes>
</artifactItem>

Now your built jar should correctly include all required dependencies (the
connector & Elasticsearch API).

As explained in "Linking with modules not contained in the binary
distribution" in the documentation, it is enough to package the dependencies
along with your code for Flink to access all required dependencies, so you
wouldn't need to copy the jar to the lib folder. I would recommend cleaning
the previously copied jars out of the lib folder and following this approach
in the future, just in case they mess up the classloading.

As for your first attempt, where Flink cannot find any Elasticsearch nodes
when executed in the IDE, I suspect the reason is that the elasticsearch2
connector by default uses client version 2.2.1, which is lower than your
cluster version 2.3.2. I have previously seen Elasticsearch strangely
complain about not finding any nodes when the client version is lower than
the deployment's. Can you try compiling the elasticsearch2 connector with
the option -Delasticsearch.version=2.3.2, and use the newly built connector
jar, following the same method mentioned above?

Hope this helps!

Cheers,
Gordon





Re: How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Tzu-Li (Gordon) Tai
Hi,

Please also note that the “auto.offset.reset” property is only respected
when there are no offsets committed under the same consumer group in ZK. So,
currently, in order to make sure you read from the latest / earliest
offsets every time you restart your Flink application, you’d have to use a
unique group.id on each restart.

We’re currently working on new configuration for the Kafka consumer to
explicitly configure the starting offset / position without respecting
existing offsets in ZK. You can follow the corresponding JIRA here:
https://issues.apache.org/jira/browse/FLINK-4280.
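
Until then, a minimal sketch of the workaround (this assumes the Kafka 0.8
consumer, where the valid values are “smallest” / “largest”; the broker,
ZooKeeper and topic names are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("zookeeper.connect", "zk1:2181");
// a fresh group.id on every restart, so no committed offsets exist in ZK
// and "auto.offset.reset" takes effect
props.setProperty("group.id", "my-job-" + UUID.randomUUID());
props.setProperty("auto.offset.reset", "largest");   // or "smallest" for the earliest offsets

FlinkKafkaConsumer08<String> consumer =
        new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);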

Regards,
Gordon

On August 5, 2016 at 8:47:32 PM, Stefan Richter (s.rich...@data-artisans.com)
wrote:

Sorry, I think you are actually asking for the largest offset in the Kafka
source, which makes it setProperty("auto.offset.reset", "largest").

On 05.08.2016 at 14:44, Stefan Richter wrote:

Hi,

I think passing properties with setProperty("auto.offset.reset",
"smallest") to the Kafka consumer should do what you want.

Best,
Stefan


On 05.08.2016 at 14:36, Mao, Wei wrote:

I am doing some performance tests with Flink (1.0.3 )+ Kafka (0.8.2.2). And
I noticed that when I restarted my Flink application, it reads records
starting from the latest offset that I consumed last time, but not the
latest offsets of that topic in Kafka.

So is there any way to make it read from the last offsets of broker/MyTopic
instead of consumer/MyTopic in Flink?

Thanks,
William


Re: Support for Auto scaling

2017-02-01 Thread Tzu-Li (Gordon) Tai
Hi Sandeep!

While auto scaling jobs in Flink still isn’t possible, in Flink 1.2 you will be
able to rescale jobs by stopping and restarting them.
This works by taking a savepoint of the job before stopping it, and then
redeploying the job with a higher / lower parallelism using the savepoint.
Upon restarting the job, your state will be redistributed across the new
operator instances.

Changing operator / job parallelism on the fly while running is still on the 
future roadmap.

Cheers,
Gordon

On February 2, 2017 at 8:39:39 AM, Meghashyam Sandeep V 
(vr1meghash...@gmail.com) wrote:

Hi Guys,

I currently run flink 1.1.4 streaming jobs in EMR in AWS with yarn. I 
understand that EMR supports auto scaling but Flink doesn't. Is there a plan 
for this support in 1.2. 

Thanks,
Sandeep

Re: Flink: KafkaProducer Data Loss

2017-02-02 Thread Tzu-Li (Gordon) Tai
Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct:
The producer holds a `pendingRecords` value that is incremented on each
invoke() and decremented on each callback, and is used to check whether the
producer needs to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded if, after
flushing, `pendingRecords == 0` and `asyncException == null` (currently,
we’re only checking `pendingRecords`).

A quick fix for this is to check and rethrow async exceptions in the
`snapshotState` method both before and after flushing, i.e. after
`pendingRecords` reaches 0.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.
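
For reference, a rough sketch of that idea (simplified; not the actual patch —
`asyncException`, `pendingRecords` and `flush()` refer to the fields / helpers
of the producer discussed above, and the real `snapshotState` signature is
simplified):

// rethrow any exception recorded by the send callbacks
private void checkAsyncErrors() throws Exception {
    Exception e = asyncException;
    if (e != null) {
        asyncException = null;
        throw new Exception("Failed to send record(s) to Kafka: " + e.getMessage(), e);
    }
}

public void snapshotState() throws Exception {
    checkAsyncErrors();   // fail fast if an earlier send already failed
    flush();              // wait until pendingRecords == 0
    checkAsyncErrors();   // fail the checkpoint if a callback reported an error during the flush
}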

Cheers,
Gordon

On February 3, 2017 at 6:05:23 AM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Ninad,

thanks for reporting the issue. To me it also looks as if exceptions might,
under certain circumstances, go unnoticed. For example, if a write operation
fails, this will set the asyncException field, which is not checked before the
next invoke call happens. If a checkpoint operation happens in the meantime, it
will pass and mark all messages up to this point as being successfully
processed. Only after the checkpoint will the producer fail. And this
constitutes a data loss imho.

I've looped Robert and Gordon into the conversation which are more familiar 
with the Kafka producer. Maybe they can answer your and my questions.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad  wrote:
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
'retry' mechanism doesn't kick in until a message is added to its internal
buffer.

If there's an exception before that, KafkaProducer will throw that
exception, and seems like Flink isn't handling that. In this case there will
be a data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception
e) {
                    if (e != null) {
                        LOG.error("Error while sending record to Kafka: " +
e.getMessage(), e);
                    }
                    acknowledgeMessage();
                }
            };
        }
        else {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception
exception) {
                    if (exception != null && asyncException == null) {
                        asyncException = exception;
                    }
                    acknowledgeMessage();
                }
            };
        }

Here are the scenarios we've identified that will cause data loss:

All Kafka brokers are down.

In this case, before appending a message to its buffer, KafkaProducer tries
to fetch metadata. If the KafkaProducer isn't able to fetch the metadata within
the configured timeout, it throws an exception.
-Memory records not writable (Existing bug in kafka 0.9.0.1 library)
https://issues.apache.org/jira/browse/KAFKA-3594

In both the above cases, KafkaProducer won't retry, and Flink will ignore
the messages. The messages aren't even logged; the exception is, but not the
messages which failed.

Possible workarounds (Kafka settings):

A very high value for metadata timeout (metadata.fetch.timeout.ms)
A very high value for buffer expiry (request.timeout.ms)
We're still investigating the possible side effects of changing the above
kafka settings.

So, is our understanding correct? Or is there a way we can avoid this data
loss by modifying some Flink settings?

Thanks.






Re: Apache Flink and Elasticsearch send Json Object instead of string

2017-02-22 Thread Tzu-Li (Gordon) Tai
Hi,

The Flink Elasticsearch Sink uses the Elasticsearch Java client to send the 
indexing requests, so whatever the client supports, it will be achievable 
through the `ElasticsearchSinkFunction` also.

From a quick check at the Elasticsearch Javadocs, I think you can also just set 
the document json as a String in the created `IndexRequest`. So,

public IndexRequest createIndexRequest(String element){

                    HashMap<String, Object> esJson = new HashMap<>();
                    
                    esJson.put("data", element);
                    
                    return Requests
                            .indexRequest()
                            .index("logs")
                            .type("object")
                            .source(esJson);
                }

Here, if `element` is already a Json string representing the document, you can 
just do 

return Requests
    .indexRequest()
    .index("logs")
    .type("object")
    .source("the Json String");

The `.source(…)` method has quite a few variants on how to set the source, and 
providing a Map is only one of them.
Please refer to the Elasticsearch Javadocs for the full list 
(https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/5.2.1).

Hope this helps!

Cheers,
Gordon
On February 21, 2017 at 5:43:36 PM, Fábio Dias (fabiodio...@gmail.com) wrote:

Hi, 
thanks for the reply.

Isn't there another way to do that?
Using REST you can send json like this : 

curl -XPOST 'localhost:9200/customer/external?pretty' -H 'Content-Type: 
application/json' -d'
{
 "name": "Jane Doe"
}
'

In my case I have json like this: 

{
      "filters" : {
                        "id" : 1,
                        "name": "abc"
                    }
}

How can I treat these cases? Isn't there a way to send the whole json element and
index it like in the REST request?

Thanks.

Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote on Tuesday, 21/02/2017 at 07:54:
Hi,

I’ll use your code to explain.

public IndexRequest createIndexRequest(String element){

                    HashMap<String, Object> esJson = new HashMap<>();
                    
                    esJson.put("data", element);
What you should do here is parse the field values from `element`, and simply 
treat them as key-value pairs of the `esJson` map.

So, the `esJson` should be prepared by doing:

esJson.put("id", 6);

esJson.put("name", "A green door");

esJson.put("price", 12.5);

etc.



Cheers,

Gordon



On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com) wrote:

Hi,

I'm using Flink and Elasticsearch and I want to receive in Elasticsearch a json
object ({"id":1, "name":"X"} etc...). I already have a string with this
information, but I don't want to save it as a string.

I receive this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green 
door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to receive this:

{
"_index": "logs",
"_type": "external",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": {
"id":6,
"name":"A green door",
"price":12.5,
"tags":
["home","green"]
}
}
}

my java code:

try {
            ArrayList<InetSocketAddress> transports = new ArrayList<>();
            transports.add(new InetSocketAddress("127.0.0.1", 9300));

            ElasticsearchSinkFunction<String> indexLog = new
ElasticsearchSinkFunction<String>() {

private static final long serialVersionUID = 8802869701292023100L;

public IndexRequest createIndexRequest(String element){

                    HashMap<String, Object> esJson = new HashMap<>();
                    
                    esJson.put("data", element);
                    
                    
                    
                    return Requests
                            .indexRequest()
                            .index("logs")
                            .type("object")
                            .source(esJson);
                }
@Override
                public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            };

            ElasticsearchSink<String> esSink = new
ElasticsearchSink<>(config, transports, indexLog);
            input.addSink(esSink);
        } 
        catch (Exception e) {
            System.out.println(e);
        }


Do I need to treat every entry as a map? Can I just send an object with key
values?

Thanks.

Re: Serialization schema

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` 
contains fields that are not serializable, so `Tuple2Serializer` itself is not 
serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we 
can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to 
provide the whole `serialize` / `deserialize` implementation if you don’t want 
to.
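
For reference, a common culprit is a non-serializable helper kept as a plain
field. A minimal sketch of a serializable schema (the element type and the
Jackson `ObjectMapper` are just illustrative assumptions):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class MyTuple2Serializer implements SerializationSchema<Tuple2<String, String>> {

    private static final long serialVersionUID = 1L;

    // non-serializable helpers must be transient and created lazily,
    // so they are never part of the serialized function object
    private transient ObjectMapper mapper;

    @Override
    public byte[] serialize(Tuple2<String, String> element) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Could not serialize record: " + element, e);
        }
    }
}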

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪  wrote:
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the reason is
that some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia :
I wrote a key serialization class to write to kafka however I am getting this 
error. Not sure why as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: 
com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements

DeserializationSchema>,

SerializationSchema> {

And called like this:



FlinkKafkaProducer010> myProducer = new 
FlinkKafkaProducer010>(

"10.22.4.15:9092", // broker list

"my-topic", // target topic

new Tuple2Serializerr()); // serialization schema









Re: Flink the right tool for the job ? Huge Data window lateness

2017-02-24 Thread Tzu-Li (Gordon) Tai
Hi Patrick,

Thanks a lot for feedback on your use case! At a first glance, I would say that 
Flink can definitely solve the issues you are evaluating.

I’ll try to explain them, and point you to some docs / articles that can 
further explain in detail:

- Lateness

The 7-day lateness shouldn’t be a problem. We definitely recommend
using RocksDB as the state backend for such a use case, as you
mentioned correctly, the state would be kept for a long time.
The heavy burst when your locally buffered data on machines are
sent to Kafka once they come back online shouldn’t be a problem either;
since Flink is a pure data streaming engine, it handles backpressure
naturally without any additional mechanisms (I would recommend
taking a look at http://data-artisans.com/how-flink-handles-backpressure/).

- Out of Order

That’s exactly what event time processing is for :-) As long as the event
comes in before the allowed lateness for windows, the event will still fall
into its corresponding event time window. So, even with the heavy burst of
your late machine data, the events will still be aggregated in the correct
windows.
You can look into event time in Flink with more detail in the event time docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html

- Last write wins

Your operators that do the aggregations simply need to be able to reprocess /
overwrite results whenever they see an event with the same id come in. If the
results are sent out of Flink and stored in an external db, and you can design
the db writes to be idempotent, then it’ll effectively be a “last write wins”.
It depends mostly on your pipeline and use case.
- Computations per minute

I think you can simply do this by having two separate window operators: one
that works on your longer window, and another that works on a per-minute basis.
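For example, a rough sketch (this assumes a DataStream<Tuple2<String, Double>>
of (sensorId, value) readings called `readings`, and a hypothetical db sink):

KeyedStream<Tuple2<String, Double>, Tuple> bySensor = readings.keyBy(0);

// 1) per-minute aggregates, e.g. for writing to the db
bySensor
    .timeWindow(Time.minutes(1))
    .sum(1)
    .addSink(new MyDbSink());            // hypothetical sink

// 2) long-horizon windows that tolerate 7 days of late data
bySensor
    .timeWindow(Time.days(1))
    .allowedLateness(Time.days(7))
    .sum(1);
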
Hope this helps!

- Gordon


On February 24, 2017 at 10:49:14 PM, Patrick Brunmayr (j...@kpibench.com) wrote:

Hello

I've taken my first steps with Flink and I am very impressed by its
capabilities. Thank you for that :) I want to use it for a project we are
currently working on. After reading some documentation,
I am not sure if it's the right tool for the job. We have an IoT application in
which we are monitoring machines in production plants. The machines have
sensors attached and they are sending
their data to a broker (Kafka, Azure IoT Hub), currently on a per-minute basis.

Following requirements must be fulfilled

Lateness

We have to allow lateness of 7 days because machines can have downtime due to
network issues, maintenance or something else. If that's the case, buffering of
data happens locally on the machine, and once they
are online again all data will be sent to the broker. This can result in some
really heavy bursts.


Out of order

Events come out of order due to these lateness issues


Last write wins

Machines are not stateful and cannot guarantee exactly-once sending of their
data. It can happen that events are sometimes sent twice. In that case the last
event wins and should override the previous one.
Events are uniquely identified by a sensor_id and a timestamp

Computations per minute

We can not wait until the windows ends and have to do computations on a per 
minute basis. For example aggregating data per sensor and writing it to a db

My biggest concern in that case is the huge lateness. Keeping data for 7 days
would result in 10,080 data points for just one sensor! Multiplying that by
10,000 sensors would result in roughly 100 million data points which Flink
would have to handle in its state. The number of sensors is constantly growing,
and so will the number of data points.

So my questions are

Is Flink the right tool for the Job ?

Is that lateness an issue ?

How can i implement the Last write wins ?

How to tune flink to handle that growing load of sensors and data points ?

Hardware requirements, storage and memory size ?


I don't want to maintain two code base for batch and streaming because the 
operations are all equal. The only difference is the time range! Thats the 
reason i wanted to do all this with Flink Streaming.

Hope you can guide me in the right direction

Thx









Re: kinesis producer setCustomPartitioner use stream's own data

2017-02-20 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

The `getPartitionId` method is invoked with each record from the stream. In 
there, you can extract values / fields from the record, and use that to 
determine the target partition id.

Is this what you had in mind?

Cheers,
Gordon

On February 21, 2017 at 11:54:21 AM, Sathi Chowdhury 
(sathi.chowdh...@elliemae.com) wrote:

Hi flink users and experts,

 

In my Flink processor I am trying to use the Flink Kinesis connector. I read
from a Kinesis stream, and after the transformation (for which I use a
RichCoFlatMapFunction), the json event needs to be sunk to a Kinesis stream k1.

DataStream myStream = see.addSource(new 
FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), 
consumerConfig));
 

 

For setting up the producer, including partitioning, I want to use
setCustomPartitioner, but the problem is that I don’t know how to access
parameters inside myStream. I have multiple fields that I want to extract from
the stream right there in the main method and use them in deciding the
partition key. Is it possible to choose a partition key that is prepared from
the stream? If so, can you please share an example.

 

 


kinesis.setCustomPartitioner(new KinesisPartitioner() {
    @Override
    public String getPartitionId(String element) {
    int l = element.length();   /// here I want to bring values extracted 
from the stream
    return element.substring(l - 1, l);
    }
});

 

Thanks

Sathi


Re: A way to control redistribution of operator state?

2017-02-13 Thread Tzu-Li (Gordon) Tai
Hi Dmitry,

Technically, from the looks of the internal code around
`OperatorStateRepartitioner`, I think it could certainly be made pluggable.
Right now it is just hard-coded to use a round-robin repartitioner
implementation as the default.

However, I’m not sure of the plans in exposing this to the user and making it 
configurable.
Looping in Stefan (in cc) who mostly worked on this part and see if he can 
provide more info.

- Gordon

On February 14, 2017 at 2:30:27 AM, Dmitry Golubets (dgolub...@gmail.com) wrote:

Hi,

It looks impossible to implement a keyed state with operator state now.

I know it sounds like "just use a keyed state", but the latter requires updating it
on every value change, as opposed to operator state, and thus can be expensive
(especially if you have to deal with mutable structures inside which have to be
serialized).

The problem is that there is no way to tell Flink how to reassign savepoint 
parts between partitions, and thus impossible to route data to correct 
partitions.

Is there anything I missed or maybe a plan to implement it in future?

Best regards,
Dmitry

Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-14 Thread Tzu-Li (Gordon) Tai
Hi Alex,

Kafka authentication and data transfer encryption using SSL can simply be
done by configuring the brokers and the connecting client.

You can take a look at this:
https://kafka.apache.org/documentation/#security_ssl.

The Kafka client that the Flink connector uses can be configured through the
`Properties` configuration provided when instantiating `FlinkKafkaConsumer`.
You just need to set values for these config properties:
https://kafka.apache.org/documentation/#security_configclients.

Note that SSL truststore / keystore locations must exist on all of your
Flink TMs for this to work.
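
For example, a minimal sketch (the paths, passwords, broker address and topic
are placeholders; the property keys are the standard Kafka client SSL
settings):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9093");
props.setProperty("group.id", "my-consumer-group");

// SSL settings, passed straight through to the underlying Kafka client
props.setProperty("security.protocol", "SSL");
props.setProperty("ssl.truststore.location", "/path/on/every/taskmanager/truststore.jks");
props.setProperty("ssl.truststore.password", "changeit");
// only needed if the brokers require client authentication:
props.setProperty("ssl.keystore.location", "/path/on/every/taskmanager/keystore.jks");
props.setProperty("ssl.keystore.password", "changeit");

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);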

Hope this helps!

- Gordon





Re: Start streaming tuples depending on another streams rate

2017-02-09 Thread Tzu-Li (Gordon) Tai
Hi Jonas,

A few things to clarify first:

Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, 
the rate drops to 10 tuples/s.

From this description it seems like the job is re-reading the topic from the
beginning, and once you reach the latest record at the head of the queue, you
start getting the normal input rate again, correct?

What I now want is that while tuples from A are being processed in flatMap1, 
the stream B in flatMap2 should wait until the rate of the A stream has dropped 
and only then, be flatMap2 should be called.

So what you are looking for is that flatMap2 for stream B only does work after
the job reaches the latest record in stream A?

If that’s the case, I would not rely on detecting a drop below a threshold
rate value. It isn’t reliable because it depends on stream A’s actual input
rate, which, as a stream, naturally changes over time.

I’m not sure if it’s the best solution, but this is what I have in mind:
You could perhaps insert a special marker event into stream A every time you 
start running this job.
Your job can have an operator before your co-flatMap operator that expects this
special marker, and when it receives it (which is when the head of stream A is
reached), it broadcasts a special event to the co-flatMap for flatMap2 to
process.
Then, once flatMap2 is invoked with the special event, you can toggle logic in 
flatMap2 to actually start doing stuff.

Cheers,
Gordon
On February 9, 2017 at 8:09:33 PM, Jonas (jo...@huntun.de) wrote:

Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.

A
  .connect(B)
  .keyBy(_.id, _.id)
  .flatMap(new MyOp)

In MyOp, the A stream tuples are combined to form a state using a
ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka
topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka
queue, the rate drops to 10 tuples/s. A big drop.

What I now want is that while tuples from A are being processed in flatMap1,
the stream B in flatMap2 should wait until the rate of the A stream has dropped,
and only then should flatMap2 be called. Ideally, this behaviour would be
captured in a separate operator, like RateBasedStreamValve or something like
that :)

To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction
that counts how many tuples have been processed from A. If the rate drops below
a threshold (here maybe 15 tuples/s), flatMap2, which processes tuples from B,
empties the buffer. However, this would make my RichCoFlatMapFunction much
bigger and would not allow for operator reuse in other scenarios.

I'm of course happy to answer if something is unclear. -- Jonas


Re: Where to put "pre-start" logic and how to detect recovery?

2017-02-09 Thread Tzu-Li (Gordon) Tai
Hi Dmitry,

I think the simplest way to do this currently is to add a program argument as
a flag indicating whether or not the current run is from a savepoint (so you
manually supply the flag whenever you’re starting the job from a savepoint),
and check that flag in the main method.
The main method is only executed once at the client on every job submission,
and not on automatic job restarts from checkpoints due to failures.
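
For example, a rough sketch (the flag name and the topic-recreation helper are
made up):

public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);

    // pass --fromSavepoint true whenever you resume from a savepoint,
    // so the "clean mode" setup is skipped
    boolean fromSavepoint = params.getBoolean("fromSavepoint", false);
    if (!fromSavepoint) {
        recreateKafkaTopic();   // hypothetical helper that drops and re-creates the topic
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build and execute the job ...
}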

Cheers,
Gordon

On February 9, 2017 at 11:08:54 PM, Dmitry Golubets (dgolub...@gmail.com) wrote:

Hi,

I need to re-create a Kafka topic when a job is started in "clean" mode.
I can do it, but I'm not sure if I do it in the right place.

Is it fine to put this kind of code in the "main"?
Then it's called on every job submit.
But.. how to detect if a job is being started from a savepoint?

Or is there a different approach?

Best regards,
Dmitry

Re: Specifying Schema dynamically

2017-02-12 Thread Tzu-Li (Gordon) Tai
Hi Luqman,

From your description, it seems like you want to infer the type (case
class, tuple, etc.) of a stream dynamically at runtime.
AFAIK, this is not supported in Flink. You’re required to have
defined types for your DataStreams.

Could you also provide an example code of what the functionality you have in 
mind looks like?
That would help clarify if I have misunderstood and there’s actually a way to 
do it.

- Gordon

On February 12, 2017 at 4:30:56 PM, Luqman Ghani (lgsa...@gmail.com) wrote:

Like if a file has a header: id, first_name, last_name, last_login
and we infer schema as: Int, String, String, Long



Re: There is no Open and Close method in Async I/O API of Scala

2017-02-12 Thread Tzu-Li (Gordon) Tai
Hi Howard,

I don’t think there is a rich variant for Async IO in Scala yet. We should 
perhaps add support for it.

Looped in Till who worked on the Async IO and its Scala support to clarify 
whether there were any concerns in not supporting it initially.

Cheers,
Gordon


On February 13, 2017 at 9:49:32 AM, Howard,Li(vip.com) (howard...@vipshop.com) 
wrote:

Hi,

 I’m going to test the Scala version of async I/O. As we can see in
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html,
 the Java version of the async I/O API has open and close methods, in which I can
do some init and cleanup work. The Scala API, however, has neither open nor
close. Even if I can do the init work while constructing the class, the cleanup
work can’t be done because there’s no close method.

 When I look into the RichAsyncFunction which the Java API extends
from, I find it’s a subclass of AbstractRichFunction, which provides the open and
close methods. I tried to make my async I/O function extend both
AbstractRichFunction and AsyncFunction, but found out that the open method is
not called by Flink, so it won’t work.

 I managed to find a workaround by getting the javaStream from the Scala
stream and using the Java API instead, but I don’t think it’s ideal.

 Did I miss something? Or is it just a bug? If it is a bug, I can open an
issue and try to fix it.

 

Thanks.

 

Howard

 


Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend the `RichSourceFunction` which provides 
additional access to override the `open()` life cycle method.
In that method, you instantiate your MongoDB client connection and  fetch the 
cursor. In the `run()` method, you should essentially have a while loop that 
polls the MongoDB cursor and emits the fetched documents using the 
`SourceContext`.
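
For illustration, a rough sketch along those lines (this assumes the MongoDB
Java driver and a tailable cursor on a capped collection; all names are
illustrative):

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCursor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.bson.Document;

public class MongoDbSource extends RichSourceFunction<Document> {

    private volatile boolean running = true;
    private transient MongoClient client;
    private transient MongoCursor<Document> cursor;

    @Override
    public void open(Configuration parameters) {
        client = new MongoClient("localhost", 27017);
        cursor = client.getDatabase("mydb")
                .getCollection("events")                // assumption: a capped collection
                .find()
                .cursorType(CursorType.TailableAwait)   // keep waiting for new documents
                .iterator();
    }

    @Override
    public void run(SourceContext<Document> ctx) {
        while (running && cursor.hasNext()) {
            ctx.collect(cursor.next());                 // emit each new document downstream
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() {
        if (cursor != null) { cursor.close(); }
        if (client != null) { client.close(); }
    }
}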

If you're also looking to implement a MongoDB source that works with Flink’s 
checkpointing for exactly-once, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions

Cheers,
Gordon
On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable, in Java, env, which is my
StreamExecutionEnvironment. When I go ahead and attempt to do:
env.addSource();
It requests an implementation of the SourceFunction interface:
env.addSource(new SourceFunction() {
            @Override
            public void run(SourceFunction.SourceContext ctx) throws
Exception {
                 // TO DO
            }

            @Override
            public void cancel() {
                // TO DO
            }
        });
And this is where I'm somewhat stuck. I do not understand how I should access
my MongoDB cursor in any of these methods (I suppose the most adequate would
be the "run" method) in a way that would allow me to return a new MongoDB
document as it arrives to the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!

Regards,

Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Pedro!

This is definitely possible, by simply writing a Flink `SourceFunction` that 
uses MongoDB clients to fetch the data.
It should be straightforward and works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into Flink's source function to add in the addSource
method of the StreamExecutionEnvironment but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro



Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
Good to know!


On February 16, 2017 at 10:13:28 PM, Pedro Monteiro 
(pedro.mlmonte...@gmail.com) wrote:

Dear Gordon,

Thanks for your help, I think I am on the right track as of now.

On the other hand, I have another question: is it possible to add sources to 
environments that are already executing? In what I am currently developing, I 
need to add new sources as they arrive to my system.

I will wait to hear from you!

Regards,

Pedro Lima Monteiro

On 16 February 2017 at 11:29, Pedro Monteiro <pedro.mlmonte...@gmail.com> wrote:
Thank you again for your prompt response.

I will give it a try and will come back to you.

Pedro Lima Monteiro






Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-16 Thread Tzu-Li (Gordon) Tai
Hi Geoffrey,

Thanks for investigating and updating on this. Good to know that it is working!

Just to clarify, was your series of jobs submitted to a “yarn session + regular 
bin/flink run”, or “per job yarn cluster”?
I’m asking just to make sure of the limitations Robert mentioned.

Cheers,
Gordon


On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geof...@gmail.com) wrote:

Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully) solved 
the issue; this turned out to be a red herring.  After discovering that the 
same issue manifested itself when testing on my local machine, I found that 
multiple jobs can be submitted from a main() function for both temporary and 
permanent Flink YARN clusters, and that the issue was not with Flink or with 
YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector with 
zeroes. I did this by combining the vector DataSet with another DataSet 
containing indexed zeroes using a union operation and an aggregation operation. 
In my problematic job, I used ExecutionEnvironment#fromElements to make a 
DataSet out of an ArrayList of Tuples containing an index and a zero. However,
for input files with very large parameters, I needed to generate very long
DataSets of zeroes, and since I was using fromElements, the client
needed to send the Flink runtime all of the elements with which to create the
DataSet (lots and lots of zeroes). This caused the job to time out before 
execution, making me think that the job had not been properly received by the 
runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map 
function mapping each number of the generated sequence to a tuple with a zero. 
This has solved the issue and my job seems to be running fine for now.
(https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370)
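
For reference, a simplified sketch of the change (variable names are made up):

// before: the client serializes every element into the job submission (huge payloads)
// DataSet<Tuple2<Long, Double>> zeros = env.fromElements(/* millions of (i, 0.0) tuples */);

// after: only the range bounds travel with the job; the tuples are created on the cluster
DataSet<Tuple2<Long, Double>> zeros = env
        .generateSequence(0, numZeros - 1)
        .map(new MapFunction<Long, Tuple2<Long, Double>>() {
            @Override
            public Tuple2<Long, Double> map(Long i) {
                return new Tuple2<>(i, 0.0);
            }
        });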

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger  wrote:
Hi Geoffrey,

I think the "per job yarn cluster" feature does probably not work for one 
main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work. 

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon  wrote:
Just to clarify, is Flink designed to allow submitting multiple jobs from a 
single program class when using a YARN cluster? I wasn't sure based on the 
documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:
Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job can be 
found here if it would help in any way: 
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by the 
previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it completes, 
the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED 
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient           
            - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient           
            - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient           
            - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient           
            - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. 
Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job 
completion.
Connected to JobManager at 
Actor[akka.tcp://flink@.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works for 
me), then the second job runs fine. However, if the input file for my first job 
is large and the first job takes more than a minute or so to complete, Flink 
will not acknowledge receiving the next job; the web Flink console does not 
show any new jobs and Flink logs do not mention receiving any new jobs after 
the first job has completed. The YARN client's job submission times out after 
Flink does not respond:

Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
at 
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at 

Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-18 Thread Tzu-Li (Gordon) Tai
Hi Andrew!

There’s nothing special about extending the checkpointing interfaces for a
SinkFunction; to Flink they’re essentially user functions that have user state
to be checkpointed.
So yes, you’ll just implement it as you would for a flatMap / map / etc.
function.
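
For illustration, a rough sketch of a sink that buffers requests and includes
whatever is still pending in its checkpointed state (this uses the
Checkpointed interface of Flink 1.1/1.2; the actual flushing logic is left as
a placeholder):

import java.util.ArrayList;

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, Checkpointed<ArrayList<String>> {

    private static final long serialVersionUID = 1L;

    private ArrayList<String> pendingRequests = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        pendingRequests.add(value);
        // flush to the external system here and remove acknowledged
        // elements from pendingRequests (placeholder, omitted)
    }

    @Override
    public ArrayList<String> snapshotState(long checkpointId, long checkpointTimestamp) {
        // everything not yet acknowledged becomes part of the checkpoint
        return new ArrayList<>(pendingRequests);
    }

    @Override
    public void restoreState(ArrayList<String> state) {
        pendingRequests = state;
    }
}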

Feel free to let me know if you bump into any questions.

Cheers,
Gordon


On January 16, 2017 at 11:37:30 PM, Andrew Roberts (arobe...@fuze.com) wrote:

Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to 
do something similar for our homegrown sinks. It sounds like just having the 
affected sinks participate in checkpointing is enough of a solution - is there 
anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I 
just implement it as I would for e.g. a mapping function?

Thanks,

Andrew



On Jan 13, 2017, at 4:34 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Andrew,

Your observations are correct. Like you mentioned, the current problem circles 
around how we deal with the pending buffered requests with accordance to 
Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
description: https://issues.apache.org/jira/browse/FLINK-5487. What do you 
think?

Thank you for bringing this up! We should probably fix this soon.
There’s already some on-going effort in fixing some other aspects of proper 
at-least-once support in the Elasticsearch sinks, so I believe this will be 
brought to attention very soon too.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in 
terms of message delivery. according to (1), the ES sink offers at-least-once 
guarantees. This page doesn’t differentiate between flink-elasticsearch and 
flink-elasticsearch2, so I have to assume for the moment that they both offer 
that guarantee. However, a look at the code (2) shows that the invoke() method 
puts the record into a buffer, and then that buffer is flushed to elasticsearch 
some time later.



Re: Deduplicate messages from Kafka topic

2017-01-15 Thread Tzu-Li (Gordon) Tai
Hi,

You’re correct that the FlinkKafkaProducer may emit duplicates to Kafka topics, 
as it currently only provides at-least-once guarantees.
Note that this isn’t a restriction only in the FlinkKafkaProducer, but a 
general restriction for Kafka's message delivery.
This can definitely be improved to exactly-once (no duplicates produced into 
topics) once Kafka supports transactional messaging.

On the consumer side, the FlinkKafkaConsumer doesn’t have built-in support to
dedupe the messages read from topics.
This isn’t really feasible anyway, as consumers can basically only view
messages with different offsets as separate, independent messages, unless they
are identified by some user application-level logic.
So in the end, we’ll need to rely on the assumption that messages produced into
Kafka topics are not duplicated, a guarantee which, as explained above, will
hopefully become available in the near future.

Cheers,
Gordon

On January 14, 2017 at 6:12:29 PM, ljwagerfield (lawre...@dmz.wagerfield.com) 
wrote:

As I understand it, the Flink Kafka Producer may emit duplicates to Kafka  
topics.  

How can I deduplicate these messages when reading them back with Flink (via  
the Flink Kafka Consumer)?  

For example, is there any out-the-box support for deduplicating messages,  
i.e. by incorporating something like "idempotent producers" as proposed by  
Jay Krepps (which, as I understand it, involves maintaining a "high  
watermark" on a message-by-message level)?  





Re: Kafka KeyedStream source

2017-01-15 Thread Tzu-Li (Gordon) Tai
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a
simple “flatMap” or “filter” directly after the source can be chained to the
source instances.
That way, the filter processing is done within the same thread as the one
fetching data from a Kafka partition, so there are no extra network transfers
for this simple filtering.
You can read more about operator chaining here:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#tasks-and-operator-chains

So, what that sums up to is: you have a FlinkKafkaConsumer as the source, do a
filter transformation right after it, and then a keyBy followed by your
heavy, key-wise computations.
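For illustration, a rough sketch (the schema, the filter condition and the key
extraction are made up; `props` holds the usual consumer properties):

DataStream<String> clicks = env.addSource(
        new FlinkKafkaConsumer09<>("clicks", new SimpleStringSchema(), props));

clicks
    // chained to the source: runs in the same thread that reads the Kafka partition
    .filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) {
            return !value.isEmpty();
        }
    })
    // data is redistributed over the network only from this point on
    .keyBy(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
            return value.split(",")[0];   // assumption: the session id is the first field
        }
    })
    // heavy, key-wise processing goes here
    .flatMap(new MySessionProcessing());  // hypothetical FlatMapFunction
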
Does that make sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create a addKeyedSource(...) which will allow us to add a source that 
makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of 
this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could 
filter the data more efficiently because the data would not need to go over the 
network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that 
follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion 
related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is 
designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a 
hash partitioner that is used when deciding which instance of the following 
downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on 
“addSource”, redistribution of data can still happen. I.e., if the parallelism 
of the compute operators right after is different than the number of Kafka 
partitions, redistribution will happen to let the key space and state be evenly 
distributed in Flink.

This leads to the argument that we probably need to think about whether 
retaining the original partitioning of records in Kafka when consumed by Flink 
is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its 
operators regardless of the parallelism of Kafka topics (rescaling isn’t 
actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be 
different than the number of Kafka partitions, and therefore redistributing 
must occur.
For redistribution to not need to take place right after an already partitioned 
Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink 
source instances consuming the partitions, and 3) the parallelism of the keyed 
computation afterwards. This seems like a very specific situation, considering 
that you’ll be able to rescale Flink operators as the data’s key space / volume 
grows.

The main observation, I think, is that Flink itself maintains how the key space 
is partitioned within the system, which plays a crucial part in rescaling. 
That’s why by default it doesn’t respect existing partitioning of the key space 
in Kafka (or other external sources). Even if it initially does at the 
beginning of a job, partitioning will most likely change as you rescale your 
job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html

On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the 
same sessionId into the same Kafka partition. That way I already have all 
events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I 
have to do a keyBy before my processing can continue. Such a keyBy will 
redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that 
immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Fw: Flink Kinesis Connector

2017-02-27 Thread Tzu-Li (Gordon) Tai
Hi Matt!

As mentioned in the docs, due to the ASL license, we do not deploy the artifact 
to the Maven central repository on Flink releases.
You will need to build the Kinesis connector by yourself (the instructions to 
do so are also in the Flink Kinesis connector docs :)), and install it to your 
local Maven repository.
Then, you’ll be able to add it as a Maven dependency in your projects.

Cheers,
Gordon


On February 27, 2017 at 8:10:52 PM, Matt (mattmcgowan1...@hotmail.com) wrote:

Hi,


I'm working through trying to connect flink up to a kinesis stream, off of 
this: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kinesis.html

It gives the following Maven dependency:


  org.apache.flink
  flink-connector-kinesis_2.10
  1.2.0


However, I'm struggling to find that. It doesn't appear to be up on Maven, nor 
is it listed in the Apache repository.

Does this still exist? Am I missing something obvious?



Thanks in advance for any help,



Matt


Re: unclear exception when writing to elasticsearch

2017-02-28 Thread Tzu-Li (Gordon) Tai
Hi!

This could be a Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn’t built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a 
weird error message that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the Java example from the website. I double-checked that I 
can reach Elasticsearch from the development machine by putting some data 
in with curl. Does anyone have an idea what the problem is? 

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{

"name" : "hopsworks",
"cluster_name" : "hops",
"cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
"version" : {
"number" : "2.4.1",
"build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
"build_timestamp" : "2016-09-27T18:57:55Z",
"build_snapshot" : false,
"lucene_version" : "5.5.2"
},
"tagline" : "You Know, for Search" 
} 

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be 
buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 
19208));
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.elasticsearch.threadpool.ThreadPool
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)

Re: Flink, Yarn and MapR Kerberos issue

2017-03-01 Thread Tzu-Li (Gordon) Tai
Hi Aniket,

Thanks a lot for reporting this.

I’m afraid this seems to be a bug with Flink on YARN’s Kerberos authentication. 
It is incorrectly checking for Kerberos credentials even for non-Kerberos 
authentication methods.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5949.

For the time being, I don’t think there’s a simple way to workaround it before 
the bug is fixed, because the bug indicates that whatever security type is 
enabled, Kerberos is used. We should probably have this fixed soon in the next 
bug fix release for Flink 1.2.

- Gordon


On March 2, 2017 at 7:11:02 AM, ani.desh1512 (ani.desh1...@gmail.com) wrote:

I am trying to set up Flink 1.2 using YARN on MapR (v5.2.0). The MapR cluster 
on which I am trying to set this up is a secure cluster, but this cluster 
does not use Kerberos. MapR, by default, uses some variant of SSL, 
and MapR also normally has its own JAAS .conf file, which it relies on.  

When I try to run yarn-session.sh, I get the following error:  

/java.lang.RuntimeException: Hadoop security is enabled but the login user  
does not have Kerberos credentials/  
To resolve this I tried the following two things:  

1. I had seen a somewhat similar mention of this issue on JIRA. The issue says that  
it's resolved in 1.2, but the comments on that issue do not indicate that.  
By the way, I have added  
"-Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf" in the  
yarn-session.sh file, but I still get the same issue.  

So, is this issue resolved? What am I missing here? Why does Flink require  
Kerberos credentials when MapR has no Kerberos setup?  

2. I also tried specifying following in flink-conf.yaml:  
security.ssl.enabled: true  
security.ssl.keystore: /opt/mapr/conf/ssl_keystore  
security.ssl.keystore-password: <>  
security.ssl.key-password: <>  
security.ssl.truststore: /opt/mapr/conf/ssl_truststore  
security.ssl.truststore-password: <>  

But, this too did not solve the problem and I get the same issue. Why is  
Flink trying to get Kerberos credentials even after ssl security is enabled?  

Thanks,  
Aniket  





--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-tp11996.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: ElasticsearchSink Exception

2017-02-28 Thread Tzu-Li (Gordon) Tai
Good to know it’s working! Thanks for the update :-)


On March 1, 2017 at 6:03:44 AM, Govindarajan Srinivasaraghavan 
(govindragh...@gmail.com) wrote:

Hi Gordon/Flavio,

Found out the issue was because of elastic search version mismatch. Another 
person upgraded ES version to 5.x but I was using 2.x. After changing the 
version it worked. Thanks for the help.

On Mon, Feb 27, 2017 at 6:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

Like wha Flavio suggested, at a first glance this looks like a problem with 
building the uber jar.

I haven’t bumped into the problem while testing out the connector on cluster 
submitted test jobs before, but I can try to test this quickly to make sure.

Could you tell me what your installed Elasticsearch version is? Also, how are 
you building your uber jar?

Cheers,
Gordon


On February 27, 2017 at 9:40:02 PM, Aljoscha Krettek (aljos...@apache.org) 
wrote:

+Tzu-Li (Gordon) Tai Do you have any idea what could be causing this? I'm 
asking because you recently worked on the Elasticsearch connectors, right?

On Sun, 26 Feb 2017 at 04:26 Govindarajan Srinivasaraghavan 
<govindragh...@gmail.com> wrote:
Thanks Flavio. I tried with multiple versions but still the same exception
and I was able to locate the class file inside my jar. Am I missing
something? Thanks for all the help.

On Sat, Feb 25, 2017 at 3:09 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> The exception you have (NoClassDefFoundError:
> org/elasticsearch/index/mapper/MapperParsingException) is usually caused
> by
> elasticsearch version conflict or a bad shading when creating the uber jar.
> Can you check if one of the 2 is causing the problem?
>
> On 25 Feb 2017 23:13, "Govindarajan Srinivasaraghavan" <
> govindragh...@gmail.com> wrote:
>
> > Hi Flavio,
> >
> > I tried with both http port 9200 and tcp port 9300 and I see incoming
> > connections in the elasticserach node. Also I see the below errors in
> > taskmanager out logs. Below are the dependencies I have on my gradle
> > project. Am I missing something?
> >
> > Exception in thread "elasticsearch[Madame Menace][generic][T#2]"
> > java.lang.NoClassDefFoundError:
> > org/elasticsearch/index/mapper/MapperParsingException
> >         at
> > org.elasticsearch.ElasticsearchException.<init>(ElasticsearchException.java:597)
> >         at
> > org.elasticsearch.transport.TransportService$Adapter$3.
> > run(TransportService.java:622)
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > org.elasticsearch.index.mapper.MapperParsingException
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >         ... 5 more
> >
> >
> > Exception in thread "elasticsearch[Saint Elmo][generic][T#2]"
> > java.lang.NoClassDefFoundError: Could not initialize class
> > org.elasticsearch.transport.NodeDisconnectedException
> >         at
> > org.elasticsearch.transport.TransportService$Adapter$3.
> > run(TransportService.java:622)
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> >
> >
> > compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10',
> > version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
> > compile group: 'org.apache.flink', name:
> > 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-clients_2.10', version:
> > '1.2.0'
> >
> > compile group: 'org.apache.flink', name:
> > 'flink-connector-elasticsearch2_2.10', version: '1.2.0'
> >
> >
> > On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <
> pomperma...@okkam.it>
> > wrote:
> >
> > > Are you sure that in elasticsearch.yml you've enabled ES to listen to
> the
> > > http port 9300?
> > >
> > > On 25 Feb 2017 08:58, "Govindarajan Srinivasaraghavan" <
> > > govindragh..

Re: unclear exception when writing to elasticsearch

2017-02-28 Thread Tzu-Li (Gordon) Tai
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your 
desired version in your project.

You can also check what Elasticsearch client version the project is using by 
checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon

On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE so it should not be a packaging 
problem. That said I added the plugin from the link provided but I'm not sure 
what elastic search library is needed.

Where do I override the elastic search version? The only thing I'm currently 
using is the flink-connector do I have to modify its code?


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>


One thing I forgot to mention, I can only modify things locally packing it into 
a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running 
things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

This could be a Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn’t built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to elastic search from a streaming application and I get a 
weird error message I that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the java example from the website.I doublechecked that I 
can reach the elastic search from the development machine by putting some data 
in with curl. Has anyone an idea what the problem is? 

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{

"name" : "hopsworks",
"cluster_name" : "hops",
"cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
"version" : {
"number" : "2.4.1",
"build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
"build_timestamp" : "2016-09-27T18:57:55Z",
"build_snapshot" : false,
"lucene_version" : "5.5.2"
},
"tagline" : "You Know, for Search" 
} 

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be 
buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 
19208));
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.elasticsearch.threadpool.ThreadPool
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)



Re: unclear exception when writing to elasticsearch

2017-03-01 Thread Tzu-Li (Gordon) Tai
Hi Martin,

Just letting you know I’m trying your setup right now, and will get back to you 
once I confirm the results.

- Gordon


On March 1, 2017 at 9:15:16 PM, Martin Neumann (mneum...@sics.se) wrote:

I created the project using the maven archetype so I'm using the packaged 
version pulled by maven. 

At this point, I just try to run it directly from inside the IDE (IntelliJ), 
mostly since I don't want to build it and deploy it on the cluster all the 
time. I tried building it (maven 3.0.5), it builds fine but fails to run on the 
cluster with the same exception that I get if I run things from within the IDE. 

My guess is that maybe some function names have changed between Elasticsearch 
versions and they are just not compatible anymore.

In the worst case, I will hack something together that just writes the data 
using HttpURLConnection, pushing things to the REST interface (if that works 
from within Flink).


cheers Martin

On Wed, Mar 1, 2017 at 12:24 PM, Flavio Pompermaier <pomperma...@okkam.it> 
wrote:
Did you build Flink from sources or are you using the packeged version? 
Because I had an annoying problem when compiling Flink with maven > 3.3.
From 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading:

Maven 3.0.x, 3.1.x, and 3.2.x It is sufficient to call mvn clean install 
-DskipTests in the root directory of Flink code base.

Maven 3.3.x The build has to be done in two steps: First in the base directory, 
then in the distribution project:

mvn clean install -DskipTests
cd flink-dist
mvn clean install
Note: To check your Maven version, run mvn --version. 

On Wed, Mar 1, 2017 at 12:19 PM, Martin Neumann <mneum...@sics.se> wrote:
I tried to change the elastic search version to 2.4.1 which results in a new 
exception:

Caused by: java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:192)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)


On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your 
desired version in your project.

You can also check what Elasticsearch client version the project is using by 
checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon


On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE so it should not be a packaging 
problem. That said I added the plugin from the link provided but I'm not sure 
what elastic search library is needed.

Where do I override the elastic search version? The only thing I'm currently 
using is the flink-connector do I have to modify its code?


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>


One thing I forgot to mention, I can only modify things locally packing it into 
a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running 
things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

This could be a Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn’t built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to elastic search from a streaming application and I get a 
weird error message I that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the java example from the website.I doublechecked that I 
can reach the elastic search from the development machine by putting some data 
in with curl. Has anyone an idea what the problem is? 

Technical info:
Flink 1.1.3

Elast

Re: unclear exception when writing to elasticsearch

2017-03-01 Thread Tzu-Li (Gordon) Tai
Hi Martin,

I followed your setup:

1. Maven java quick start archetype (Flink version 1.1.3)
2. Added `flink-connector-elasticsearch2_2.10` version 1.1.3 dependency
3. Ran the example in the Flink Elasticsearch docs against a Elasticsearch 
2.4.1 installation

and everything worked fine.

Just to make sure nothing is conflicting, you could also try to do a `mvn 
dependency:purge-local-repository` on your project, and then re-download the 
dependencies with `mvn clean install`, and finally re-importing your project in 
the IDE.

Let me know if this works for you!

Cheers,
Gordon


On March 1, 2017 at 9:23:35 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi Martin,

Just letting you know I’m trying your setup right now, and will get back to you 
once I confirm the results.

- Gordon


On March 1, 2017 at 9:15:16 PM, Martin Neumann (mneum...@sics.se) wrote:

I created the project using the maven archetype so I'm using the packaged 
version pulled by maven. 

At this point, I just try to run it directly from inside the IDE (IntelliJ), 
mostly since I don't want to build it and deploy it on the cluster all the 
time. I tried building it (maven 3.0.5), it builds fine but fails to run on the 
cluster with the same exception that I get if I run things from within the IDE. 

My guess is that maybe some function names have changed between elastic search 
versions and they are just not compatible anymore.

In the Worst case, I will hack something together that just writes the data 
using HttpURLConnection pushing things to the rest interface. (If that works 
from within flink)


cheers Martin

On Wed, Mar 1, 2017 at 12:24 PM, Flavio Pompermaier <pomperma...@okkam.it> 
wrote:
Did you build Flink from sources or are you using the packeged version? 
Because I had an annoying problem when compiling Flink with maven > 3.3.
From 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading:

Maven 3.0.x, 3.1.x, and 3.2.x It is sufficient to call mvn clean install 
-DskipTests in the root directory of Flink code base.

Maven 3.3.x The build has to be done in two steps: First in the base directory, 
then in the distribution project:

mvn clean install -DskipTests
cd flink-dist
mvn clean install
Note: To check your Maven version, run mvn --version. 

On Wed, Mar 1, 2017 at 12:19 PM, Martin Neumann <mneum...@sics.se> wrote:
I tried to change the elastic search version to 2.4.1 which results in a new 
exception:

Caused by: java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:192)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)


On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your 
desired version in your project.

You can also check what Elasticsearch client version the project is using by 
checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon


On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE so it should not be a packaging 
problem. That said I added the plugin from the link provided but I'm not sure 
what elastic search library is needed.

Where do I override the elastic search version? The only thing I'm currently 
using is the flink-connector do I have to modify its code?


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>


One thing I forgot to mention, I can only modify things locally packing it into 
a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running 
things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

This could be a Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn’t built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink

Re: Kafka SimpleStringConsumer NPE

2016-09-04 Thread Tzu-Li (Gordon) Tai
Hi David,

Is it possible that your Kafka installation is an older version than 0.9? Or 
you may have used a different Kafka client major version in your job jar's 
dependency?
This looks to me like an odd protocol incompatibility with the Kafka broker, as 
the client in the Kafka consumer is reading null record bytes.

Regards,
Gordon


On September 4, 2016 at 7:17:04 AM, dbciar (da...@dbciar.co.uk) wrote:

Hello Everyone, 

I was wondering if anyone could help shed light on where I have introduced 
an error into my code to get the following error: 

java.lang.NullPointerException 
at java.lang.String.<init>(String.java:556) 
at 
org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:34)
 
at 
org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:27)
 
at 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
 
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
 
at java.lang.Thread.run(Thread.java:745) 

I get this error while running a job that connects to kafka from a local 
deployment. Could it be to do with how I'm packaging the Jar before 
uploading it to the cluster? 

The job plan is created and deployed OK via the management website, but as 
soon as data is added to Kafka I get the above and the job stops. Using 
Kafka's own console consumer script, I validated the kafka queue and the 
data looks exactly like the testing data I used when reading from local 
files. 

Any help as always appreciated, 
Cheers, 
David 



-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-SimpleStringConsumer-NPE-tp.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


Re: setting the name of a subtask ?

2016-09-12 Thread Tzu-Li (Gordon) Tai
Hi!

Yes, you can set custom operator names by calling `.name(…)` on DataStreams 
after a transformation.
For example, `.addSource(…).map(...).name(…)`. This name will be used for 
visualization on the dashboard, and also for logging.
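For example, a small self-contained sketch (the operator names here are just placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NamedOperatorsExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> numbers = env
            .generateSequence(0, 1000)
            .name("sequence source");          // name shown for the source

        numbers
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long n) {
                    return n * n;
                }
            })
            .name("square")                    // name shown for this map operator
            .print()
            .name("stdout sink");              // name shown for the sink

        env.execute("named operators example");
    }
}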

Regards,
Gordon


On September 12, 2016 at 3:44:58 PM, Bart van Deenen 
(bartvandee...@fastmail.fm) wrote:

Hi all  

I'm using Flink 1.1 with a streaming job, consisting of a few maps and a  
few aggregations.  
In the web dashboard for the job I see subtask names like:  

TriggerWindow(SlidingEventTimeWindows(60, 5000),  
FoldingStateDescriptor{serializer=null, initialValue=Res(0,List()),  
foldFunction=org.apache.flink.streaming.api.scala.function.util.ScalaFoldFunction@5c42d2b7},
  
EventTimeTrigger(), WindowedStream.fold(WindowedStream.java:238)) ->  
Filter -> Map  

Is it possible to give this a more human readable name from my job  
program?  

Greetings  

Bart van Deenen  


Re: Flink Checkpoint runs slow for low load stream

2016-10-04 Thread Tzu-Li (Gordon) Tai
Hi,

Helping out here: this is the PR for async Kafka offset committing - 
https://github.com/apache/flink/pull/2574.
It has already been merged into the master and release-1.1 branches, so you can 
try out the changes now if you’d like.
The change should also be included in the 1.1.3 release, which the Flink 
community is discussing to release soon.

Will definitely be helpful if you can provide feedback afterwards!

Best Regards,
Gordon


On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga 
(chakravarth...@gmail.com) wrote:

Hi Stephan,

    Is the Async kafka offset commit released in 1.3.1?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga  
wrote:
Hi Stephan,

 That should be great. Let me know once the fix is done and the snapshot 
version to use, I'll check and revert then.
 Can you also share the JIRA that tracks the issue?
 
 With regards to offset commit issue, I'm not sure as to how to proceed 
here. Probably I'll use your fix first and see if the problem reoccurs.

Thanks much
Varaga

On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen  wrote:
@CVP

Flink stores in checkpoints in your case only the Kafka offsets (few bytes) and 
the custom state (e).

Here is an illustration of the checkpoint and what is stored (from the Flink 
docs).
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html


I am quite puzzled why the offset committing problem occurs only for one input, 
and not for the other.
I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan



On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga  
wrote:
Hi Stefan,

 Thanks a million for your detailed explanation. I appreciate it.

 -  The zookeeper bundled with kafka 0.9.0.1 was used to start zookeeper. 
There is only 1 instance (standalone) of zookeeper running on my localhost 
(ubuntu 14.04)
 -  There is only 1 Kafka broker (version: 0.9.0.1 )

 With regards to Flink cluster there's only 1 JM & 2 TMs started with no 
HA. I presume this does not use zookeeper anyways as it runs as standalone 
cluster.

 
 BTW., The kafka connector version that I use is as suggested in the flink 
connectors page.
   
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  <version>1.1.1</version>
</dependency>
        
 
 Do you see any issues with versions?
   
 1) Do you have benchmarks wrt., to checkpointing in flink?

     2) There isn't detailed explanation on what states are stored as part of 
the checkpointing process. For ex.,  If I have pipeline like source -> map -> 
keyBy -> map -> sink, my assumption on what's stored is:
 a) The source stream's custom watermarked records
 b) Intermediate states of each of the transformations in the pipeline
 c) Delta of Records stored from the previous sink
 d) Custom States (say ValueState as in my case) - Essentially this is 
what I'm concerned about storing.
 e) All of my operators

  Is my understanding right?

 3) Is there a way in Flink to checkpoint only d) as stated above

 4) Can you apply checkpointing to only streams and certain operators (say 
I wish to store aggregated values part of the transformation)

Best Regards
CVP


On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen  wrote:
Thanks, the logs were very helpful!

TL:DR - The offset committing to ZooKeeper is very slow and prevents proper 
starting of checkpoints.

Here is what is happening in detail:

  - Between the point when the TaskManager receives the "trigger checkpoint" 
message and when the point when the KafkaSource actually starts the checkpoint 
is a long time (many seconds) - for one of the Kafka Inputs (the other is fine).
  - The only way this delayed can be introduced is if another checkpoint 
related operation (such as trigger() or notifyComplete() ) is still in progress 
when the checkpoint is started. Flink does not perform concurrent checkpoint 
operations on a single operator, to ease the concurrency model for users.
  - The operation that is still in progress must be the committing of the 
offsets (to ZooKeeper or Kafka). That also explains why this only happens once 
one side receives the first record. Before that, there is nothing to commit.


What Flink should fix:
  - The KafkaConsumer should run the commit operations asynchronously, to not 
block the "notifyCheckpointComplete()" method.

What you can fix:
  - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works well, the 
other does not. Do they go against different sets of brokers, or different 
ZooKeepers? Is the metadata for one input bad?
  - In the next Flink version, you may opt-out of committing offsets to 
Kafka/ZooKeeper all together. It is not important for Flink's checkpoints 
anyways.

Greetings,
Stephan


On Mon, Sep 26, 2016 

Re: Using Flink

2016-10-04 Thread Tzu-Li (Gordon) Tai
Hi Govindarajan,

Regarding the stagnant Kafka offsets, it’ll be helpful if you can supply more 
information on the following to help us identify the cause:
1. What is your checkpointing interval set to?
2. Did you happen to have set the “max.partition.fetch.bytes” property in the 
properties given to FlinkKafkaConsumer? I’m suspecting with some recent changes 
to the offset committing, large fetches can also affect when offsets are 
committed to Kafka.
3. I’m assuming that you’ve built the Kafka connector from source. Could you 
tell which commit it was built on?

If you could, you can also reply with the taskmanager logs (or via private 
email) so we can check in detail, that would definitely be helpful!

Best Regards,
Gordon


On October 4, 2016 at 3:51:59 PM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Govindarajan,

you can broadcast the stream with debug logger information by calling 
`stream.broadcast()`. Then every stream record will be sent to all sub-tasks of 
the downstream operator.
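For illustration, a minimal sketch of that pattern, with the control stream broadcast 
and connected to the data stream via a CoFlatMapFunction (the sources and the event 
format here are made up):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastControlExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // stand-ins for the real sources: the main data and the logging-control requests
        DataStream<String> events = env.socketTextStream("localhost", 9000);
        DataStream<String> controls = env.socketTextStream("localhost", 9001); // user ids to debug

        events
            .connect(controls.broadcast())   // every parallel subtask sees every control message
            .flatMap(new CoFlatMapFunction<String, String, String>() {

                // users currently being debugged, kept per parallel subtask
                private final Set<String> debuggedUsers = new HashSet<>();

                @Override
                public void flatMap1(String event, Collector<String> out) {
                    String userId = event.split(",")[0]; // hypothetical event format
                    if (debuggedUsers.contains(userId)) {
                        System.out.println("DEBUG " + event); // custom logging
                    }
                    out.collect(event); // normal processing continues
                }

                @Override
                public void flatMap2(String control, Collector<String> out) {
                    debuggedUsers.add(control.trim()); // switch on logging for this user
                }
            })
            .print();

        env.execute("broadcast control stream example");
    }
}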

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan 
<govindragh...@gmail.com> wrote:
Hi Gordon,

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.

I am using 1.2-SNAPSHOT
compile group: 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version: '1.2-SNAPSHOT'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version: '1.2-SNAPSHOT'

- In my current application we custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original logging-trigger api requests to a stream of events fed 
to Flink. This stream of events will then basically be changes of your user 
logger behaviour, and your operators can change its logging behaviour according 
to this stream.

I can send the changes as streams, but I need this change for all the operators 
in my pipeline. Instead of using coflatmap at each operator to combine the 
streams, is there a way to send a change to all the operators?

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?
I don’t think this is possible.
Fine, thanks.

Thanks.

On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.

- In my current application we custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your orig

Re: Presented Flink use case in Japan

2016-10-04 Thread Tzu-Li (Gordon) Tai
Really great to hear this!

Cheers,
Gordon


On October 4, 2016 at 8:13:27 PM, Till Rohrmann (trohrm...@apache.org) wrote:

It's always great to hear Flink success stories :-) Thanks for sharing it with 
the community. 

I hope Flink helps you to solve even more problems. And don't hesitate to reach 
out to the community whenever you stumble across some Flink problems.

Cheers,
Till

On Tue, Oct 4, 2016 at 2:04 PM, Hironori Ogibayashi  
wrote:
Hello,

Just for information.

Last week, I have presented our Flink use case in my company's conference.
(http://developers.linecorp.com/blog/?p=3992)

Here is the slide.
http://www.slideshare.net/linecorp/b-6-new-stream-processing-platformwith-apache-flink
I think the video with English subtitle will also be published soon.

The use case itself might not be very interesting, but I think this is the
first Flink production use case in Japan opened to the public.

Thank you for great software.

Regards,
Hironori Ogibayashi



Re: Kinesis connector - Iterator expired exception

2016-08-26 Thread Tzu-Li (Gordon) Tai
Hi Josh,

Thank you for reporting this, I’m looking into it. There were some major changes 
to the Kinesis connector after mid June, but the changes don’t seem to be 
related to the iterator timeout, so it may be a bug that had always been there.

I’m not sure yet if it may be related, but may I ask how long was your Flink 
job down before restarting it again from the existing state? Was it longer than 
the retention duration of the Kinesis records (default is 24 hours)?

Regards,
Gordon


On August 26, 2016 at 7:20:59 PM, Josh (jof...@gmail.com) wrote:

Hi all,

I guess this is probably a question for Gordon - I've been using the 
Flink-Kinesis connector for a while now and seen this exception a couple of 
times:

com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
expired. The iterator was created at time Fri Aug 26 10:47:47 UTC 2016 while 
right now it is Fri Aug 26 11:05:40 UTC 2016 which is further in the future 
than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
d3db1d90-df97-912b-83e1-3954e766bbe0)

It happens when my Flink job goes down for a couple of hours, then I restart 
from the existing state and it needs to catch up on all the data that has been 
put in Kinesis stream in the hours where the job was down. The job then runs 
for ~15 mins and fails with this exception (and this happens repeatedly - 
meaning I can't restore the job from the existing state).

Any ideas what's causing this? It's possible that it's been fixed in recent 
commits, as the version of the Kinesis connector I'm using is behind master - 
I'm not sure exactly what commit I'm using (doh!) but it was built around mid 
June.

Thanks,
Josh

Re: Kinesis connector - Iterator expired exception

2016-08-26 Thread Tzu-Li (Gordon) Tai
Hi Josh,

Thanks for the description. From your description and a check into the code, 
I’m suspecting what could be happening is that before the consumer caught up to 
the head of the stream, Kinesis was somehow returning the same shard iterator 
on consecutive fetch calls, and the consumer kept on using the same one until 
it eventually timed out.

This is actually suggesting the cause is due to Kinesis-side unexpected 
behaviour, so I probably need to run some long-running tests to clarify / 
reproduce this. The constant "15 minute" fail is suggesting this too, because 
the expire time for shard iterators is actually 5 minutes (from the Kinesis 
docs) …

Either way, it should be possible to handle this in the consumer so that it 
doesn’t fail on such situations. I’ve filed up a JIRA for this: 
https://issues.apache.org/jira/browse/FLINK-4514 .
I’ll get back to you after I figure out the root cause ;)

Regards,
Gordon


On August 26, 2016 at 10:43:02 PM, Josh (jof...@gmail.com) wrote:

Hi Gordon,

My job only went down for around 2-3 hours, and I'm using the default Kinesis 
retention of 24 hours. When I restored the job, it got this exception after 
around 15 minutes (and then restarted again, and got the same exception 15 
minutes later etc) - but actually I found that after this happened around 5 
times the job fully caught up to the head of the stream and started running 
smoothly again.

Thanks for looking into this!

Best,
Josh


On Fri, Aug 26, 2016 at 1:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Josh,

Thank you for reporting this, I’m looking into it. There was some major changes 
to the Kinesis connector after mid June, but the changes don’t seem to be 
related to the iterator timeout, so it may be a bug that had always been there.

I’m not sure yet if it may be related, but may I ask how long was your Flink 
job down before restarting it again from the existing state? Was it longer than 
the retention duration of the Kinesis records (default is 24 hours)?

Regards,
Gordon


On August 26, 2016 at 7:20:59 PM, Josh (jof...@gmail.com) wrote:

Hi all,

I guess this is probably a question for Gordon - I've been using the 
Flink-Kinesis connector for a while now and seen this exception a couple of 
times:

com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
expired. The iterator was created at time Fri Aug 26 10:47:47 UTC 2016 while 
right now it is Fri Aug 26 11:05:40 UTC 2016 which is further in the future 
than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
d3db1d90-df97-912b-83e1-3954e766bbe0)

It happens when my Flink job goes down for a couple of hours, then I restart 
from the existing state and it needs to catch up on all the data that has been 
put in Kinesis stream in the hours where the job was down. The job then runs 
for ~15 mins and fails with this exception (and this happens repeatedly - 
meaning I can't restore the job from the existing state).

Any ideas what's causing this? It's possible that it's been fixed in recent 
commits, as the version of the Kinesis connector I'm using is behind master - 
I'm not sure exactly what commit I'm using (doh!) but it was built around mid 
June.

Thanks,
Josh



Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, and each hold 5 partitions, and have source
parallelism 5. Each source instance will still have 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming threads; otherwise if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.

Regards,
Gordon


On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:

Gordon,

I tried the following with Kafka - 1 Broker but a topic has 10 partitions.
I have a parallelism of 10 defined for the job. I see all my 10
source->Mapper->assignTimestamps receiving and sending data. If there is
only one source instance per broker how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If I my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within  a task slot running a source
> function to consume the additional shards or will one source function
> instance consume 2 shards in round robin.
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi!

Kinesis shards should be ideally evenly assigned to the source instances.
So, with your example of source parallelism of 10 and 20 shards, each
source instance will have 2 shards and will have 2 threads consuming them
(therefore, not in round robin).

For the Kafka consumer, in the source instances there will be one consuming
thread per broker, instead of partition. So, if a source instance is
assigned partitions that happen to be on the same broker, the source
instance will only create 1 thread to consume all of them.

You are correct that currently the Kafka consumer does not handle
repartitioning transparently like the Kinesis connector, but we’re working
on this :)

Regards,
Gordon

On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:

Hi,

The documentation says that there will be one thread per shard. If my
streaming job runs with a parallelism of 10 and there are 20 shards, are
more threads going to be launched within a task slot running a source
function to consume the additional shards, or will one source function
instance consume 2 shards in round robin?

Is it any different for Kafka? Based on the documentation my understanding
is that if there are 10 source function instances and 20 partitions, each
one will read 2 partitions.

Also if partitions are added to Kafka are they handled by the existing
streaming job or does it need to be restarted? It appears as though Kinesis
handles it via the consumer constantly checking for more shards.

Thanks,
Sameer


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
No, it does not default to Ingestion Time. For other connectors in general,
you have to explicitly call `assignTimestampsAndWatermarks()` before the
first operator in the topology that works on time (e.g. windows), otherwise
the job will fail as soon as records start coming in.

Currently, I think only the Kinesis connector and, shortly in the future,
Kafka 0.10 connector will have default timestamps when the topology uses
Event Time. Otherwise, the behaviour is described as above.
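For example, a minimal sketch of what that explicit assignment looks like before an
event-time window (the input tuples and the 10-second out-of-orderness bound here are
made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // stand-in source: (key, epoch-millis event timestamp) pairs
        DataStream<Tuple2<String, Long>> events = env.fromElements(
            Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 3000L));

        events
            // without this (or a connector-provided timestamp), the event-time window below
            // has no timestamps or watermarks to work with
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1; // the record's own event timestamp
                    }
                })
            .keyBy(0)
            .timeWindow(Time.minutes(1))
            .sum(1)
            .print();

        env.execute("event time example");
    }
}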

Regards,
Gordon


On August 23, 2016 at 7:34:25 PM, Sameer W (sam...@axiomine.com) wrote:

Thanks - Is there also a default behavior for non-Kinesis streams? If I set
the time characteristic to Event Time but do not assign timestamps or
generate watermarks by invoking the assignTimestampsAndWatermarks function,
does that default to using Ingestion Time? Or in other words, is it like
invoking this method on the source stream:

assignTimestampsAndWatermarks(new IngestionTimeExtractor<>())

Sameer

On Tue, Aug 23, 2016 at 7:29 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Hi,
>
> For the Kinesis consumer, when you use Event Time but do not explicitly
> assign timestamps, the Kinesis server-side timestamp (the time which
> Kinesis received the record) is attached to the record as default, not
> Flink’s ingestion time.
>
> Does this answer your question?
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> If you do not explicitly assign timestamps and watermarks when using Event
> Time, does it automatically default to using Ingestion Time?
>
> I was reading the Kinesis integration section and came across the note
> below and which raised the above question. I saw another place where you
> explicitly use Event Time with ingestion time with the following - .
> assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.
>
> Does that line have to called explicitly or is it the default?
>
>
> "If streaming topologies choose to use the event time notion
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html>
>  for
> record timestamps, an *approximate arrival timestamp* will be used by
> default. This timestamp is attached to records by Kinesis once they were
> successfully received and stored by streams. Note that this timestamp is
> typically referred to as a Kinesis server-side timestamp, and there are no
> guarantees about the accuracy or order correctness (i.e., the timestamps
> may not always be ascending)."
>
> Thanks,
> Sameer
>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi Sameer,

I realized you might be a bit confused between “source instances (which in
general are Flink tasks)” and “threads” in my previous explanations. The
per-broker threads in the Kafka consumer and per-shard threads in the
Kinesis consumer I mentioned are threads created by the source instance’s
main thread. So, they have nothing to do with the assignment of
shard/partitions to the source instances. The threading models previously
explained refers to how a single source instance consumes multiple
shards/partitions that are assigned to it.

Hope this clarifies things for you more :)

Regards,
Gordon


On August 23, 2016 at 9:31:58 PM, Tzu-Li (Gordon) Tai (tzuli...@gmail.com)
wrote:

Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, and each hold 5 partitions, and have source
parallelism 5. Each source instance will still have 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming threads; otherwise if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.

Regards,
Gordon


On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:

Gordon,

I tried the following with Kafka - 1 Broker but a topic has 10 partitions.
I have a parallelism of 10 defined for the job. I see all my 10
source->Mapper->assignTimestamps receiving and sending data. If there is
only one source instance per broker how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com>
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If I my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within  a task slot running a source
> function to consume the additional shards or will one source function
> instance consume 2 shards in round robin.
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi,

For the Kinesis consumer, when you use Event Time but do not explicitly
assign timestamps, the Kinesis server-side timestamp (the time which
Kinesis received the record) is attached to the record as default, not
Flink’s ingestion time.

Does this answer your question?

Regards,
Gordon


On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:

Hi,

If you do not explicitly assign timestamps and watermarks when using Event
Time, does it automatically default to using Ingestion Time?

I was reading the Kinesis integration section and came across the note
below and which raised the above question. I saw another place where you
explicitly use Event Time with ingestion time with the following
- .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.

Does that line have to called explicitly or is it the default?


"If streaming topologies choose to use the event time notion

for
record timestamps, an *approximate arrival timestamp* will be used by
default. This timestamp is attached to records by Kinesis once they were
successfully received and stored by streams. Note that this timestamp is
typically referred to as a Kinesis server-side timestamp, and there are no
guarantees about the accuracy or order correctness (i.e., the timestamps
may not always be ascending)."

Thanks,
Sameer


Re: Using Flink

2016-10-03 Thread Tzu-Li (Gordon) Tai
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.
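
For reference, enabling checkpointing is a one-liner (the 60 second interval below is just an example value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// offsets are committed back to Kafka only when a checkpoint completes,
// so with this interval the committed offsets advance at most once per minute
env.enableCheckpointing(60000L);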

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original logging-trigger API requests into a stream of control events fed 
to Flink. This stream then carries the changes to your user logger behaviour, 
and your operators can adjust their logging behaviour according to it.
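
A rough sketch of that pattern (the Tuple2 layouts below are hypothetical stand-ins for your record and control-event types):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class PerUserLoggingSketch {

    // data:    (userId, payload) records
    // control: (userId, loggingEnabled) events, e.g. produced by translating the
    //          logging-trigger API requests into a Kafka topic that Flink consumes
    public static DataStream<Tuple2<String, String>> withDynamicLogging(
            DataStream<Tuple2<String, String>> data,
            DataStream<Tuple2<String, Boolean>> control) {

        return data
                .keyBy(0)
                .connect(control.keyBy(0))
                .flatMap(new CoFlatMapFunction<Tuple2<String, String>, Tuple2<String, Boolean>, Tuple2<String, String>>() {

                    // kept per parallel instance here for brevity; keyed state would be more precise
                    private boolean loggingEnabled = false;

                    @Override
                    public void flatMap1(Tuple2<String, String> record, Collector<Tuple2<String, String>> out) {
                        if (loggingEnabled) {
                            System.out.println("DEBUG user " + record.f0 + ": " + record.f1);
                        }
                        out.collect(record);
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Boolean> ctrl, Collector<Tuple2<String, String>> out) {
                        // change this operator's logging behaviour based on the control stream
                        loggingEnabled = ctrl.f1;
                    }
                });
    }
}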

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?

I don’t think this is possible.


Best Regards,
Gordon


On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan 
(govindragh...@gmail.com) wrote:

Hi,

 

I have few questions on how I need to model my use case in flink. Please 
advise. Thanks for the help.

 

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

 

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

 

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?

 

Thanks

Re: FlinkKafkaConsumer and Kafka topic/partition change

2016-09-27 Thread Tzu-Li (Gordon) Tai
Hi!

This is definitely a planned feature for the Kafka connectors, there’s a JIRA 
exactly for this [1].
We’re currently going through some blocking tasks to make this happen, and I also 
hope to speed things up over there :)

Your observation is correct that the Kafka consumer uses “assign()” instead of 
“subscribe()”.
This is because the partition-to-subtask assignment needs to be deterministic in Flink
for exactly-once semantics.
If you’re not concerned about exactly-once and want to experiment around for 
now before [1] comes around,
I believe Robert has recently implemented a Kafka consumer that uses 
“subscribe()”, so the Kafka
topics can scale (looping in Robert to provide more info about this one).

Best Regards,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4022


On September 27, 2016 at 6:17:06 PM, Hironori Ogibayashi (ogibaya...@gmail.com) 
wrote:

Hello,  

I want FlinkKafkaConsumer to follow changes in Kafka topic/partition change.  
This means:  
- When we add partitions to a topic, we want FlinkKafkaConsumer to  
start reading added partitions.  
- We want to specify topics by pattern (e.g accesslog.*), and want  
FlinkKafkaConsumer to start reading new topics if they appeared after  
starting job.  

As long as reading source code and my experiment, FlinkKafkaConsumer  
uses KafkaConsumer.assign() instead of subscribe(), so partitions are  
assigned to each KafkaConsumer instance just once at job starting  
time.  

Is there any way to let FlinkKafkaConsumer follow topic/partition change?  

Regards,  
Hironori Ogibayashi  


Re: Why use Kafka after all?

2016-11-22 Thread Tzu-Li (Gordon) Tai
Hi Matt,

Just to be clear, what I'm looking for is a way to serialize a POJO class for 
Kafka but also for Flink, I'm not sure the interface of both frameworks are 
compatible but it seems they aren't.

For Kafka (producer) I need a Serializer and a Deserializer class, and for 
Flink (consumer) a SerializationSchema and DeserializationSchema class.

Any example of how to put this together would be greatly appreciated.

There’s actually a related JIRA for this: 
https://issues.apache.org/jira/browse/FLINK-4050.
The corresponding PR is https://github.com/apache/flink/pull/2705, which adds 
wrappers for the Kafka serializers.
Is this feature what you’re probably looking for?

Best Regards,
Gordon


On November 18, 2016 at 12:11:23 PM, Matt (dromitl...@gmail.com) wrote:

Just to be clear, what I'm looking for is a way to serialize a POJO class for 
Kafka but also for Flink, I'm not sure the interface of both frameworks are 
compatible but it seems they aren't.

For Kafka (producer) I need a Serializer and a Deserializer class, and for 
Flink (consumer) a SerializationSchema and DeserializationSchema class.

Any example of how to put this together would be greatly appreciated.

On Thu, Nov 17, 2016 at 9:12 PM, Dromit  wrote:
Tzu-Li Tai, thanks for your response.

I've seen the example you mentioned before, TaxiRideSchema.java, but it's way 
too simplified.

In a real POJO class you may have multiple fields such as integers, strings, 
doubles, etc. So serializing them as a string like in the example wouldn't work 
(you can't put together two arbitrary strings and later split the byte array to 
get each of them; the same goes for two integers and nearly any other type).

I feel there should be a more general way of doing this regardless of the 
fields on the class you're de/serializing.

What do you do in these cases? It should be a pretty common scenario!

Regards,
Matt

On Wed, Nov 16, 2016 at 2:01 PM, Philipp Bussche  
wrote:
Hi Dromit

I started using Flink with Kafka but am currently looking into Kinesis to
replace Kafka.
The reason behind this is that eventually my application will run in
somebody's cloud and if I go for AWS then I don't have to take care of
operating Kafka and Zookeeper myself. I understand this can be a challenging
task.
Up to now, where the Kafka bit is only running in a local test environment, I
am happy running it, as I just start 2 Docker containers and it does the job.
But this also means I have no clue how Kafka really works and what I need to
be careful with.
Besides the knowledge that Kafka seems to require, cost is another
aspect here.
If one wants to operate a Kafka cluster plus Zookeeper on let's say the
Amazon cloud this might actually be more expensive than "just" using Kinesis
as a service.
There are apparently drawbacks in terms of functionality and performance
but for my use case that does not seem to matter.

Philipp



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-use-Kafka-after-all-tp10112p10155.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.




Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

2016-11-16 Thread Tzu-Li (Gordon) Tai
Hi Phillip,

Thanks for testing it. From your log and my own tests, I can confirm the 
problem is with Kinesalite not correctly
mocking the official Kinesis behaviour for the `describeStream` API.

There’s a PR for the fix here: https://github.com/apache/flink/pull/2822. With 
this change, shard discovery
should work normally when tested against Kinesalite.

However, I’m not completely sure yet if the fix is viable, and would like to 
wait for others to take a look / review.
Therefore, it might not make it into the next Flink minor bugfix release. If 
you’d like, you can try out the patch for now
and see if the problem remains.

Best Regards,
Gordon

On November 17, 2016 at 1:07:44 AM, Philipp Bussche (philipp.buss...@gmail.com) 
wrote:

Hello Gordon,  

thank you for your help. I have set the discovery interval to 30 seconds and  
just starting the job on a clean kinesalite service (I am running it inside  
docker so every time the container gets stopped and removed to start from  
scratch).  

This is the output without actually any data in the stream:  

11/16/2016 17:59:03 Source: Custom Source -> Sink: Unnamed(1/1) switched to 
 
RUNNING  
17:59:04,673 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will be seeded with initial shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'}, starting  
state set as sequence number LATEST_SEQUENCE_NUM  
17:59:04,674 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will start consuming seeded shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} from sequence  
number LATEST_SEQUENCE_NUM with ShardConsumer 0  
17:59:04,689 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 1  
17:59:08,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 1 @ 1479315548815  
17:59:08,835 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 1 (in 20 ms)  
17:59:13,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 2 @ 1479315553815  
17:59:13,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 2 (in 1 ms)  
17:59:18,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 3 @ 1479315558814  
17:59:18,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 3 (in 1 ms)  
17:59:23,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 4 @ 1479315563815  
17:59:23,816 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 4 (in 1 ms)  
17:59:28,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 5 @ 1479315568813  
17:59:28,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 5 (in 1 ms)  
17:59:33,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 6 @ 1479315573814  
17:59:33,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 6 (in 1 ms)  
17:59:34,704 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 2  

I then restarted the kinesalite container and posted a message to the stream  
before the 30 second mark occurred. The output shows that the job consumes  
from the 2 shards discovered initially (I initialized kinesalite with one  
shard only) right away and then continues to consume 

Re: Why use Kafka after all?

2016-11-15 Thread Tzu-Li (Gordon) Tai
Hi Matt,

Here’s an example of writing a DeserializationSchema for your POJOs: [1].

As for simply writing messages from WebSocket to Kafka using a Flink job, while 
it is absolutely viable, I would not recommend it,
mainly because you’d never know if you might need to temporarily shut down 
Flink jobs (perhaps for a version upgrade).

Shutting down the WebSocket-consuming job would then, of course, lead to 
missing messages during the shutdown time.
It would be perhaps simpler if you have a separate Kafka producer application 
to directly ingest messages from the WebSocket to Kafka.
You wouldn’t want this application to be down at all, so that all messages can 
safely land in Kafka first. I would recommend keeping this part
as simple as possible.

From there, like Till explained, your Flink processing pipelines can rely on 
Kafka’s replayability to provide exactly-once processing guarantees on your 
data.

Best,
Gordon


[1] 
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/utils/TaxiRideSchema.java




On November 16, 2016 at 1:07:12 PM, Dromit (dromitl...@gmail.com) wrote:

"So in your case I would directly ingest my messages into Kafka"

I will do that through a custom SourceFunction that reads the messages from the 
WebSocket, creates simple java objects (POJOs) and sink them in a Kafka topic 
using a FlinkKafkaProducer, if that makes sense.

The problem now is I need a DeserializationSchema for my class. I read Flink is 
able to de/serialize POJO objects on its own, but I'm still required to provide 
a serializer to create the FlinkKafkaProducer (and FlinkKafkaConsumer).

Any idea or example? Should I create a DeserializationSchema for each POJO 
class I want to put into a Kafka stream?



On Tue, Nov 15, 2016 at 7:43 AM, Till Rohrmann  wrote:
Hi Matt,

as you've stated Flink is a stream processor and as such it needs to get its 
inputs from somewhere. Flink can provide you up to exactly-once processing 
guarantees. But in order to do this, it requires a re-playable source because 
in case of a failure you might have to reprocess parts of the input you had 
already processed prior to the failure. Kafka is such a source and people use 
it because it happens to be one of the most popular and widespread open source 
message queues/distributed logs.

If you don't require strong processing guarantees, then you can simply use the 
WebSocket source. But, for any serious use case, you probably want to have 
these guarantees because otherwise you just might calculate bogus results. So 
in your case I would directly ingest my messages into Kafka and then let Flink 
read from the created topic to do the processing.

Cheers,
Till

On Tue, Nov 15, 2016 at 8:14 AM, Dromit  wrote:
Hello,

As far as I've seen, there are a lot of projects using Flink and Kafka 
together, but I'm not seeing the point of that. Let me know what you think 
about this.

1. If I'm not wrong, Kafka provides basically two things: storage (records 
retention) and fault tolerance in case of failure, while Flink mostly cares 
about the transformation of such records. That means I can write a pipeline 
with Flink alone, and even distribute it on a cluster, but in case of failure 
some records may be lost, or I won't be able to reprocess the data if I change 
the code, since the records are not kept in Flink by default (only when sinked 
properly). Is that right?

2. In my use case the records come from a WebSocket and I create a custom class 
based on messages on that socket. Should I put those records inside a Kafka 
topic right away using a Flink custom source (SourceFunction) with a Kafka sink 
(FlinkKafkaProducer), and independently create a Kafka source (KafkaConsumer) 
for that topic and pipe the Flink transformations there? Is that data flow fine?

Basically what I'm trying to understand with both question is how and why 
people are using Flink and Kafka.

Regards,
Matt




Re: flink-dist shading

2016-11-18 Thread Tzu-Li (Gordon) Tai
Hi Craig,

I think the email wasn't sent to the ‘dev’ list, somehow.

Have you tried this:

mvn clean install -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so 
we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -DskipTests
I agree that it’ll affect downstream users who need to build Flink themselves, 
and it would be best if this can be resolved.
The above is still more or less a “workaround”, but since I don’t really know 
the reason why the newer Maven versions
won’t properly shade, we’ll probably need to wait for others more knowledgeable 
about the build infrastructure to chime in and
see if there’s a good long-term solution.

Best Regards,
Gordon
On November 19, 2016 at 8:48:32 AM, Foster, Craig (foscr...@amazon.com) wrote:

I’m not even sure this was delivered to the ‘dev’ list but I’ll go ahead and 
forward the same email to the ‘user’ list since I haven’t seen a response.

---

 

I’m following up on the issue in FLINK-5013 about flink-dist specifically 
requiring Maven 3.0.5 through to <3.3. This affects people who build Flink with 
BigTop (not only EMR), so I’m wondering about the context and how we can 
properly shade the Apache HTTP libraries so that flink-dist can be built with a 
current version of Maven. Any insight into this would be helpful.

 

Thanks!

Craig

 

Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

2016-11-15 Thread Tzu-Li (Gordon) Tai
Hi Philipp,

When used against Kinesalite, can you tell if the connector is already reading 
data from the test shard before any
of the shard discovery messages? If you have any spare time to test this, you 
can set a larger value for the
`ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the config 
properties to easier test this.

If yes, I suspect the problem is that Kinesalite hasn’t stuck to the 
actual Kinesis behaviour for some of the APIs.
Specifically, I think the problem is with the `describeStream(streamName, 
lastSeenShardId)` Kinesis API, where the
expected behaviour is that the returned shard list only contains shardIds after 
`lastSeenShardId`. Perhaps Kinesalite
didn’t follow the behaviour on this part. That’s why the connector kept on 
determining that it’s a new discovered shard.

I’ll investigate and try to reproduce the problem, and see if there’s a good 
way to work around this for Kinesalite.
Thank you for reporting the issue, I’ve filed up a JIRA 
(https://issues.apache.org/jira/browse/FLINK-5075) for this.

Best,
Gordon


On November 16, 2016 at 5:03:17 AM, Philipp Bussche (philipp.buss...@gmail.com) 
wrote:

has discovered a new shard 
KinesisStreamShard

Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Tzu-Li (Gordon) Tai
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a 
shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between when a shard iterator is returned and when it is used 
to fetch the next batch of records is on deserializing and emitting the records 
of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this 
normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task 
managers? From your description and a check of the code, the only possible guess I 
can come up with now is that
the source tasks completely ceased to run for a period of time, and when 
they came back, the shard iterator was unexpectedly found to be expired. 
According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more 
batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak 
within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no 
problems, but yesterday the Kinesis consumer started behaving strangely... My 
Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink 
Kinesis consumer started to stop consuming for periods of time (see the spikes 
in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this 
log message which I believe is related to the problem:
2016-11-03 09:27:53,782 WARN  
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - 
Encountered an unexpected expired iterator 
AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
 for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
85070511730234615865841151857942042863},SequenceNumberRange: 
{StartingSequenceNumber: 
49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the 
iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine 
again with no problems.

Do you have any idea what might be causing this, or anything I should do to 
investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the 
release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing 
this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can 
also build on the current “release-1.1” branch, which already has FLINK-4514 
merged.
Sorry for the inconvenience. Let me know if you bump into any other problems 
afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while 
> right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future 
> than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
> dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resolved for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen



Re: A question regarding to the checkpoint mechanism

2016-10-16 Thread Tzu-Li (Gordon) Tai
Users don’t need to explicitly make a copy of the state. Take checkpointing 
instance fields as operator state for example [1].
You simply return your current state in `snapshotState()`, and Flink will take 
care of snapshotting and persisting it to the state backend.
The persisting process does not block processing of input records if you 
implement the `CheckpointedAsynchronously` interface (which is usually the more 
desirable case).
The same goes for key-partitioned states.
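
To make the instance-field case concrete, here is a minimal sketch of an operator that checkpoints a counter asynchronously:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

public class CountingMapper extends RichMapFunction<String, String>
        implements CheckpointedAsynchronously<Long> {

    private long count;  // the instance field that becomes operator state

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        // simply hand the current value to Flink; it is persisted to the state backend
        // asynchronously, without blocking the processing of further records
        return count;
    }

    @Override
    public void restoreState(Long state) {
        // called on recovery with the state of the latest completed snapshot
        count = state;
    }
}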

Best Regards,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html#checkpointing-instance-fields

On October 17, 2016 at 11:32:07 AM, Li Wang (wangli1...@gmail.com) wrote:

Hi Gordon,

Thanks for your prompt reply.
So do you mean when we are about to checkpoint the state of an operator, we 
first copy its state and then checkpoint the copied state while the operator 
continues processing?

Thanks,
Li


On Oct 17, 2016, at 11:10 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi!

No, the operator does not need to pause processing input records while the 
checkpointing of its state is in progress.
The checkpointing of operator state is asynchronous. The operator state does 
not need to be immutable, since it’s a copy of the snapshot state that’s 
checkpointed.

Regards,
Gordon


On October 17, 2016 at 10:28:34 AM, Li Wang (wangli1...@gmail.com) wrote:

Hi All, 

Any feedback is highly appreciated. 

Thanks. 
Li 

> On Oct 15, 2016, at 11:17 AM, Li Wang <wangli1...@gmail.com> wrote: 
>  
> Hi all, 
>  
> As far as I know, a stateful operator will checkpoint its current state to a 
> persistent storage when it receives all the barrier from all of its upstream 
> operators. My question is that does the operator doing the checkpoint need to 
> pause processing the input tuples for the next batch until the checkpoint is 
> done? If yes, will it introduce significant processing latency when the state 
> is large. If no, does this need the operator state to be immutable? 
>  
> Thanks, 
> Li



Re: Serializers and Schemas

2016-12-07 Thread Tzu-Li (Gordon) Tai
Hi Matt,

1. There’s some in-progress work on wrapper util classes for Kafka 
de/serializers here [1] that allow
Kafka de/serializers to be used with the Flink Kafka Consumers/Producers with 
minimal user overhead.
The PR also has some proposed additions to the documentation for the wrappers.

2. I feel that it would be good to have more documentation on Flink’s 
de/serializers because they’ve been
frequently asked about on the mailing lists. At the same time, the fastest / most 
efficient de/serialization approach is usually tailored to each use case, so we’d 
need to think more about the presentation and purpose of the documentation.
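
In the meantime, to give a rough idea, here is a minimal sketch for the `Product` class from your mail, using JSON (via Jackson) for the byte encoding. This is just one of many possible approaches and assumes a Jackson dependency on the classpath; you can then pass an instance of it to both the FlinkKafkaConsumer and FlinkKafkaProducer constructors:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.io.IOException;

public class ProductSchema implements DeserializationSchema<Product>, SerializationSchema<Product> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serialize(Product element) {
        try {
            return MAPPER.writeValueAsBytes(element);
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize Product", e);
        }
    }

    @Override
    public Product deserialize(byte[] message) throws IOException {
        return MAPPER.readValue(message, Product.class);
    }

    @Override
    public boolean isEndOfStream(Product nextElement) {
        return false;  // the stream never ends
    }

    @Override
    public TypeInformation<Product> getProducedType() {
        return TypeInformation.of(Product.class);
    }
}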

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/2705

On December 8, 2016 at 5:00:19 AM, milind parikh (milindspar...@gmail.com) 
wrote:

Why not use a self-describing format  (json), stream as String and read through 
a json reader and avoid top-level reflection?

Github.com/milindparikh/streamingsi

https://github.com/milindparikh/streamingsi/tree/master/epic-poc/sprint-2-simulated-data-no-cdc-advanced-eventing/2-dataprocessing

?

Apologies if I misunderstood the question. But I can quite see how to model 
your Product class (or indeed any POJO) in a fairly generic way (assuming JSON).

The real issue faced when you have different versions of the same POJO class is 
that it requires storing enough information to dynamically instantiate the actual 
version of the class, which I believe is beyond the simple use case.

Milind

On Dec 7, 2016 2:42 PM, "Matt"  wrote:
I've read your example, but I've found the same problem. You're serializing 
your POJO as a string, where all fields are separated by "\t". This may work 
for you, but not in general.

https://github.com/luigiselmi/pilot-sc4-fcd-producer/blob/master/src/main/java/eu/bde/sc4pilot/fcd/FcdTaxiEvent.java#L60

I would like to see a more "generic" approach for the class Product in my last 
message. I believe a more general purpose de/serializer for POJOs should be 
possible to achieve using reflection.

On Wed, Dec 7, 2016 at 1:16 PM, Luigi Selmi  wrote:
Hi Matt,

I had the same problem, trying to read some records in event time using a POJO, 
doing some transformation and save the result into Kafka for further 
processing. I am not yet done but maybe the code I wrote starting from the 
Flink Forward 2016 training docs could be useful.

https://github.com/luigiselmi/pilot-sc4-fcd-producer


Best,

Luigi 

On 7 December 2016 at 16:35, Matt  wrote:
Hello,

I don't quite understand how to integrate Kafka and Flink, after a lot of 
thoughts and hours of reading I feel I'm still missing something important.

So far I haven't found a non-trivial but simple example of a stream of a custom 
class (POJO). It would be good to have such an example in Flink docs, I can 
think of many many scenarios in which using SimpleStringSchema is not an 
option, but all Kafka+Flink guides insist on using that.

Maybe we can add a simple example to the documentation [1], it would be really 
helpful for many of us. Also, explaining how to create a Flink 
De/SerializationSchema from a Kafka De/Serializer would be really useful and 
would save a lot of time to a lot of people, it's not clear why you need both 
of them or if you need both of them.

As far as I know Avro is a common choice for serialization, but I've read 
Kryo's performance is much better (true?). I guess though that the fastest 
serialization approach is writing your own de/serializer.

1. What do you think about adding some thoughts on this to the documentation?
2. Can anyone provide an example for the following class?

---
public class Product {
    public String code;
    public double price;
    public String description;
    public long created;
}
---

Regards,
Matt

[1] http://data-artisans.com/kafka-flink-a-practical-how-to/



--
Luigi Selmi, M.Sc.
Fraunhofer IAIS Schloss Birlinghoven . 
53757 Sankt Augustin, Germany
Phone: +49 2241 14-2440




Re: Partitioning operator state

2016-12-07 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

Do you mean how Flink redistributes an operator’s state when the parallelism of 
the operator is changed?
If so, you can take a look at [1] and [2].

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-3755
[2] 
https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#

On December 8, 2016 at 4:40:18 AM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

Hi everyone,  

In the case of scaling out a Flink cluster, how does Flink handle operator 
state partitioning of a staged topology?  

Regards,  
Dominik  



Re: Flink Kafka producer with a topic per message

2016-12-07 Thread Tzu-Li (Gordon) Tai
Hi,

The FlinkKafkaProducers currently support a per-record topic through the
user-provided serialization schema, which has a "getTargetTopic(T element)" 
method that is called for each record.
The partition a record is sent to is decided by a custom 
KafkaPartitioner, which is also provided 
by the user when creating a FlinkKafkaProducer.

Does this already provide the functionality you’ve mentioned? Or have I 
misunderstood what you have in mind?
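
For illustration, here is a minimal sketch of a KeyedSerializationSchema that routes each record to a topic derived from the record itself (purely illustrative: the element is a plain String of the form "topic:payload"):

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

public class PerRecordTopicSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        return null;  // no key for this sketch
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.substring(element.indexOf(':') + 1).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // decide the target topic per record
        return element.substring(0, element.indexOf(':'));
    }
}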

Cheers,
Gordon


On December 7, 2016 at 5:55:24 PM, Sanne de Roever (sanne.de.roe...@gmail.com) 
wrote:

The next step would be to determine the impact on the interface of a Sink. 
Currently a Kafka sink has one topic, for example:



Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi Andrew,

Your observations are correct. Like you mentioned, the current problem circles 
around how we deal with the pending buffered requests in accordance with 
Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
description: https://issues.apache.org/jira/browse/FLINK-5487. What do you 
think?

Thank you for bringing this up! We should probably fix this soon.
There’s already some on-going effort in fixing some other aspects of proper 
at-least-once support in the Elasticsearch sinks, so I believe this will be 
brought to attention very soon too.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in 
terms of message delivery. According to (1), the ES sink offers at-least-once 
guarantees. This page doesn’t differentiate between flink-elasticsearch and 
flink-elasticsearch2, so I have to assume for the moment that they both offer 
that guarantee. However, a look at the code (2) shows that the invoke() method 
puts the record into a buffer, and then that buffer is flushed to elasticsearch 
some time later.



Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi,

This is expected behaviour due to how the per-partition watermarks are designed 
in the Kafka consumer, but I think it’s probably a good idea to handle idle 
partitions also when the Kafka consumer itself emits watermarks. I’ve filed a 
JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.

For the time being, I don’t think there will be an easy way to avoid this with 
the existing APIs, unfortunately. Is the skewed partition data intentional, or 
only for experimental purposes?

Best,
Gordon

On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:

Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition 0 
and no data to partition 1. I created a Flink job with a parallelism of 1 that 
consumes that topic and counts the events with a session event window (5 second 
gap). It turned out that the session event window was never closed even though I sent 
a message with a 10 minute gap. After digging into the source code, 
AbstractFetcher[1], which is responsible for sending watermarks downstream, 
calculates the min watermark of all partitions. Because we don't 
have data in partition 1, the watermark returned from partition 1 is always 
Long.MIN_VALUE, therefore AbstractFetcher never fires the watermark 
downstream. 

I want to know if this is expected behavior or a bug. If this is expected 
behavior how do I avoid the delay of watermark firing when data is not evenly 
distributed to all partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor implements 
AssignerWithPeriodicWatermarks<SessionEvent> {

private long currentMaxTimestamp = Long.MIN_VALUE;

@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : 
currentMaxTimestamp - 1);
}

@Override
public long extractTimestamp(SessionEvent element, long 
previousElementTimestamp) {
long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
if (eventStartTime > currentMaxTimestamp) {
currentMaxTimestamp = eventStartTime;
}

return eventStartTime;
}
}

and this is the Flink topo

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>("topic4",
        new MyOwnSchema(), properties);  // properties: Kafka consumer config (omitted in the original mail)
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input.
keyBy("id").
window(EventTimeSessionWindows.withGap(Time.seconds(5))).
reduce(new Reducer(), new WindowFunction()).
print();

//        // execute program
env.execute("a job");

I used the latest code in github

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539


Re: Kafka 09 consumer does not commit offsets

2017-01-09 Thread Tzu-Li (Gordon) Tai
Good to know!


On January 10, 2017 at 1:06:29 PM, Renjie Liu (liurenjie2...@gmail.com) wrote:

Hi, all:
I used kafka connector 0.10 and the problem is fixed. I think this maybe caused 
by incompatible between consumer 0.9 and broker 0.10.
Thanks Henri and Gordon.

On Tue, Jan 10, 2017 at 4:46 AM Henri Heiskanen <henri.heiska...@gmail.com> 
wrote:
Hi,

We had the same problem when running 0.9 consumer against 0.10 Kafka. Upgrading 
Flink Kafka connector to 0.10 fixed our issue.

Br,
Henkka

On Mon, Jan 9, 2017 at 5:39 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi,

Not sure what might be going on here. I’m pretty certain that for 
FlinkKafkaConsumer09 when checkpointing is turned off, the internally used 
KafkaConsumer client will auto commit offsets back to Kafka at a default 
interval of 5000ms (the default value for “auto.commit.interval.ms”).

Could you perhaps provide the logs of your job (you can send them to me 
privately if you prefer to)?
From the logs we should be able to see if the internal KafkaConsumer client is 
correctly configured to auto commit and also check if anything strange is going 
on.

Also, how are you reading the committed offsets in Kafka? I recall there was a 
problem with the 08 consumer that resulted in the Kafka cli not correctly 
showing committed offsets of consumer groups.
However, the 08 consumer had this problem only because we had to implement the 
auto offset committing ourselves. I don’t think this should be an issue for the 
09 consumer, since we’re solely relying on the Kafka client’s own 
implementation to do the auto offset committing.

Cheers,
Gordon


On January 9, 2017 at 7:55:33 PM, Timo Walther (twal...@apache.org) wrote:

I'm not a Kafka expert but maybe Gordon (in CC) knows more.

Timo


Am 09/01/17 um 11:51 schrieb Renjie Liu:
> Hi, all:
> I'm using flink 1.1.3 and kafka consumer 09. I read its code and it
> says that the kafka consumer will turn on auto offset commit if
> checkpoint is not enabled. I've turned off checkpoint and it seems
> that kafka client is not committing to offsets to kafka? The offset is
> important for helping us monitoring. Anyone has encountered this before?
> --
> Liu, Renjie
> Software Engineer, MVAD



--
Liu, Renjie
Software Engineer, MVAD

Re: Joining two kafka streams

2017-01-08 Thread Tzu-Li (Gordon) Tai
Hi Igor!

What you can actually do is let a single FlinkKafkaConsumer consume from both 
topics, producing a single DataStream which you can keyBy afterwards.
All versions of the FlinkKafkaConsumer support consuming multiple Kafka topics 
simultaneously. This is logically the same as union and then a keyBy, like what 
you described.
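
For example, a minimal sketch with the 0.9 consumer (adapt the consumer version, schema and key selector to your setup):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// a single consumer instance subscribed to both topics at once
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
        Arrays.asList("topicA", "topicB"),
        new SimpleStringSchema(),
        props);

DataStream<String> combined = env.addSource(consumer);
// key the combined stream by the join field; the key selector here is just a placeholder
combined.keyBy(record -> record.split(",")[0]);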

Note that this approach requires that the records in both of your Kafka topics 
are of the same type when consumed into Flink (ex., same POJO classes, or 
simply both as Strings, etc.).
If that isn’t possible and you have different data types / schemas for the 
topics, you’d probably need to use “connect” and then a keyBy.

If you’re applying a window directly after joining the two topic streams, you 
could also use a window join:
dataStream.join(otherStream)
    .where(<KeySelector for the first stream>)
    .equalTo(<KeySelector for the second stream>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction<...>() { ... });
The “where” specifies how to select the key from the first stream, and 
“equalTo” the second one.

Hope this helps, let me know if you have other questions!

Cheers,
Gordon

On January 9, 2017 at 4:06:34 AM, igor.berman (igor.ber...@gmail.com) wrote:

Hi,  
I have usecase when I need to join two kafka topics together by some fields.  
In general, I could put content of one topic into another, and partition by  
same key, but I can't touch those two topics(i.e. there are other consumers  
from those topics), on the other hand it's essential to process same keys at  
same "thread" to achieve locality and not to get races when working with  
same key from different machines/threads  

my idea is to use union of two streams and then key by the field,  
but is there better approach to achieve "locality"?  

any inputs will be appreciated  
Igor  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-kafka-streams-tp10912.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Kafka KeyedStream source

2017-01-09 Thread Tzu-Li (Gordon) Tai
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion 
related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is 
designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a 
hash partitioner that is used when deciding which instance of the following 
downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on 
“addSource”, redistribution of data can still happen. I.e., if the parallelism 
of the compute operators right after is different than the number of Kafka 
partitions, redistribution will happen to let the key space and state be evenly 
distributed in Flink.

This leads to the argument that we probably need to think about whether 
retaining the original partitioning of records in Kafka when consumed by Flink 
is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its 
operators regardless of the parallelism of Kafka topics (rescaling isn’t 
actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be 
different than the number of Kafka partitions, and therefore redistributing 
must occur.
For redistribution to not need to take place right after an already partitioned 
Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink 
source instances consuming the partitions, and 3) the parallelism of the keyed 
computation afterwards. This seems like a very specific situation, considering 
that you’ll be able to rescale Flink operators as the data’s key space / volume 
grows.

The main observation, I think, is that Flink itself maintains how the key space 
is partitioned within the system, which plays a crucial part in rescaling. 
That’s why by default it doesn’t respect existing partitioning of the key space 
in Kafka (or other external sources). Even if it initially does at the 
beginning of a job, partitioning will most likely change as you rescale your 
job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html

On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the 
same sessionId into the same Kafka partition. That way I already have all 
events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I 
have to do a keyBy before my processing can continue. Such a keyBy will 
redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that 
immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes

[ANNOUNCE] Apache Flink 1.1.5 Released

2017-03-23 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is pleased to announce the availability of Flink 
1.1.5, which is the next bugfix release for the 1.1 series.
The official release announcement: 
https://flink.apache.org/news/2017/03/23/release-1.1.5.html
Release binaries: http://apache.lauf-forum.at/flink/flink-1.1.5
For users of the Flink 1.1 series, please update your Maven dependencies to the 
new 1.1.5 version and update your binaries.
On behalf of the community, I would like to thank everybody who contributed to 
the release.



Re: Question Regarding a sink..

2017-03-23 Thread Tzu-Li (Gordon) Tai
Hi Steve,

This normally shouldn’t happen, unless there simply are two copies of the data.

What is the source of the topology? Also, this might be obvious, but if you 
have broadcast your input stream to the sink, then each sink instance would 
get all records in the input stream.

Cheers,
Gordon

On March 24, 2017 at 9:11:35 AM, Steve Jerman (st...@kloudspot.com) wrote:

Hi,

I have a sink writing data to InfluxDB. I’ve noticed that the sink gets 
multiple copies of upstream records..

Why does this happen, and how can I avoid it… ?

Below is a trace …showing 2 records (I have a parallelism of two) for each 
record in the ‘.printToError’ for the same stream.

Any help/suggestions appreciated.

Steve


1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=8F:13:AC:4A:DA:93, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=43:7D:8A:D4:7D:D7, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=77:CD:BD:48:EE:D8, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 

Re: Flink 1.2 time window operation

2017-03-30 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for the clarification.

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?
First, some remarks here -  sources (in your case the Kafka consumer) will not 
stop fetching / producing data when the windows haven’t fired yet. Does this 
explain what you have plotted in the diagram you attached (sorry, I can’t 
really reason about the diagram because I’m not so sure what the values of the 
x-y axes represent)?

If you’re writing the outputs of the window operation to Kafka (by adding a 
Kafka sink after the windowing), then yes it should only write to Kafka when 
the window has fired. The characteristics will also differ for different types 
of windows, so you should definitely take a look at the Windowing docs [1] 
about them.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
On March 30, 2017 at 2:37:41 PM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?

Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Tzu-Li (Gordon) Tai
I'm wondering what I can tweak further to increase this. I was reading in this 
blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be able 
to squeeze out more out of it.

Not really sure if it is relevant in the context of your case, but you could 
perhaps try tweaking the maximum size of Kafka records fetched on each poll on 
the partitions.
You can do this by setting a higher value for “max.partition.fetch.bytes” in 
the provided config properties when instantiating the consumer; that will 
directly configure the internal Kafka clients.
Generally, all Kafka settings are applicable through the provided config 
properties, so you can perhaps take a look at the Kafka docs to see what else 
there is to tune for the clients.
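
For example (the value below is only an illustration - tune it to your record sizes, and adapt the consumer version to the one you use):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");
// fetch up to 2 MB per partition per poll instead of the Kafka default of 1 MB
props.setProperty("max.partition.fetch.bytes", "2097152");

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);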

On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski 
(kamil.dziublin...@gmail.com) wrote:

I'm wondering what I can tweak further to increase this. I was reading in this 
blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be able 
to squeeze out more out of it.

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

2017-03-22 Thread Tzu-Li (Gordon) Tai
Hi Steffan,

I have to admit that I didn’t put too much thought into the default values for 
the Kinesis consumer.

I’d say it would be reasonable to change the default values to follow KCL’s 
settings. Could you file a JIRA for this?

In general, we might want to reconsider all the default values for configs 
related to the getRecords call, i.e.
- SHARD_GETRECORDS_MAX
- SHARD_GETRECORDS_INTERVAL_MILLIS
- SHARD_GETRECORDS_BACKOFF_*
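
For reference, all of these can already be overridden through the consumer's config properties, e.g. (values below are only examples, on top of whatever AWS credentials settings you use):

Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
// fetch up to 5000 records per GetRecords call, matching the KCL default
config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");

FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config);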

Cheers,
Gordon

On March 23, 2017 at 2:12:32 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,  

I recently ran into problems with a Flink job running on an EMR cluster  
consuming events from a Kinesis stream receiving roughly 15k  
event/second. Although the EMR cluster was substantially scaled and CPU  
utilization and system load were well below any alarming threshold, the  
processing of events of the stream increasingly fell behind.  

Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
which is apparently causing too much overhead when consuming events from  
the stream. Increasing the value to 5000, a single GetRecords call to  
Kinesis can retrieve up to 10k records, made the problem go away.  

I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
(100x less than it could be). The Kinesis Client Library defaults to  
5000 and it's recommended to use this default value:  
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.
  

Thanks for the clarification!  

Cheers,  
Steffen  


Re: How to rewind Kafka cursors into a Flink job ?

2017-03-30 Thread Tzu-Li (Gordon) Tai
Hi Dominique,

What your plan A is suggesting is that a downstream operator can provide 
signals to upstream operators and alter their behaviour.
In general, this isn’t possible, as in a distributed streaming environment it’s 
hard to guarantee exactly which records will be affected by the behaviour change.

I would say plan B would be the approach to go for this.
Also, in Flink 1.3.0, the FlinkKafkaConsumer will allow users to define if they 
want to start from the earliest, latest, or some specific offset, completely 
independent of the committed consumer group offsets in Kafka.
This should also come in handy for what you have in mind. Have a look at 
https://issues.apache.org/jira/browse/FLINK-4280 for more details on this :)
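
As a rough preview of what that will look like once FLINK-4280 is in (method names may still change before the 1.3.0 release):

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("data-topic", new SimpleStringSchema(), props);

// start reading from the earliest offset, ignoring committed group offsets in Kafka
consumer.setStartFromEarliest();
// ... or from the latest offset:
// consumer.setStartFromLatest();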

Cheers,
Gordon

On March 28, 2017 at 12:35:38 AM, Dominique De Vito (ddv36...@gmail.com) wrote:

Hi,

Is there a way to rewind Kafka cursors (while using here Kafka as a consumer) 
from (inside) a Flink job ?

Use case [plan A]
* The Flink job would listen 1 main "data" topic + 1 secondary "event" topic
* In case of a given event, the Flink job would rewind all Kafka cursors of the 
"data" topic, to go back to the latest cursors and retreat data from there.

Use case-bis [plan A-bis] :
* The Flink job would listen 1 main "data" topic, dealing with data according 
to some params
* This Flink job would listen a WS and in case of a given event, the Flink job 
would rewind all Kafka cursors of the "data" topic, to go back from the latest 
cursors and re-process data from there, according to some new params.

Plan B ;-)
* Listen the events from outside Flink, and in case of an event, stop the Flink 
and relaunch it.

So, if someone has any hint about how to rewind for [plan A] and/or [plan 
A-bis] => thank you !
 
Regards,
Dominique



Re: Apache Flink Hackathon

2017-03-30 Thread Tzu-Li (Gordon) Tai
Sounds like a cool event! Thanks for sharing this!


On March 27, 2017 at 11:40:24 PM, Lior Amar (lior.a...@parallelmachines.com) 
wrote:

Hi all,

My name is Lior and I am working at Parallel Machines (a startup company 
located in the Silicon Valley).

We are hosting a Flink Hackathon on April 10, 3pm - 8pm at Hotel Majestic in 
San Francisco.
(During the first day of Flink Forward, training day)

More details at the meet up event:
https://www.meetup.com/Parallel-Machines-Meetup/events/238390498/


See you there :-)

--lior

Re: Flink 1.2 time window operation

2017-03-30 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

Was the job running with processing time or event time? If event time, how are 
you producing the watermarks?
Normally to understand how windows are firing in Flink, these two factors would 
be the place to look at.
I can try to further explain this once you provide info with these. Also, are 
you using Kafka 0.10?

Cheers,
Gordon

On March 27, 2017 at 11:25:49 PM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

Hi all,  

Lately I’ve been investigating the performance characteristics of Flink as 
part of our internal benchmark. As part of this we’ve developed and deployed an 
application that polls data from Kafka and groups the data by a key during a fixed 
time window of one minute.  

In total, the topic that the KafkaConsumer polled from consists of 100 million 
messages, each 100 bytes in size. What we were expecting is that no records would 
be either read from or produced back to Kafka for the first minute of the window 
operation - however, this is unfortunately not the case. Below you may find a 
plot showing the number of records produced per second.  

Could anyone provide an explanation of the behaviour shown in the graph 
below? What are the reasons behind consuming/producing messages from/to Kafka 
while the window has not expired yet?  



Re: Flink 1.2 time window operation

2017-03-31 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

I see, thanks for explaining the diagram.

This is expected because the 1 minute window in your case is aligned with the 
beginning of every minute.

For example, if the first element comes at 12:10:45, then the element 
will be put in the window covering 12:10:00 to 12:11:00.
Therefore, that window will fire roughly 15 seconds after the element arrived, instead of a full minute later.

Does that explain what you are experiencing?
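
To make the alignment concrete, a small standalone sketch (this mirrors, but is not, 
Flink's internal window-start computation):

long windowSize = 60_000L;                                  // 1-minute tumbling window
long elementTs = 45_000L;                                   // element at second 45 of some minute

long windowStart = elementTs - (elementTs % windowSize);    // second 0
long windowEnd = windowStart + windowSize;                  // second 60 (exclusive)
// the window fires once the watermark passes windowEnd,
// i.e. roughly 15 seconds after the element in the example above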

Cheers,
Gordon


On March 31, 2017 at 3:06:56 AM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

First, some remarks here -  sources (in your case the Kafka consumer) will not 
stop fetching / producing data when the windows haven’t fired yet.

This is for sure true. However, the plot shows the number of records produced 
per second, where each record was assigned a created-at timestamp at creation time, 
before being pushed back to Kafka. Sorry I did not clarify this 
before. Anyway, because of this I would expect to see a certain lag. Of 
course, messages will not all be produced into Kafka exactly at window expiry 
with the producer then shutting down - however, what concerns me is that messages 
were produced to Kafka before the first window expired - hence the questions. 

If you’re writing the outputs of the window operation to Kafka (by adding a 
Kafka sink after the windowing), then yes it should only write to Kafka when 
the window has fired.

Hence, this behaviour that you’ve described and that we expected did not occur. 

If it would help, I can share the source code and a detailed Flink configuration. 

Cheers,
Dominik

On 30 Mar 2017, at 13:09, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi,

Thanks for the clarification.

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?
First, some remarks here -  sources (in your case the Kafka consumer) will not 
stop fetching / producing data when the windows haven’t fired yet. Does this 
explain what you have plotted in the diagram you attached (sorry, I can’t 
really reason about the diagram because I’m not so sure what the values of the 
x-y axes represent)?

If you’re writing the outputs of the window operation to Kafka (by adding a 
Kafka sink after the windowing), then yes it should only write to Kafka when 
the window has fired. The characteristics will also differ for different types 
of windows, so you should definitely take a look at the Windowing docs [1] 
about them.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
On March 30, 2017 at 2:37:41 PM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?



Re: Reply: Re: flink one transformation end, the next transformation start

2017-03-31 Thread Tzu-Li (Gordon) Tai
Sorry, I just realized our previous conversation on this question was done via 
private email and not to user@flink.apache.org

Forwarding the previous content of the conversation back to the mailing list:

On March 30, 2017 at 4:15:46 PM, rimin...@sina.cn (rimin...@sina.cn) wrote:

The job runs successfully, but the result is wrong.
Record 1 and record 14 are the same, so the computed cosine value should be 1, but 
on YARN the value is not 1, and other results also differ from those obtained when 
running locally.

So, I guess, the steps are:
1 val data = env.readTextFile("hdfs:///")//DataSet[(String,Array[String])]
2 val dataVec = computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
3 val rescomm = computeCosSims 
(dataVec)//DataSet[(String,Array[(String,Double)])]
The records flow through steps 1, 2 and 3; but step 3 must only start once step 2 
has finished, because computing the cosine similarity of all records in step 3 
requires all of the data. So, is there some operation that can make step 3 start 
only when step 2 has ended?
----- Original Message -----
From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
To: rimin...@sina.cn
Subject: Re: flink one transformation end,the next transformation start
Date: 2017-03-30 15:54

Hi,

What exactly is the error you’re running into on YARN? You should be able to 
find them in the TM logs.
It’ll be helpful to understand the problem if you can provide them (just the 
relevant parts of the error will do).
Otherwise, I currently can not tell much from the information here.

Cheers,
Gordon


On March 30, 2017 at 3:33:53 PM, rimin...@sina.cn (rimin...@sina.cn) wrote:

Hi all,
I run a job; it is:
-
val data = env.readTextFile("hdfs:///")//DataSet[(String,Array[String])]
val dataVec = computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
val rescomm = computeCosSims (dataVec)//DataSet[(String,Array[(String,Double)])]

But when run on the YARN cluster the result was wrong (although the job completed 
successfully); when run locally, in Eclipse on my computer, the result is correct.

So, I ran it in two steps.
First:
val data = env.readTextFile("hdfs:///")//DataSet[(String,Array[String])]
val dataVec = computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
dataVec.writeAsText("hdfs///vec")//the vector is correct,

Second:
val readVec = 
env.readTextFile("hdfs:///vec").map(...)//DataSet[(String,Int,Array[(Int,Double)])]
val rescomm = computeCosSims (dataVec)//DataSet[(String,Array[(String,Double)])]
and the result is correct - the same as when run locally in Eclipse.
--
Can someone help solve this problem?



Re: shaded version of legacy kafka connectors

2017-03-21 Thread Tzu-Li (Gordon) Tai
Hi,

There currently isn’t any shaded version Kafka 0.8 connector version available, 
so yes, you would need to do build that yourself.

I’m not completely sure if there will be any class name clashes, because the 
Kafka 0.8 API is typically packaged under `kafka.javaapi.*`, while in 0.9 / 
0.10 they’re under `org.apache.kafka.*`.

But relocating the `kafka.javaapi.*` classes to a different path is probably 
the straightforward safe way to go, it shouldn’t have any issues. You shouldn’t 
need to relocate the Flink Kafka connector classes (under 
`o.a.f.streaming.connectors.kafka.*`) because there aren’t any name clashes for 
them between 0.8 and 0.10.

Please feel free to let me know if you bump into any questions with this :-)

Cheers,
Gordon

On March 21, 2017 at 12:43:02 AM, Gwenhael Pasquiers 
(gwenhael.pasqui...@ericsson.com) wrote:

Hi,

 

Before doing it myself I thought it would be better to ask.

We need to consume from kafka 0.8 and produce to kafka 0.10 in a flink app.

I guess there will be classes and package names conflicts for a lot of 
dependencies of both connectors.

 

The obvious solution is to make a “shaded” version of the Kafka 0.8 connector 
so that it can coexist with the 0.10 version.

 

Does it already exists ?

Re: Data+control stream from kafka + window function - not working

2017-03-16 Thread Tzu-Li (Gordon) Tai
Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you 
might not be reading any data from Kafka at all:

private static DataStream readKafkaStream(String topic, 
StreamExecutionEnvironment env) throws IOException {

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "group-0009");
properties.setProperty("auto.offset.reset", "smallest");
return env.addSource(new FlinkKafkaConsumer08<>(topic, new 
SimpleStringSchema(), properties));
}

Have you tried using a different “group.id” everytime you’re re-running the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any 
offsets for the group committed in Kafka.
So you might not actually be reading the complete “small_input.csv” dataset, 
unless you use a different group.id every time.
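
For example, a quick (hypothetical) way to force a fresh consumer group on every 
debugging run, so that "auto.offset.reset" actually takes effect:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
// a new group.id per run, so no previously committed offsets are picked up
properties.setProperty("group.id", "group-" + System.currentTimeMillis());
properties.setProperty("auto.offset.reset", "smallest");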

Cheers,
Gordon

On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Hi,

I am using flink-1.2 and reading data stream from Kafka (using 
FlinkKafkaConsumer08). I want to connect this data stream with another stream 
(read control stream) so as to do some filtering on the fly. After filtering, I 
am applying window function (tumbling/sliding event window) along with fold 
function. However, the window function does not get called.

Any help to debug/fix this is greatly appreciated!

Below is a reproducible code that one can run in IDE like IntelliJ or on flink 
cluster. You will need to have a running Kafka cluster (local or otherwise).
Create a topic and add test data points-

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
test < small_input.csv

where small_input.csv contains the following lines-

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading control stream from Kafka (but issue can 
be reproduced with this code as well)
2) If instead of reading data stream from kafka, I create stream from elements 
(i.e. use getInput function instead of getKafkaInput function), the code works 
and window function is fired.

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//DataStream product = getInput(env);
DataStream product = getKafkaInput(env);
DataStream control= getControl(env);

DataStream filteredStream = product.keyBy(0)
.connect(control.keyBy(0))
.flatMap(new CoFlatMapFunImpl());

DataStream watermarkedStream = 
filteredStream.assignTimestampsAndWatermarks(
getTimestampAssigner(Time.seconds(1))).setParallelism(3);

watermarkedStream.transform("WatermarkDebugger", 
watermarkedStream.getType(), 

Re: Telling if a job has caught up with Kafka

2017-03-18 Thread Tzu-Li (Gordon) Tai
So we would have current lag per partition (for instance every 1 sec) and lag 
at the latest checkpoint per partition in an easily queryable way.
I quite like this idea! We could perhaps call them “currentOffsetLag” and 
“lastCheckpointedOffsetLag”.

I’ve filed a JIRA to track this feature, and added some details there too: 
https://issues.apache.org/jira/browse/FLINK-6109.
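
(As a user-side stopgap until such metrics exist, something along the lines of what 
Bruno describes further down in this thread could be sketched roughly as below; 
MyEvent, its getTimestampMillis() accessor and the metric name are made-up placeholders.)

public class LagTrackingMap extends RichMapFunction<MyEvent, MyEvent> {

    private volatile long lagMillis;

    @Override
    public void open(Configuration parameters) {
        // expose how far the record timestamps trail behind wall-clock time
        getRuntimeContext().getMetricGroup()
                .gauge("recordLagMillis", (Gauge<Long>) () -> lagMillis);
    }

    @Override
    public MyEvent map(MyEvent event) {
        lagMillis = System.currentTimeMillis() - event.getTimestampMillis();
        return event;
    }
}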

Cheers,
Gordon

On March 17, 2017 at 9:53:43 PM, Gyula Fóra (gyula.f...@gmail.com) wrote:

Hi Gordon,

Thanks for the suggestions, I think in general it would be good to make this 
periodic (with a configurable interval), and also show the latest committed 
(checkpointed) offset lag.
I think it's better to show both not only one of them as they both carry useful 
information.

So we would have current lag per partition (for instance every 1 sec) and lag 
at the latest checkpoint per partition in an easily queryable way.

Gyula

Tzu-Li (Gordon) Tai <tzuli...@apache.org> ezt írta (időpont: 2017. márc. 17., 
P, 14:24):
One other possibility for reporting “consumer lag” is to update the metric only 
at a
configurable interval, if use cases can tolerate a certain delay in realizing 
the consumer
has caught up.

Or we could also piggyback the consumer lag update onto the checkpoint 
interval -
I think in the case that Gyula described, users might additionally want
to stop the old job only when the new job has “caught up with partition head” &&
“the offsets used to determine the lag is secured in a checkpoint”. That should
address syncing the consumer lag calculation with the commit frequency 
discussed here.

What do you think?


On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi,

I was thinking somewhat similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
of the performance tradeoffs there (the partition metadata request
and records requests require 2 separate calls, so we would
basically be doubling the request calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (u...@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? This we could also display in the web frontend.

In the first version, users would have to poll this metric manually.

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
> Hi,
>
> We are interested on this too. So far we flag the records with timestamps in
> different points of the pipeline and use metrics gauges to measure latency
> between the different components, but would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>
> Cheers,
>
> Bruno
>
> On Fri, 17 Mar 2017 at 10:07 Florian König <florian.koe...@micardo.com>
> wrote:
>>
>> Hi,
>>
>> thank you Gyula for posting that question. I’d also be interested in how
>> this could be done.
>>
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>
>> Thanks
>> Florian
>>
>> > Am 17.03.2017 um 10:26 schrieb Gyula Fóra <gyf...@apache.org>:
>> >
>> > Hi All,
>> >
>> > I am wondering if anyone has some nice suggestions on what would be the
>> > simplest/best way of telling if a job is caught up with the Kafka input.
>> > An alternative question would be how to tell if a job is caught up to
>> > another job reading from the same topic.
>> >
>> > The first thing that comes to my mind is looking at the offsets Fli

Re: Telling if a job has caught up with Kafka

2017-03-18 Thread Tzu-Li (Gordon) Tai
@Florian
the 0.9 / 0.10 version and 0.8 version behave a bit differently right now for 
the offset committing.

In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. 
settings will be completely ignored and overwritten before used to instantiate 
the internal Kafka clients, hence committing will only happen on Flink 
checkpoints.

In 0.8, this isn’t the case. Both automatic periodic committing and committing 
on checkpoints can take place. That’s perhaps why you’re observing the 0.8 
consumer to be committing more frequently.

FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you 
can take a look at https://github.com/apache/flink/pull/3527.

- Gordon


On March 17, 2017 at 6:07:38 PM, Florian König (florian.koe...@micardo.com) 
wrote:

Why is that so? The checkpoint contains the Kafka offset and would be able to 
start reading wherever it left off, regardless of any offset stored in Kafka or 
Zookeeper. Why is the offset not committed regularly, independently from the 
checkpointing? Or did I misconfigure anything? 

Re: Telling if a job has caught up with Kafka

2017-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

I was thinking somewhat similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
of the performance tradeoffs there (the partition metadata request
and records requests require 2 separate calls, so we would
basically be doubling the request calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (u...@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the  
consumer? Can't we poll the latest offset via the Offset API [1] and  
report a consumer lag metric for the consumer group of the  
application? This we could also display in the web frontend.  

In the first version, users would have to poll this metric manually.  

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest
  

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda  wrote:  
> Hi,  
>  
> We are interested on this too. So far we flag the records with timestamps in  
> different points of the pipeline and use metrics gauges to measure latency  
> between the different components, but would be good to know if there is  
> something more specific to Kafka that we can do out of the box in Flink.  
>  
> Cheers,  
>  
> Bruno  
>  
> On Fri, 17 Mar 2017 at 10:07 Florian König   
> wrote:  
>>  
>> Hi,  
>>  
>> thank you Gyula for posting that question. I’d also be interested in how  
>> this could be done.  
>>  
>> You mentioned the dependency on the commit frequency. I’m using  
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer 
>>  
>> a job's offsets as shown in the diagrams updated a lot more regularly than  
>> the checkpointing interval. With the 10 consumer a commit is only made after 
>>  
>> a successful checkpoint (or so it seems).  
>>  
>> Why is that so? The checkpoint contains the Kafka offset and would be able  
>> to start reading wherever it left off, regardless of any offset stored in  
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently 
>>  
>> from the checkpointing? Or did I misconfigure anything?  
>>  
>> Thanks  
>> Florian  
>>  
>> > Am 17.03.2017 um 10:26 schrieb Gyula Fóra :  
>> >  
>> > Hi All,  
>> >  
>> > I am wondering if anyone has some nice suggestions on what would be the  
>> > simplest/best way of telling if a job is caught up with the Kafka input.  
>> > An alternative question would be how to tell if a job is caught up to  
>> > another job reading from the same topic.  
>> >  
>> > The first thing that comes to my mind is looking at the offsets Flink  
>> > commits to Kafka. However this will only work if every job uses a 
>> > different  
>> > group id and even then it is not very reliable depending on the commit  
>> > frequency.  
>> >  
>> > The use case I am trying to solve is fault tolerant update of a job, by  
>> > taking a savepoint for job1 starting job2 from the savepoint, waiting 
>> > until  
>> > it catches up and then killing job1.  
>> >  
>> > Thanks for your input!  
>> > Gyula  
>>  
>>  
>  


Re: Telling if a job has caught up with Kafka

2017-03-17 Thread Tzu-Li (Gordon) Tai
One other possibility for reporting “consumer lag” is to update the metric only 
at a
configurable interval, if use cases can tolerate a certain delay in realizing 
the consumer
has caught up.

Or we could also piggyback the consumer lag update onto the checkpoint 
interval -
I think in the case that Gyula described, users might additionally want
to stop the old job only when the new job has “caught up with partition head” &&
“the offsets used to determine the lag is secured in a checkpoint”. That should
address syncing the consumer lag calculation with the commit frequency 
discussed here.

What do you think?

On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi,

I was thinking somewhat similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
of the performance tradeoffs there (the partition metadata request
and records requests require 2 separate calls, so we would
basically be doubling the request calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (u...@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? This we could also display in the web frontend.

In the first version, users would have to poll this metric manually.

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
> Hi,
>
> We are interested on this too. So far we flag the records with timestamps in
> different points of the pipeline and use metrics gauges to measure latency
> between the different components, but would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>
> Cheers,
>
> Bruno
>
> On Fri, 17 Mar 2017 at 10:07 Florian König <florian.koe...@micardo.com>
> wrote:
>>
>> Hi,
>>
>> thank you Gyula for posting that question. I’d also be interested in how
>> this could be done.
>>
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>
>> Thanks
>> Florian
>>
>> > Am 17.03.2017 um 10:26 schrieb Gyula Fóra <gyf...@apache.org>:
>> >
>> > Hi All,
>> >
>> > I am wondering if anyone has some nice suggestions on what would be the
>> > simplest/best way of telling if a job is caught up with the Kafka input.
>> > An alternative question would be how to tell if a job is caught up to
>> > another job reading from the same topic.
>> >
>> > The first thing that comes to my mind is looking at the offsets Flink
>> > commits to Kafka. However this will only work if every job uses a different
>> > group id and even then it is not very reliable depending on the commit
>> > frequency.
>> >
>> > The use case I am trying to solve is fault tolerant update of a job, by
>> > taking a savepoint for job1 starting job2 from the savepoint, waiting until
>> > it catches up and then killing job1.
>> >
>> > Thanks for your input!
>> > Gyula
>>
>>
>


Re: Data+control stream from kafka + window function - not working

2017-03-16 Thread Tzu-Li (Gordon) Tai
Hi Tarandeep,

Thanks for clarifying.

For the next step, I would recommend taking a look at 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html
 and try to find out what exactly is wrong with the watermark progression. 
Flink 1.2 exposes watermarks as a metric, and that should help in figuring out 
why the windows aren’t firing.

Also, I see you have added a “WatermarkDebugger” in your job. Have you checked 
whether or not the watermarks printed there are identical (using getInput v.s. 
getKafkaInput)?

Cheers,
Gordon

On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Anyone?
Any suggestions what could be going wrong or what I am doing wrong?

Thanks,
Tarandeep


On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <tarand...@gmail.com> wrote:
Data is read from Kafka and yes I use different group id every time I run the 
code. I have put break points and print statements to verify that.

Also, if I don't connect with control stream the window function works. 

- Tarandeep

On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you 
might not be reading any data from Kafka at all:

private static DataStream readKafkaStream(String topic, 
StreamExecutionEnvironment env) throws IOException {

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "group-0009");
properties.setProperty("auto.offset.reset", "smallest");
return env.addSource(new FlinkKafkaConsumer08<>(topic, new 
SimpleStringSchema(), properties));
}

Have you tried using a different “group.id” everytime you’re re-running the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any 
offsets for the group committed in Kafka.
So you might not actually be reading the complete “small_input.csv” dataset, 
unless you use a different group.id every time.

Cheers,
Gordon

On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Hi,

I am using flink-1.2 and reading data stream from Kafka (using 
FlinkKafkaConsumer08). I want to connect this data stream with another stream 
(read control stream) so as to do some filtering on the fly. After filtering, I 
am applying window function (tumbling/sliding event window) along with fold 
function. However, the window function does not get called.

Any help to debug/fix this is greatly appreciated!

Below is a reproducible code that one can run in IDE like IntelliJ or on flink 
cluster. You will need to have a running Kafka cluster (local or otherwise).
Create a topic and add test data points-

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
test < small_input.csv

where small_input.csv contains the following lines-

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading control stream from Kafka (but issue can 
be reproduced with this code as well)
2) If instead of reading data stream from kafka, I create stream from elements 
(i.e. use getInput function instead of getKafkaInput function), the code works 
and window function is fired.

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streamin

Re: Doubt Regarding producing to kafka using flink

2017-04-03 Thread Tzu-Li (Gordon) Tai
Hi Archit,

The problem is that you need to assign the returned `DataStream` from 
`stream.assignTimestampsAndWatermarks` to a separate variable, and use that 
when instantiating the Kafka 0.10 sink.
The `assignTimestampsAndWatermarks` method returns a new `DataStream` instance 
with records that have assigned timestamps. Calling it does not affect the 
original `DataStream` instance.
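
Applied to your snippet below, the fix would look roughly like this (WordCount, 
WordCountSchema, KAFKA_TOPIC and properties are taken from your code):

DataStream<WordCount> withTimestamps = stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<WordCount>() {
            @Override
            public long extractAscendingTimestamp(WordCount element) {
                return DateTime.now().getMillis();
            }
        });

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                withTimestamps, KAFKA_TOPIC, new WordCountSchema(), properties);
config.setWriteTimestampToKafka(true);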

Cheers,
Gordon

On April 3, 2017 at 5:15:03 PM, Archit Mittal (marchi...@gmail.com) wrote:

Hi Gordon
This is the function snippet i am using but i am getting invalid timestamp  
   
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "word");
properties.setProperty("auto.offset.reset", "earliest");


DataStream<WordCount> stream = env.fromElements(wordCount);
stream.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor() {
@Override
public long extractAscendingTimestamp(WordCount element) {
return DateTime.now().getMillis();
}
});


FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, KAFKA_TOPIC, new 
WordCountSchema(), properties);
config.setWriteTimestampToKafka(true);

    env.execute("job");

On Mon, Apr 3, 2017 at 8:20 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Archit!

You’ll need to assign timestamps to the records in your stream before producing 
them to Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further 
questions if you bump into any!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html

On April 2, 2017 at 6:38:13 PM, Archit Mittal (marchi...@gmail.com) wrote:

Hi 

I am using flink-connector-kafka-0.10_2.10

while producing i am getting error as 

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:60) 
~[kafka-clients-0.10.0.1.jar:na]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249)
 ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345)
 

how do i put timestamp in my object before producing ?

Thanks
Archit



Re: Problems with Kerberos Kafka connection in version 1.2.0

2017-04-11 Thread Tzu-Li (Gordon) Tai
Hi Diego,

I think the problem is here:

security.kerberos.login.contexts: Client, KafkaClient

The space between “Client,” and “KafkaClient” is causing the problem.
Removing it should fix your issue.
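I.e., the entry in flink-conf.yaml should read:

security.kerberos.login.contexts: Client,KafkaClient
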
Cheers,
Gordon

On April 11, 2017 at 3:24:20 AM, Diego Fustes Villadóniga (dfus...@oesia.com)
wrote:

Hi all,



I’m trying to connect to a kerberized Kafka cluster from Flink 1.2.0. I’ve
configured Flink correctly following instructions to get the credentials
from a given keytab. Here is the configuration:





security.kerberos.login.keytab: /home/biguardian/biguardian.keytab

security.kerberos.login.principal: biguardian

security.kerberos.login.use-ticket-cache: false

security.kerberos.login.contexts: Client, KafkaClient





However, I get this error:



org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException:
Could not find a 'KafkaClient' entry in `/tmp/jaas-522055201249067191.conf`



Indeed, the JAAS file is empty.



Do yon know what may be happening?



Diego Fustes Villadóniga, Arquitecto Big Data, CCIM


Re: Custom timer implementation using Flink

2017-04-11 Thread Tzu-Li (Gordon) Tai
Hi,

I just need to 
start a timer of x days/hours (lets say) and when it is fired just trigger 
something.
Flink’s lower-level ProcessFunction [1] should be very suitable to implement 
this. Have you taken a look at this and see if it suits your case?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
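
For illustration, a very rough sketch of that approach on a keyed stream. The event 
shape (Tuple2 of key and action), the 5-minute SLA and all names are assumptions; 
instead of deleting the timer, the sketch keeps a keyed "done" flag and checks it when 
the timer fires:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// apply on a keyed stream: events.keyBy(0).process(new SlaTimerFunction())
// events are Tuple2<key, action>, where action is "START" (begin the SLA clock) or "DONE" (SLA met)
public class SlaTimerFunction extends ProcessFunction<Tuple2<String, String>, String> {

    private static final long SLA_MILLIS = 5 * 60 * 1000L;   // "x" = 5 minutes in this sketch

    private transient ValueState<Boolean> done;

    @Override
    public void open(Configuration parameters) {
        done = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sla-done", Boolean.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx, Collector<String> out)
            throws Exception {
        if ("START".equals(event.f1)) {
            done.update(false);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + SLA_MILLIS);
        } else if ("DONE".equals(event.f1)) {
            // the timer will still fire, but onTimer() below becomes a no-op for this key
            done.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (Boolean.FALSE.equals(done.value())) {
            // SLA violated: emit something a downstream sink can act on (e.g. write it to a database)
            out.collect("SLA violated, timer fired at " + timestamp);
        }
    }
}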

Cheers,
Gordon


On April 11, 2017 at 3:25:39 AM, jaxbihani (jagad...@helpshift.com) wrote:

I have a use case which I am trying to solve using Flink. Need an advice to  
decide on the correct approach.  

Use case:  
--  
I have a stream of events partitioned by a key. For some events, I need to  
start a timer (conside this as a SLA i.e. if something is not done in x  
secs/minutes do something). Now when that timer expires I need to perform  
some arbitrary action (like writing to database etc). There will be some  
events which can cancel the timers. (i.e. if event comes before x secs we  
need not run SLA violation action etc.). We are considering flink because  
then we can reuse the scaling, fault tolerance provided by the engine rather  
than building our own. Current rps is ~ 200-300 but it can be expected to  
increase quickly.  

Solutions in mind:  
---  
1. We can think of it as a CEP use case, with an encoding like "event1  
followed by event2" and a "not" within x seconds, i.e. matching when event 2 has "not"  
arrived within x seconds. I assume there will be NOT operator support. I am not  
sure about memory consumption in CEP. Because x seconds can be x days as  
well and I do not need any batching of events in memory. I just need to  
start a timer of x days/hours (lets say) and when it is fired just trigger  
something. So there is no notion of window as such. Can CEP fit in this type  
of use case? If the timer between events is in days, how about the memory  
consumption?  

2. Use Flink for event processing and delegate the tasks of timers to  
another service i.e. when event occurs send it to kafka with timer  
information and then another service handles timers and send back the event  
again once that is done etc. Looks like many hops in this process and  
latency will be high if SLA is in seconds (I am thinking of using Kafka  
here).  

Is anyone aware of a better way of doing this in flink?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-timer-implementation-using-Flink-tp12581.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Flink Kafka Consumer Behaviour

2017-04-20 Thread Tzu-Li (Gordon) Tai
Hi Sandeep,

It isn’t fixed yet, so I think external tools like the Kafka offset checker 
still won’t work.
If you’re using 08 and is currently stuck with this issue, you can still 
directly query ZK to get the offsets.

I think for FlinkKafkaConsumer09 the offset is exposed to Flink's metric system 
using Kafka’s own returned metrics, but for 08 this is still missing.

There is this JIRA [1] that aims at exposing consumer lag across all Kafka 
consumer versions to Flink metrics. Perhaps it would make sense to also 
generally expose the offset for all Kafka consumer versions to Flink metrics as 
well.

- Gordon

[1] https://issues.apache.org/jira/browse/FLINK-6109


On 19 April 2017 at 5:11:11 AM, sandeep6 (vr1meghash...@gmail.com) wrote:

Is this fixed now? If not, is there any way to monitor kafka offset that is  
being processed by Flink? This should be a use case for everyone who uses  
Flink with Kafka.  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-Consumer-Behaviour-tp8257p12663.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Read from and Write to Kafka through flink

2017-04-19 Thread Tzu-Li (Gordon) Tai
Hi Pradeep,

There is not single API or connector to take input as a file and writing it to 
Kafka.
In Flink, this operation consists of 2 parts, 1) source reading from input, and 
2) sink producing to Kafka.
So, all you need is a job that consists of that source and sink.

You’ve already figured out 2). For 1), you can take a look at the built-in file 
reading source: `StreamExecutionEnvironment.readFile`.
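
A minimal sketch of such a job (the file path, broker address and topic are 
placeholders; readTextFile is the simpler line-by-line variant of the file sources):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1) source: read the file line by line
DataStream<String> lines = env.readTextFile("file:///path/to/input.txt");

// 2) sink: produce each line to Kafka
lines.addSink(new FlinkKafkaProducer08<>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema()));  // serialization schema

env.execute("file-to-kafka");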

The program quickly executes comes out. 

I might need some more information here:
Do you mean that the job finished executing very fast?
If so, there should be an error of some kind. Could you find and paste it here?

If the job is actually running, and you’re constantly writing to the Kafka 
topic, but the job just isn’t consuming them, there are a few things you could 
probably check:
1) are you sure the Kafka broker is the same version as the connector you are 
using?
2) make sure that you are using different consumer groups, if the offsets are 
committed back to Kafka. Check out [1] to see in which conditions offsets are 
committed.

By the way, I’m continuing this thread only on the user@ mailing list, as 
that’s the more suitable place for this.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
On 20 April 2017 at 7:38:36 AM, Pradeep Anumala (pradeep.anuma...@gmail.com) 
wrote:

Hi,  
I am a beginner with Apache Flink. I am trying to write data from a file to Kafka  
and then read the data back from Kafka. I see there is an API to read from and  
write to Kafka.  

The following writes to kafka  
FlinkKafkaProducer08 myProducer = new FlinkKafkaProducer08(  
"localhost:9092", // broker list  
"my-topic", // target topic  
new SimpleStringSchema()); // serialization schema  

Is there any API which takes input as file and writes the file content to  
kafka ?  


My second question  
-  
I have run the kafka producer on the terminal  
I am trying to read from kafka using the below code. But this doesn't print  
any output though I am giving some input in the producer terminal.  
The program quickly executes comes out. Please let me know how I can read  
from kafka ?  

DataStream data = env.addSource(new  
FlinkKafkaConsumer010("myTopic",new SimpleStringSchema(),  
props)).print();  


Re: put record to kinesis and then trying consume using flink connector

2017-04-23 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

Here, in the producer-side log, it says:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published 
record, of bytes :162810 partition key 
:fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: 
shardId-Sequence 
number49572539577762637793132019619873654976833479400677703682 Stream 
Name:mystream

The stream the record was inserted into is “mystream”.

However,

    DataStream outputStream = see.addSource(new 
FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));
you seem to be consuming from “myStream”.

Could the capital “S” be the issue?
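
I.e., matching the name used on the producer side:

DataStream outputStream = see.addSource(
        new FlinkKinesisConsumer<>("mystream", new MyDeserializerSchema(), consumerConfig));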

Cheers,
Gordon


On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury (sathi.chowdh...@elliemae.com) 
wrote:

Hi Flink Dev,

I thought something would work easily with Flink and it is simple enough, yet I 
am struggling to make it work.

I am using flink kinesis connector ..using 1.3-SNAPSHOT version.

 

Basically I am trying to bootstrap a stream with one event pushed into it as a 
warmup inside flink job’s main method and I use aws kinesis client to simply 
putrecord into a given stream.

My expectation is that now if I addSource to a kinesis stream the data stream 
will consume the event I pushed.

 

 

 

//This is the method that pushes to the kinesis Stream “mystream”

    publishToKinesis(“mystream”,regionName,data) ;

 

 

    Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
 ConsumerConfigConstants.InitialPosition.LATEST.toString());
    
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"AUTO");

    final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new 
Configuration(), false);
    cluster.start();
    ObjectMapper mapper = new ObjectMapper();
    final StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment(
    "localhost", cluster.getLeaderRPCPort());

    DataStream outputStream = see.addSource(new 
FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));


    for (Iterator it = DataStreamUtils.collect(outputStream); 
it.hasNext(); ) {
    String actualOut = it.next();
    ObjectNode actualOutNode = (ObjectNode) 
mapper.readTree(actualOut);
    //then I do want to  either print it or do some further 
validation etc.      
   }
 

 

……..

 

Not sure why the FlinkKinesisConsumer is not able to react to the record that I 
published; it keeps waiting for it… at the step it.next();

 

 

I print out the SequenceNumber I put the record at

2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published 
record, of bytes :162810 partition key 
:fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: 
shardId-Sequence 
number49572539577762637793132019619873654976833479400677703682 Stream 
Name:mystream

 

 

And Flink job is logging this at the end where it is waiting

2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will be seeded with initial shard 
KinesisStreamShard{streamName='mystream', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'}, starting state 
set as sequence number LATEST_SEQUENCE_NUM

2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will start consuming seeded shard 
KinesisStreamShard{streamName=’mystream’, shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'} from sequence 
number LATEST_SEQUENCE_NUM with ShardConsumer 0

 

Any clue will be awesome to clear my confusion.

Thanks

Sathi

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

2017-04-23 Thread Tzu-Li (Gordon) Tai
Thanks for filing the JIRA!

Would you also be up to open a PR to for the change? That would be very very 
helpful :)

Cheers,
Gordon

On 24 April 2017 at 3:27:48 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi Gordon,  

thanks for looking into this and sorry it took me so long to file the  
issue: https://issues.apache.org/jira/browse/FLINK-6365.  

Really appreciate your contributions for the Kinesis connector!  

Cheers,  
Steffen  

On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:  
> Hi Steffan,  
>  
> I have to admit that I didn’t put too much thoughts in the default  
> values for the Kinesis consumer.  
>  
> I’d say it would be reasonable to change the default values to follow  
> KCL’s settings. Could you file a JIRA for this?  
>  
> In general, we might want to reconsider all the default values for  
> configs related to the getRecords call, i.e.  
> - SHARD_GETRECORDS_MAX  
> - SHARD_GETRECORDS_INTERVAL_MILLIS  
> - SHARD_GETRECORDS_BACKOFF_*  
>  
> Cheers,  
> Gordon  
>  
> On March 23, 2017 at 2:12:32 AM, Steffen Hausmann  
> (stef...@hausmann-family.de <mailto:stef...@hausmann-family.de>) wrote:  
>  
>> Hi there,  
>>  
>> I recently ran into problems with a Flink job running on an EMR cluster  
>> consuming events from a Kinesis stream receiving roughly 15k  
>> event/second. Although the EMR cluster was substantially scaled and CPU  
>> utilization and system load were well below any alarming threshold, the  
>> processing of events of the stream increasingly fell behind.  
>>  
>> Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
>> which is apparently causing too much overhead when consuming events from  
>> the stream. Increasing the value to 5000, a single GetRecords call to  
>> Kinesis can retrieve up to 10k records, made the problem go away.  
>>  
>> I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
>> (100x less than it could be). The Kinesis Client Library defaults to  
>> 5000 and it's recommended to use this default value:  
>> http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.
>>   
>>  
>>  
>> Thanks for the clarification!  
>>  
>> Cheers,  
>> Steffen  


Re: AWS exception serialization problem

2017-03-11 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Thanks a lot for providing the example, it was very helpful in reproducing the 
problem.

I think this is actually a Kryo bug, that was just recently fixed: 
https://github.com/EsotericSoftware/kryo/pull/483
It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be 
released yet.

The problem is that when Kryo defaults to Java serialization for the exception 
instance, the `ObjectInputStream` used to read the object does not correctly 
use Kryo’s configured class loader (i.e., the user code class loader). That’s 
why it's complaining that the class cannot be found.

We can “workaround” this by registering our own `JavaSerializer` as the 
serializer for Throwables in Kryo, but I’m not sure if we should actually do 
this, or just wait for the Kryo fix to be released.

- Gordon


On March 11, 2017 at 9:54:03 AM, Shannon Carey (sca...@expedia.com) wrote:

Here ya go (see attached).


From: Robert Metzger <rmetz...@apache.org>
Date: Friday, March 10, 2017 at 1:18 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

Can one of you guys provide us with a minimal example to reproduce the issue? 
(Ideally locally, not using EMR?)
I think once we can reproduce the issue its easy to fix.

On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)


On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <se...@apache.org> wrote:
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?

On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi,

We have seen something similar in Flink 1.2. We have an operation that parses 
some JSON, and when it fails to parse it, we can see the ClassNotFoundException 
for the relevant exception (in our case JsResultException from the play-json 
library). The library is indeed in the shaded JAR, otherwise we would not be 
able to parse the JSON.

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Shannon,

Just to clarify:

From the error trace, it seems like that the messages fetched from Kafka are 
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon




Re: AWS exception serialization problem

2017-03-11 Thread Tzu-Li (Gordon) Tai
FYI: Here’s the JIRA ticket to track this issue - 
https://issues.apache.org/jira/browse/FLINK-6025.


On March 11, 2017 at 6:27:36 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi Shannon,

Thanks a lot for providing the example, it was very helpful in reproducing the 
problem.

I think this is actually a Kryo bug, that was just recently fixed: 
https://github.com/EsotericSoftware/kryo/pull/483
It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be 
released yet.

The problem is that when Kryo defaults to Java serialization for the exception 
instance, the `ObjectInputStream` used to read the object does not correctly 
use Kryo’s configured class loader (i.e., the user code class loader). That’s 
why it's complaining that the class cannot be found.

We can “workaround” this by registering our own `JavaSerializer` as the 
serializer for Throwables in Kryo, but I’m not sure if we should actually do 
this, or just wait for the Kryo fix to be released.

- Gordon


On March 11, 2017 at 9:54:03 AM, Shannon Carey (sca...@expedia.com) wrote:

Here ya go (see attached).


From: Robert Metzger <rmetz...@apache.org>
Date: Friday, March 10, 2017 at 1:18 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

Can one of you guys provide us with a minimal example to reproduce the issue? 
(Ideally locally, not using EMR?)
I think once we can reproduce the issue its easy to fix.

On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)


On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <se...@apache.org> wrote:
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?

On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi,

We have seen something similar in Flink 1.2. We have an operation that parses 
some JSON, and when it fails to parse it, we can see the ClassNotFoundException 
for the relevant exception (in our case JsResultException from the play-json 
library). The library is indeed in the shaded JAR, otherwise we would not be 
able to parse the JSON.

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Shannon,

Just to clarify:

From the error trace, it seems like that the messages fetched from Kafka are 
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon




Re: Flink, Yarn and MapR Kerberos issue

2017-03-13 Thread Tzu-Li (Gordon) Tai
Hi Aniket!

Thanks for also looking into the problem!

I think checking `getAuthenticationMethod` on the UGI subject is the way to go.
At the moment I don’t think there’s a better “proper” solution for this.
As explained in the JIRA, we simply should not be checking for Kerberos 
credentials for all kinds of authentication methods, just Kerberos.

I’m currently doing some final tests on a MapR Sandbox with the bug fix PR, 
will open it very soon.
Would be great if you could take a look at the proposed fix too (it’s basically 
following along the lines you mentioned here :-) ).

Cheers,
Gordon

On March 14, 2017 at 6:10:43 AM, ani.desh1512 (ani.desh1...@gmail.com) wrote:

So, I was able to circumvent this issue. This is in no way a permanent  
solution, but I thought I should let you (and anybody who encounters this  
problem in the future) know some of my observations.  
What I found out was that:  
1. In MapR's version of Hadoop, they do the authentication inside the  
initialize() method of UserGroupInformation.java.  
2. So, we would not need to check for Kerberos credentials in Flink's  
deploy() method of AbstractYarnClusterDescriptor.java (at least for MapR's  
Hadoop version).  
3. Also, the authentication method returned by MapR (via  
getAuthenticationMethod()) is CUSTOM.  
4. I added a check for the authentication method, so that Flink will check for  
hasKerberosAuthentication() ONLY if  
the authentication method is Kerberos.  
5. After making this change and building Flink, I was able to confirm that  
a user with appropriate MapR credentials was indeed able to log in without  
issues and an error was raised for a user without credentials. This is the  
desired behavior that we wanted.  





--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-tp11996p12194.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: AWS exception serialization problem

2017-03-08 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Just to clarify:

From the error trace, it seems that the messages fetched from Kafka are 
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon

Re: Any good ideas for online/offline detection of devices that send events?

2017-03-06 Thread Tzu-Li (Gordon) Tai
Hi Bruno!

The Flink CEP library also seems like an option you can look into to see if it 
can easily realize what you have in mind.

Basically, the pattern you are detecting is a timeout of 5 minutes after the 
last event. Once that pattern is detected, you emit a “device offline” event 
downstream.
With this, you can also extend the pattern output stream to detect whether a 
device has come back online again.

Here are some materials for you to take a look at Flink CEP:
1. http://flink.apache.org/news/2016/04/06/cep-monitoring.html
2. 
https://www.slideshare.net/FlinkForward/fabian-huesketill-rohrmann-declarative-stream-processing-with-streamsql-and-cep?qid=3c13eb7d-ed39-4eae-9b74-a6c94e8b08a3==_search=4

The CEP parts of the slides in 2. also provide some good examples of timeout 
detection using CEP.
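
To give a rough idea of how the timeout pattern could look, here is a sketch
following the Flink 1.2 CEP API (the interfaces may differ in other versions;
the tuple layout and the output strings are only placeholders):

    import java.util.Map;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.PatternTimeoutFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.types.Either;

    public class OfflineDetection {

        // events are (deviceId, payload) records; the stream is keyed by device id below
        public static DataStream<Either<String, String>> detect(
                DataStream<Tuple2<String, String>> events) {

            // "first" followed by "second" within 5 minutes; if no follow-up event
            // arrives in time, the timeout side fires and the device counts as offline
            Pattern<Tuple2<String, String>, ?> pattern =
                    Pattern.<Tuple2<String, String>>begin("first")
                            .next("second")
                            .within(Time.minutes(5));

            PatternStream<Tuple2<String, String>> patternStream =
                    CEP.pattern(events.keyBy(0), pattern);

            // Left = timed out ("device offline"), Right = completed match ("still online")
            return patternStream.select(
                    new PatternTimeoutFunction<Tuple2<String, String>, String>() {
                        @Override
                        public String timeout(Map<String, Tuple2<String, String>> match,
                                              long timeoutTimestamp) {
                            return "OFFLINE " + match.get("first").f0 + " at " + timeoutTimestamp;
                        }
                    },
                    new PatternSelectFunction<Tuple2<String, String>, String>() {
                        @Override
                        public String select(Map<String, Tuple2<String, String>> match) {
                            return "ONLINE " + match.get("first").f0;
                        }
                    });
        }
    }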

Hope this helps!

Cheers,
Gordon

On March 4, 2017 at 1:27:51 AM, Bruno Aranda (bara...@apache.org) wrote:

Hi all,

We are trying to write an online/offline detector for devices that keep 
streaming data through Flink. We know roughly how often to expect events from 
those devices and we want to be able to detect when any of them stops sending 
events through the pipeline (goes offline) or starts again (comes back online). 
For instance, if 5 minutes have passed since the last event of a device, we 
would fire an event to indicate that the device is offline.

The data from the devices comes through Kafka, with their own event time. The 
devices' events are in order in the partitions and each device goes to a 
specific partition, so in theory, we should not see out-of-order events when 
looking at one partition.

We are assuming a good way to do this is by using sliding windows that are big 
enough, so we can see the relevant gap before/after the events for each 
specific device. 

We were wondering if there are other ideas on how to solve this.

Many thanks!

Bruno

Re: Flink using notebooks in EMR

2017-03-06 Thread Tzu-Li (Gordon) Tai
Hi,

Are you running Zeppelin on a local machine?

I haven’t tried this before, but you could first try and check if port ‘6123’ 
is publicly accessible in the security group settings of the AWS EMR instances.

- Gordon


On March 3, 2017 at 10:21:41 AM, Meghashyam Sandeep V (vr1meghash...@gmail.com) 
wrote:

Hi there,

Has anyone tried using the Flink interpreter in Zeppelin on AWS EMR? I tried 
creating a new interpreter using host 'localhots' and port '6123', which 
didn't seem to work.

Thanks,
Sandeep

Re: Memory Limits: MiniCluster vs. Local Mode

2017-03-06 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

AFAIK, the local mode execution creates a mini cluster within the JVM to run 
the job.

Also, `MiniCluster` seems to be something FLIP-6 related, and since FLIP-6 is 
still work
in progress, I’m not entirely sure if it is viable at the moment. Right now, 
you should look
into using `LocalFlinkMiniCluster`.

In a lot of the Flink integration tests, we use a `LocalFlinkMiniCluster` to 
set up the test cluster programmatically, and instantiate a 
`StreamExecutionEnvironment` against that mini cluster. That would probably be 
helpful for trying out your Flink jobs programmatically in your CI / CD cycles.

You can also take a look at some Flink test utilities such as
`StreamingMultipleProgramsTestBase`, which helps you to set up an environment
that allows you to submit multiple test jobs on a single 
`LocalFlinkMiniCluster`. For a
simple example on how to use it, you can take a look at the tests in the 
Elasticsearch
connector. The Flink Kafka connector tests also have a more complicated test
environment setup where jobs are submitted to the `LocalFlinkMiniCluster` using 
a
remote environment.

As for memory consumption configuration for the created 
`LocalFlinkMiniCluster`, I think
you should be able to tune it using the `Configuration` instance passed to it.
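
A rough sketch, roughly following what the Kafka connector tests do (the class
and method names here are from the current 1.1 / 1.2 line and may differ in
other versions; the configuration values are only illustrative):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MiniClusterSmokeTest {

        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // limit the cluster resources
            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
            config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 256); // managed memory, in MB

            LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
            cluster.start();

            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment
                        .createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());

                env.fromElements("a", "b", "c").print();
                env.execute("mini cluster smoke test");
            } finally {
                cluster.stop();
            }
        }
    }

Note that for a real job jar you would also pass the jar path to 
createRemoteEnvironment so that the user code can be shipped to the mini cluster.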

Hope this helps!

Cheers,
Gordon

On March 4, 2017 at 12:27:53 AM, domi...@dbruhn.de (domi...@dbruhn.de) wrote:

Hey,  
for our CI/CD cycle I'd like to try out our Flink jobs in a development  
environment without running them against a huge EMR cluster (which is  
what we do for production), so something like a standalone mode.  

Until now, for this standalone running, I just started the job jar. As  
the "env.execute()" is in the main method, this works. I think this is  
called "Local Mode" by the Flink devs. I packaged the whole thing in a  
docker container so I have a deployable artefact.  

The problem with that is that the memory constraints seem to be  
difficult to control: setting Xmx and Xms for the job doesn't seem to  
limit the memory. This is most likely due to Flink's off-heap memory  
allocation.  

Now, I got as feedback that perhaps the MiniCluster is the way to go  
instead of the "Local Mode".  

My questions:  
1. Is the MiniCluster better than the local mode? What are the use-cases  
in which you would choose one over the other?  
2. Is there an example how to use the MiniCluster? I see that I need a  
JobGraph, how do I get one?  
3. What are the tuning parameters to limit the memory consumption of the  
MiniCluster (and maybe the local mode)?  

Thanks for your help,  
Dominik  


Re: Any good ideas for online/offline detection of devices that send events?

2017-03-06 Thread Tzu-Li (Gordon) Tai
Some more input:

Right now, you can also use the `ProcessFunction` [1] available in Flink 1.2 to 
simulate state TTL.
The `ProcessFunction` should allow you to keep device state and simulate the 
online / offline detection by registering processing timers. In the `onTimer` 
callback, you can emit the “offline” marker event downstream, and in the 
`processElement` method, you can emit the “online” marker event if the device 
sends an event after it was previously determined to be offline.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
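
To sketch the idea (using a `ProcessFunction` on a keyed stream; I’m writing it
against the API shape where the function can access keyed state directly — on
1.2 the rich variant of the function provides the same hooks, if I recall
correctly — and the input / output types here are just placeholders):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // input: (deviceId, payload) on a stream keyed by deviceId;
    // output: "ONLINE <id>" / "OFFLINE <id>" marker strings
    public class OnlineOfflineDetector extends ProcessFunction<Tuple2<String, String>, String> {

        private static final long TIMEOUT_MS = 5 * 60 * 1000L; // 5 minutes

        private transient ValueState<String> deviceId;  // device id of the current key
        private transient ValueState<Long> lastTimer;   // last registered timer
        private transient ValueState<Boolean> offline;  // currently marked offline?

        @Override
        public void open(Configuration parameters) {
            deviceId = getRuntimeContext().getState(new ValueStateDescriptor<>("deviceId", String.class));
            lastTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("lastTimer", Long.class));
            offline = getRuntimeContext().getState(new ValueStateDescriptor<>("offline", Boolean.class));
        }

        @Override
        public void processElement(Tuple2<String, String> event, Context ctx, Collector<String> out)
                throws Exception {
            deviceId.update(event.f0);

            if (Boolean.TRUE.equals(offline.value())) {
                // the device was marked offline and has started sending events again
                out.collect("ONLINE " + event.f0);
                offline.update(false);
            }

            // push the timeout forward; only the most recently registered timer counts
            long timer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
            ctx.timerService().registerProcessingTimeTimer(timer);
            lastTimer.update(timer);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            // ignore stale timers from earlier events of the same device
            if (lastTimer.value() != null && timestamp == lastTimer.value()) {
                offline.update(true);
                out.collect("OFFLINE " + deviceId.value());
            }
        }
    }

    // usage: events.keyBy(0).process(new OnlineOfflineDetector())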

On March 6, 2017 at 9:40:28 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi Bruno!

The Flink CEP library also seems like an option you can look into to see if it 
can easily realize what you have in mind.

Basically, the pattern you are detecting is a timeout of 5 minutes after the 
last event. Once that pattern is detected, you emit a “device offline” event 
downstream.
With this, you can also extend the pattern output stream to detect whether a 
device has come back online again.

Here are some materials for you to take a look at Flink CEP:
1. http://flink.apache.org/news/2016/04/06/cep-monitoring.html
2. 
https://www.slideshare.net/FlinkForward/fabian-huesketill-rohrmann-declarative-stream-processing-with-streamsql-and-cep?qid=3c13eb7d-ed39-4eae-9b74-a6c94e8b08a3==_search=4

The CEP parts of the slides in 2. also provide some good examples of timeout 
detection using CEP.

Hope this helps!

Cheers,
Gordon

On March 4, 2017 at 1:27:51 AM, Bruno Aranda (bara...@apache.org) wrote:

Hi all,

We are trying to write an online/offline detector for devices that keep 
streaming data through Flink. We know roughly how often to expect events from 
those devices and we want to be able to detect when any of them stops sending 
events through the pipeline (goes offline) or starts again (comes back online). 
For instance, if 5 minutes have passed since the last event of a device, we 
would fire an event to indicate that the device is offline.

The data from the devices comes through Kafka, with their own event time. The 
devices' events are in order in the partitions and each device goes to a 
specific partition, so in theory, we should not see out-of-order events when 
looking at one partition.

We are assuming a good way to do this is by using sliding windows that are big 
enough, so we can see the relevant gap before/after the events for each 
specific device. 

We were wondering if there are other ideas on how to solve this.

Many thanks!

Bruno

Re: How to use 'dynamic' state

2017-03-06 Thread Tzu-Li (Gordon) Tai
Hi Steve,

I’ll try to provide some input for the approaches you’re currently looking into 
(I’ll comment on your email below):

* API based stop and restart of job … ugly. 
Yes, indeed ;) I think this is absolutely not necessary.

* Use a co-map function with the rules alone stream and the events as the 
other. This seems better however, I would like to have several ‘trigger’ 
functions changed together .. e.g. a tumbling window for one type of criteria 
and a flat map for a different sort … So I’m not sure how to hook this up for 
more than a simple co-map/flatmap. I did see this suggested in one answer and 
Do you mean that the operators listen only to certain rules / criteria settings 
changes? You could either have separate stream sources for each kind of criteria 
rule trigger event, or have a single source and split them afterwards. Then, you 
broadcast each of them and connect them with the corresponding co-map / 
flat-maps (a rough sketch follows below).
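
A minimal sketch of that wiring (the String payloads and the contains() check
are only placeholders for your real rule / event types and matching logic):

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    public class RuleEvaluator implements CoFlatMapFunction<String, String, String> {

        // every parallel instance sees every rule because the rules stream is broadcast;
        // note: this plain list is not checkpointed - use operator state for fault tolerance
        private final List<String> rules = new ArrayList<>();

        @Override
        public void flatMap1(String event, Collector<String> out) {
            for (String rule : rules) {
                if (event.contains(rule)) {
                    out.collect("Rule '" + rule + "' triggered by event: " + event);
                }
            }
        }

        @Override
        public void flatMap2(String rule, Collector<String> out) {
            rules.add(rule);
        }
    }

    // wiring: events.connect(rules.broadcast()).flatMap(new RuleEvaluator());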

* Use broadcast state : this seems reasonable however I couldn’t tell if the 
broadcast state would be available to all of the processing functions. Is it 
generally available? 
From the context of your description, I think what you want is that the 
injected rules stream can be seen by all operators (instead of “broadcast 
state”, which in Flink streaming refers to something else).

Aljoscha recently consolidated a FLIP for Side Inputs [1], which I think is 
targeted exactly for what you have in mind here. Perhaps you can take a look at 
that and see if it makes sense for your use case? But of course, this isn’t yet 
available as it is still under discussion. Side Inputs could be an ideal fit 
for this use case, since I assume the rule triggers change only slowly.

I’ve CCed Aljoscha so that he can probably provide more insights, as he has 
worked a lot on the stuff mentioned here.

Cheers,
Gordon

[1] FLIP-17: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

On March 7, 2017 at 5:05:04 AM, Steve Jerman (st...@kloudspot.com) wrote:

I’ve been reading the code/user group/SO and haven’t really found a great answer 
to this… so I thought I’d ask.  

I have a UI that allows the user to edit rules which include specific criteria 
for example trigger event if X many people present for over a minute.  

I would like to have a flink job that processes an event stream and triggers on 
these rules.  

The catch is that I don’t want to have to restart the job if the rules change… 
(it would be easy otherwise :))  

So I found four ways to proceed:  

* API based stop and restart of job … ugly.  

* Use a co-map function with the rules alone stream and the events as the 
other. This seems better however, I would like to have several ‘trigger’ 
functions changed together .. e.g. a tumbling window for one type of criteria 
and a flat map for a different sort … So I’m not sure how to hook this up for 
more than a simple co-map/flatmap. I did see this suggested in one answer and  

* Use broadcast state : this seems reasonable however I couldn’t tell if the 
broadcast state would be available to all of the processing functions. Is it 
generally available?  

* Implement my own operators… seems complicated ;)  

Are there other approaches?  

Thanks for any advice  
Steve

Re: AWS exception serialization problem

2017-03-07 Thread Tzu-Li (Gordon) Tai
Hi,

I just had a quick look on this, but the Kafka fetcher thread’s context 
classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the 
fetcher, but since the task thread sets the user code classloader as its 
context classloader, shouldn’t any threads created from it (i.e., the fetcher 
thread) use it also?

I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 
was using, and it’s `FlinkUserCodeClassLoader`.


On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:

Ah, I see...

The issue is that the Kafka fetcher threads apparently do not have the user-code 
class loader set as the context class loader. Kryo relies on that for class 
resolution.

What Flink version are you on? I think that actual processing and forwarding 
does not happen in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 
should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey  wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to 
copy the exception object, it does that by serializing and deserializing it. 
For subclasses of RuntimeException, it doesn't know how to do it so it 
delegates serialization to Java. However, it doesn't use a custom 
ObjectInputStream to override resolveClass() and provide classes from the user 
code classloader… such as happens in RocksDBStateBackend's use of 
InstantiationUtil.deserializeObject(). Instead, it uses 
ObjectInputStream$latestUserDefinedLoader() which is the 
Launcher$AppClassLoader which definitely doesn't have the user code in it.

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being 
configured?

Thanks,
Shannon


From: Shannon Carey 
Date: Monday, March 6, 2017 at 7:09 PM
To: "user@flink.apache.org" 
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh. I notice that there only 
appears to be one Java process. The job manager and the task manager run in the 
same JVM, right? I notice, however, that there are two blob store folders on 
disk. Could the problem be caused by two different FlinkUserCodeClassLoader 
objects pointing to the two different JARs?


From: Shannon Carey 
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" 
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
... 7 more
Caused by: java.lang.ClassNotFoundException: 
com.amazonaws.services.s3.model.AmazonS3Exception
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at 

Re: Flink Error/Exception Handling

2017-03-03 Thread Tzu-Li (Gordon) Tai
Hi Sunil,

There’s recently been some effort to allow `DeserializationSchema#deserialize()` 
to return `null` in cases like yours, so that the invalid record can simply be 
skipped instead of throwing an exception from the deserialization schema.
Here are the related links that you may be interested in:
- JIRA: https://issues.apache.org/jira/browse/FLINK-3679
- PR: https://github.com/apache/flink/pull/3314

This means, however, that this isn’t available until Flink 1.3.
For the time being, a possible workaround for dealing with invalid records is 
explained by Robert in the first comment of 
https://issues.apache.org/jira/browse/FLINK-3679.
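
As an illustration of that kind of workaround (not necessarily the exact one in
the JIRA comment; the sentinel value and the validation logic here are just
placeholders), you could catch the failure inside the schema and filter the bad
records out downstream:

    import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

    import java.nio.charset.StandardCharsets;

    public class SkippingDeserializationSchema extends AbstractDeserializationSchema<String> {

        public static final String INVALID = "";  // sentinel emitted for bad records

        @Override
        public String deserialize(byte[] message) {
            try {
                String value = new String(message, StandardCharsets.UTF_8);
                // ... application-specific validation / parsing would go here ...
                return value;
            } catch (Exception e) {
                // swallow the error and emit the sentinel instead of failing the source
                return INVALID;
            }
        }
    }

    // usage:
    // env.addSource(new FlinkKafkaConsumer09<>("topic", new SkippingDeserializationSchema(), props))
    //    .filter(record -> !SkippingDeserializationSchema.INVALID.equals(record))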

Cheers,
Gordon


On March 3, 2017 at 9:15:40 PM, raikarsunil (rsunil...@gmail.com) wrote:

Hi,  

Scenario:  

I am reading data from Kafka. The FlinkKafkaConsumer reads data from it.  
There is some application-specific logic to check whether the data is  
valid or invalid. When I receive an invalid message I throw a custom  
exception, which is handled in that class. But the problem is that Flink  
always tries to re-read the same invalid message and the job keeps on restarting.  

Can anyone let me know how the error/exception handling can be done without  
the Flink job breaking?  

Thanks,  
Sunil  



-  
Cheers,  
Sunil Raikar  
--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Error-Exception-Handling-tp12029.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: A Link Sink that writes to OAuth2.0 protected resource

2017-03-03 Thread Tzu-Li (Gordon) Tai
Hi Hussein!

Your approach seems reasonable to me. The open() method will be called only 
once for the UDF each time the job starts (and also when the job is restored 
from a failure).

Cheers,
Gordon

On March 3, 2017 at 7:03:22 PM, Hussein Baghdadi (hussein.baghd...@zalando.de) 
wrote:

Hello, 

In our sink, we are dealing with a system that uses OAuth 2.0. So in the 
open() method of the sink we are getting the token and then we initialise the 
client that we use in order to write from Flink to that API. 

Is there a better approach to handle that? 

open() is a lifecycle method which, I assume, will be called only once when 
creating a sink, correct? Are there any conditions that might trigger open() 
to be called other than at creation? 

Thanks a lot for your help and time.

Re: Elasticsearch 5.x connection

2017-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

java.lang.NoSuchMethodError: 
org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(
The reason for this exception is that the `isSerializable` method only exists 
in 1.3-SNAPSHOT of `flink-core` at the moment. These kinds of errors can 
usually be expected when you are using mismatching versions of Flink libraries 
and core Flink dependencies.

Elasticsearch 5 support will be released with Flink 1.3.0 (targeted release 
time is end of May). For the time being, if Elasticsearch 5 is a must, you 
could try implementing a copy of the `isSerializable` method under the exact 
same package path, class name, and method name in your own project. However, I 
cannot guarantee that this will work, as there may be other conflicts.

- Gordon



On March 2, 2017 at 10:47:52 PM, Fábio Dias (fabiodio...@gmail.com) wrote:

java.lang.NoSuchMethodError: 
org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(
