Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

2017-08-30 Thread Elias Levy
On Wed, Aug 30, 2017 at 11:50 AM, Oleksandr Baliev <
aleksanderba...@gmail.com> wrote:

>
> So the main question is how to synchronize data reading between kafka
> partitions when data is sequential per partitions, but late for some of
> them and we care about that data is not thrown away and will be fully
> processed for some time range (window) later? :) It's more about manually
> handling consumption on Kafka Fetch level and FlinkKafka* is high level for
> that, isn't it?
>


At some point you have to give up on late data and drop it if you are
performing some window computation.  That said, that could be a long time,
allowing for very out of order data. Presumably most data won't be late,
and you want to output preliminary results to have timely data.  In that
case you want to implement a window trigger that fires early at regular
intervals without purging if it has received new events since the last time
it fired and purges the data once the allowed lateness time passes.

For instance, see this EventTimeTriggerWithEarlyAndLateFiring
 in
Java or this simplified EarlyFiringEventTimeTrigger
 in
Scala.


Re: metrics for Flink sinks

2017-08-30 Thread Elias Levy
Not an exact match, but I am guessing it is related to FLINK-7286
, which I
reported.  Feel free to modify that issue to cover the root cause.

On Wed, Aug 30, 2017 at 8:32 AM, Martin Eden 
wrote:

> Thanks Chesnay,
>
> Just for completeness, are there any relevant tickets for the discussion
> that one can follow, upvote, contribute to?
>
> M
>
> On Tue, Aug 29, 2017 at 8:57 PM, Chesnay Schepler 
> wrote:
>
>> Hello,
>>
>> 1. Because no one found time to fix it. In contrast to the remaining
>> byte/record metrics, input metrics for sources / output metrics for sinks
>> have to be implemented for every single implementation with their
>> respective semantics. In contrast, the output metrics are gathered in the
>> intersection between operators, independent of the actual operator
>> implementation. Furthermore, this requires system metrics (i.e. metrics
>> that Flink itself creates) to be exposed (and be mutable!) to user-defined
>> functions, which is something i *generally *wanted to avoid, but it
>> appears to be a big enough pain point to make an exception here.
>>
>> 2. Due to the above it is currently not possible without modifications of
>> the code to know how many reads/writes were made.
>>
>> 3. Do you mean aggregated metrics? The web UI allows the aggregation of
>> record/byte metrics on the task level. Beyond that we defer aggregation to
>> actual time-series databases that specialize in these things.
>>
>>
>> On 28.08.2017 19:08, Martin Eden wrote:
>>
>> Hi all,
>>
>> Just 3 quick questions both related to Flink metrics, especially around
>> sinks:
>>
>> 1. In the Flink UI Sources always have 0 input records / bytes and Sinks
>> always have 0 output records / bytes? Why is it like that?
>>
>> 2. What is the best practice for instrumenting off the shelf Flink sinks?
>>
>> Currently the only metrics available are num records/bytes in and out at
>> the operator and task scope. For the task scope there are extra buffer
>> metrics. However the output metrics are always zero (see question 1). How
>> can one know the actual number of successful writes done by an off the
>> shelf Flink sink? Or the latency of the write operation?
>>
>> 3. Is it possible to configure Flink to get global job metrics for all
>> subtasks of an operator? Or are there any best practices around that?
>>
>> Thanks,
>> M
>>
>>
>>
>


Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

2017-08-30 Thread ShB
Hi Fabian,

Thank you so much for your quick response, I appreciate it. 

Since I'm working with a very large number of files of small sizes, I don't
necessarily need to read each file in parallel. 

I need to read a my large list of files in parallel - that is, split up my
list of files into smaller subsets and have each task manager read a subset
of them. 

I implemented it like this:
env.fromCollection(fileList).rebalance().flatMap(new ReadFiles());
where ReadFiles is a map function that reads each of the files from S3 using
the AWS S3 Java SDK and parses and emits each of the protobufs. 

Is this implementation an efficient way of solving this problem? 

Is there a more performant way of reading a large number of files from S3 in
a distributed manner, with perhaps env.readFile()?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

2017-08-30 Thread Oleksandr Baliev
Hi Elias,

Thanks for reply, TOPIC_OUT has less partitions, ~20, but actually there
are 4 output topics with different amount of partitions. So the Job is kind
of router.

In general to have 1:1 partitions for IN and OUT topics is good, thanks for
tip. But since the main goal is to have windows in next Job which consumes
data from OUT_TOPIC, data in partitions will be sequential by partitions,
but not well distributed (some partitions will have more late data then
others). So when window will be applied and some data from one of 30th (if
setup TOPIC_OUT with 30 partitions) partitions will come with timestamp
(watermark) which is out if range of given time for watermarkExtractor,
window will be closed, and data will be counted as late.

If take next example, 2 partitions (not sure if indentation will be okay in
your mail, so sorry in advance):
PARTITION - TOPIC_IN data (timestamps) - TOPIC_OUT after maping (timestamps)
1  - t5, t4, t3, t2, t1- t5, t4,
t3, t2, t1
2  - t5, t4, t3, t2, t1- t3, t2, t1
I.e. there were some lag for consumer/producer for partition 2 so in
TOPIC_OUT data is coming later.
It means that when another job will read data from TOPIC_OUT, it will read
p1t1 (entry with t1 timestamp and partition 1), appropriate window will be
created i.e. w"t0-t1" (entries with timestamp from t0 till t1 timestamps
will be taken into account). Then p1t2 entry come and window w"t0-t1" will
be closed and w"t1-t2" window will be created and so on till object p2t1
will come and it will be counted as late data. I think that should be very
common situation for such tasks. I don't take into account
BoundedOutOfOrdernessTimestampExtractor or allowLateness parameters,
because they can just tune this things, but late data anyway will come.

Hm... probably I'm just thinking about the problem as not isolated, so as
"TOPIC_IN -> map job -> TOPIC_OUT -> another job with window". But I think
"TOPIC_OUT -> another job with window" should be separated from this
pipeline. And reviewed like a job with late data which is common question
for parallel data sources with just some slow parts.

So the main question is how to synchronize data reading between kafka
partitions when data is sequential per partitions, but late for some of
them and we care about that data is not thrown away and will be fully
processed for some time range (window) later? :) It's more about manually
handling consumption on Kafka Fetch level and FlinkKafka* is high level for
that, isn't it?

Also in Flink, watermark is only one thing which can be somehow applied as
i understand to that task, as it's a global metric, but it's using only for
window mechanism. Which is more about window assigner (how/when to create
window) and triggers (how/when to close window) and it's cannot say to
consumer, please wait a bit, because I know that some data is still there,
so let's check all sources first and only then I'll be closed, or maybe
not.. somehow in assigner somehow do not create window, so just wait till
some condition, but again some problems with stopping consumer. Eh..

Best,
Sasha

2017-08-29 18:10 GMT+02:00 Elias Levy :

> How many partitions does the output topic have?  If it has the same number
> of partitions as the input topic (30), have you considered simply using a
> custom partitioner for the Kafka sink that uses the input partition number
> as the output partition number?  If the input messages are ordered per
> input partition, that would guarantee their order in the output partitions.
>
> On Tue, Aug 29, 2017 at 1:54 AM, Oleksandr Baliev <
> aleksanderba...@gmail.com> wrote:
>
>> Hello,
>>
>> There is one Flink job which consumes from Kafka topic (TOPIC_IN), simply
>> flatMap / map data and push to another Kafka topic (TOPIC_OUT).
>> TOPIC_IN has around 30 partitions, data is more or less sequential per
>> partition and the job has parallelism 30. So in theory there should be 1:1
>> mapping between consumer and partition.
>>
>> But it's often to see big lag in offsets for some partitions. So that
>> should mean that some of consumers are slower than another (i.e. some
>> network issues for particular broker host or anything else). So data in
>> TOPIC_OUT partitions is distributed but not sequential at all.
>>
>> So when some another flink job consumes from TOPIC_OUT and uses
>> BoundedOutOfOrdernessTimestampExtractor to generate watermarks, due to
>> difference in data timestamps, there can be a lot of late data. Maybe
>> something is missing of course in this setup or there is more good approach
>> for such flatMap / map jobs.
>>
>> Setting big WindowedStream#allowedLateness or giving more time for
>> BoundedOutOfOrdernessTimestampExtractor will increase memory consumption
>> and probably will cause another issues and anyway there can be late data
>> which is not good for later windows.
>>
>> One of the solution is to have some shared 

Re: datastream.print() doesn't works

2017-08-30 Thread AndreaKinn
Hi, in the night uninstalling and re-installing maven and flink I solved my
issue.

I started the web dashboard using start-local.sh script and used
/createLocalEnvironmentWithWebUI(new Configuration())/ as you suggested.
Anyway when I start it in eclipse in the ui dashboard no running jobs are
showed (as I expected).
To run it via console with the console command: 

/./bin/flink run -c org.apache.flink.quickstart.StreamingJob
target/flink-java-project-0.1.jar/

but an error appears:

/No operators defined in streaming topology. Cannot execute.
/

Strangely, if I execute it in eclipse no errors appear. Do you know which
operators it refers??



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Twitter example

2017-08-30 Thread Krishnanand Khambadkone
Fabian,  Thank you for the prompt responses.  I was able to find the output in 
the job manager logs.  There were no task manager logs generated

Sent from my iPhone

> On Aug 30, 2017, at 8:24 AM, Fabian Hueske  wrote:
> 
> print() writes to the standard output of the TaskManager process. 
> The TM stdout is usually redirected to an out file in the ./log folder.
> 
> 2017-08-30 17:20 GMT+02:00 Krishnanand Khambadkone :
>> I am running this standalone, not under yarn,  on a single instance setup.  
>> I believe you are referring to the flink log files
>> 
>> Sent from my iPhone
>> 
>>> On Aug 30, 2017, at 12:56 AM, Fabian Hueske  wrote:
>>> 
>>>Fabian Hueske (fhue...@gmail.com) is not on your Guest List | Approve 
>>> sender | Approve domain 
>>> Hi,
>>> 
>>> print() writes the data to the out files of the TaskManagers. So you need 
>>> to go to the machine that runs the TM and check its out file which is 
>>> located in the log folder.
>>> 
>>> Best, Fabian
>>> 
>>> 2017-08-29 23:53 GMT+02:00 Krishnanand Khambadkone :
 I am trying to run the basic twitter example,  it does deploy correctly 
 and show as RUNNING both in the command line and in the UI.  However I see 
 no output.  I am able to verify that my credentials are correct with 
 another small client I have built that does connect to twitter and prints 
 out tweets.
 
 
 streamSource = env.addSource(new TwitterSource(props));
 streamSource.print();
 
 
 
 
 *** TWITTER INPUTS *
 qzenHPFVPNb7GaEypSvp3yV3s : 
 XQuQHpOklVnUHEUTmtEbePU7O80Vaj5mwWnvNMrGyBiSsML9dA : 
 78358289-rjJ44rtlFr5BlTMyNPzDjQq6daOMT7PnQVvkGgwgO : 
 IAr1eoCbBuJRkaERcKFxv5mFAOu6KPr7Q2EmAZYRXmedl
 * FOUND PROPERTIES *
 Submitting job with JobID: de6024b484fc1d163b4190011595e4f0. Waiting for 
 job completion.
 Connected to JobManager at 
 Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1901137740] with 
 leader session id ----.
 08/29/2017 14:25:32Job execution switched to status RUNNING.
 08/29/2017 14:25:32Source: Custom Source -> Sink: Unnamed(1/1) 
 switched to SCHEDULED 
 08/29/2017 14:25:32Source: Custom Source -> Sink: Unnamed(1/1) 
 switched to DEPLOYING 
 08/29/2017 14:25:32Source: Custom Source -> Sink: Unnamed(1/1) 
 switched to RUNNING
 
>>> 
> 


Re: metrics for Flink sinks

2017-08-30 Thread Martin Eden
Thanks Chesnay,

Just for completeness, are there any relevant tickets for the discussion
that one can follow, upvote, contribute to?

M

On Tue, Aug 29, 2017 at 8:57 PM, Chesnay Schepler 
wrote:

> Hello,
>
> 1. Because no one found time to fix it. In contrast to the remaining
> byte/record metrics, input metrics for sources / output metrics for sinks
> have to be implemented for every single implementation with their
> respective semantics. In contrast, the output metrics are gathered in the
> intersection between operators, independent of the actual operator
> implementation. Furthermore, this requires system metrics (i.e. metrics
> that Flink itself creates) to be exposed (and be mutable!) to user-defined
> functions, which is something i *generally *wanted to avoid, but it
> appears to be a big enough pain point to make an exception here.
>
> 2. Due to the above it is currently not possible without modifications of
> the code to know how many reads/writes were made.
>
> 3. Do you mean aggregated metrics? The web UI allows the aggregation of
> record/byte metrics on the task level. Beyond that we defer aggregation to
> actual time-series databases that specialize in these things.
>
>
> On 28.08.2017 19:08, Martin Eden wrote:
>
> Hi all,
>
> Just 3 quick questions both related to Flink metrics, especially around
> sinks:
>
> 1. In the Flink UI Sources always have 0 input records / bytes and Sinks
> always have 0 output records / bytes? Why is it like that?
>
> 2. What is the best practice for instrumenting off the shelf Flink sinks?
>
> Currently the only metrics available are num records/bytes in and out at
> the operator and task scope. For the task scope there are extra buffer
> metrics. However the output metrics are always zero (see question 1). How
> can one know the actual number of successful writes done by an off the
> shelf Flink sink? Or the latency of the write operation?
>
> 3. Is it possible to configure Flink to get global job metrics for all
> subtasks of an operator? Or are there any best practices around that?
>
> Thanks,
> M
>
>
>


Re: Twitter example

2017-08-30 Thread Fabian Hueske
print() writes to the standard output of the TaskManager process.
The TM stdout is usually redirected to an out file in the ./log folder.

2017-08-30 17:20 GMT+02:00 Krishnanand Khambadkone :

> I am running this standalone, not under yarn,  on a single instance
> setup.  I believe you are referring to the flink log files
>
> Sent from my iPhone
>
> On Aug 30, 2017, at 12:56 AM, Fabian Hueske  wrote:
>
> [image: Boxbe]  Fabian Hueske (
> fhue...@gmail.com) is not on your Guest List
> 
> | Approve sender
> 
> | Approve domain
> 
>
> Hi,
>
> print() writes the data to the out files of the TaskManagers. So you need
> to go to the machine that runs the TM and check its out file which is
> located in the log folder.
>
> Best, Fabian
>
> 2017-08-29 23:53 GMT+02:00 Krishnanand Khambadkone  >:
>
>> I am trying to run the basic twitter example,  it does deploy correctly
>> and show as RUNNING both in the command line and in the UI.  However I see
>> no output.  I am able to verify that my credentials are correct with
>> another small client I have built that does connect to twitter and prints
>> out tweets.
>>
>>
>> streamSource = env.addSource(new TwitterSource(props));
>>
>> streamSource.print();
>>
>>
>>
>>
>>
>> *** TWITTER INPUTS *
>>
>> qzenHPFVPNb7GaEypSvp3yV3s : 
>> XQuQHpOklVnUHEUTmtEbePU7O80Vaj5mwWnvNMrGyBiSsML9dA
>> : 78358289-rjJ44rtlFr5BlTMyNPzDjQq6daOMT7PnQVvkGgwgO :
>> IAr1eoCbBuJRkaERcKFxv5mFAOu6KPr7Q2EmAZYRXmedl
>>
>> * FOUND PROPERTIES *
>>
>> Submitting job with JobID: de6024b484fc1d163b4190011595e4f0. Waiting for
>> job completion.
>>
>> Connected to JobManager at Actor[akka.tcp://flink@localho
>> st:6123/user/jobmanager#1901137740] with leader session id
>> ----.
>>
>> 08/29/2017 14:25:32 Job execution switched to status RUNNING.
>>
>> 08/29/2017 14:25:32 Source: Custom Source -> Sink: Unnamed(1/1) switched
>> to SCHEDULED
>>
>> 08/29/2017 14:25:32 Source: Custom Source -> Sink: Unnamed(1/1) switched
>> to DEPLOYING
>>
>> 08/29/2017 14:25:32 Source: Custom Source -> Sink: Unnamed(1/1) switched
>> to RUNNING
>>
>>
>


Re: Twitter example

2017-08-30 Thread Krishnanand Khambadkone
I am running this standalone, not under yarn,  on a single instance setup.  I 
believe you are referring to the flink log files

Sent from my iPhone

> On Aug 30, 2017, at 12:56 AM, Fabian Hueske  wrote:
> 
>Fabian Hueske (fhue...@gmail.com) is not on your Guest List | Approve 
> sender | Approve domain 
> Hi,
> 
> print() writes the data to the out files of the TaskManagers. So you need to 
> go to the machine that runs the TM and check its out file which is located in 
> the log folder.
> 
> Best, Fabian
> 
> 2017-08-29 23:53 GMT+02:00 Krishnanand Khambadkone :
>> I am trying to run the basic twitter example,  it does deploy correctly and 
>> show as RUNNING both in the command line and in the UI.  However I see no 
>> output.  I am able to verify that my credentials are correct with another 
>> small client I have built that does connect to twitter and prints out tweets.
>> 
>> 
>> streamSource = env.addSource(new TwitterSource(props));
>> streamSource.print();
>> 
>> 
>> 
>> 
>> *** TWITTER INPUTS *
>> qzenHPFVPNb7GaEypSvp3yV3s : 
>> XQuQHpOklVnUHEUTmtEbePU7O80Vaj5mwWnvNMrGyBiSsML9dA : 
>> 78358289-rjJ44rtlFr5BlTMyNPzDjQq6daOMT7PnQVvkGgwgO : 
>> IAr1eoCbBuJRkaERcKFxv5mFAOu6KPr7Q2EmAZYRXmedl
>> * FOUND PROPERTIES *
>> Submitting job with JobID: de6024b484fc1d163b4190011595e4f0. Waiting for job 
>> completion.
>> Connected to JobManager at 
>> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1901137740] with 
>> leader session id ----.
>> 08/29/2017 14:25:32  Job execution switched to status RUNNING.
>> 08/29/2017 14:25:32  Source: Custom Source -> Sink: Unnamed(1/1) switched to 
>> SCHEDULED 
>> 08/29/2017 14:25:32  Source: Custom Source -> Sink: Unnamed(1/1) switched to 
>> DEPLOYING 
>> 08/29/2017 14:25:32  Source: Custom Source -> Sink: Unnamed(1/1) switched to 
>> RUNNING
>> 
> 


Re: Kafka Offset settings in Flink Kafka Consumer 10

2017-08-30 Thread sohimankotia
My Bad :-) Sorry.

We are using flink 1.2 dependencies  . And I think this functionality is
only available from flink 1.3 API Version .



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka Offset settings in Flink Kafka Consumer 10

2017-08-30 Thread Oleksandr Baliev
Hi,

it's there https://ci.apache.org/projects/flink/flink-docs-
release-1.3/api/java/org/apache/flink/streaming/connectors/kafka/
FlinkKafkaConsumerBase.html#setStartFromSpecificOffsets-java.util.Map-
just defined in FlinkKafkaConsumerBase

2017-08-30 16:34 GMT+02:00 sohimankotia :

> Hi,
>
> I see that Flink Kafka consumer have ability to set specific offset to read
> from
>
> Map specificStartOffsets = new HashMap<>();
> specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
> specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
> specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);
>
> FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08<>(...);
> consumer.setStartFromSpecificOffsets(specificStartOffsets);
>
>
> But there is no methods present in  FlinkKafkaConsumer010 .
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Kafka Offset settings in Flink Kafka Consumer 10

2017-08-30 Thread sohimankotia
Hi,

I see that Flink Kafka consumer have ability to set specific offset to read
from 

Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);

FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08<>(...);
consumer.setStartFromSpecificOffsets(specificStartOffsets);


But there is no methods present in  FlinkKafkaConsumer010 .



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: "Unable to find registrar for hdfs" on Flink cluster

2017-08-30 Thread P. Ramanjaneya Reddy
Thank you Aljoscha.

With above steps working wordcount beam using quick start program.

When running on actual beam source tree getting following error.

root1@master:~/Projects/*beam*/examples/java$ *git branch *
  master
* release-2.0.0 * ==> beam source code*
root1@master:~/Projects/beam/examples/java$
root1@master:~/Projects/beam/examples/java$* mvn dependency:tree
-Pflink-runner |grep flink*
[INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.2.0-SNAPSHOT:runtime
[INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
[INFO]|  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
[INFO]|  \- org.apache.flink:force-shading:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime
[INFO]|  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime
[INFO]|  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
root1@master:~/Projects/beam/examples/java$


root1@master:~/Projects/*beam*/examples/java$ *mvn package exec:java
-Dexec.mainClass=org.apache.be am.examples.WordCount
-Dexec.args="--runner=FlinkRunner --flinkMaster=192.168.56.1:6123

--filesToStage=/home/root1/Projects/beam/examples/java/target/beam-examples-java-2.0.0.jar
--inputFile=hdfs://master:9000/test/wordcount_input.txt
 --output=hdfs://master:9000/test/wordcount_output919" -Pflink-runner
-Dcheckstyle.skip=true -DskipTests*


*Error Log:*

INFO: Received job wordcount-root1-0830134254-67bc7d88
(02066e0dc345cdd6f34f20258a4c807e).
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.client.JobClientActor
disconnectFromJobManager
INFO: Disconnect from JobManager null.
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.client.JobClientActor
connectToJobManager
INFO: Connect to JobManager Actor[akka.tcp://flink@master:
6123/user/jobmanager#-1763674796].
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.client.JobClientActor
logAndPrintMessage
INFO: Connected to JobManager at Actor[akka.tcp://flink@master:
6123/user/jobmanager#-1763674796] with leader session id
----.
Connected to JobManager at Actor[akka.tcp://flink@master:
6123/user/jobmanager#-1763674796] with leader session id
----.
Aug 30, 2017 7:12:56 PM
org.apache.flink.runtime.client.JobSubmissionClientActor
tryToSubmitJob
INFO: Sending message to JobManager
akka.tcp://flink@master:6123/user/jobmanager
to submit job wordcount-root1-0830134254-67bc7d88
(02066e0dc345cdd6f34f20258a4c807e) and wait for progress
Aug 30, 2017 7:12:56 PM
org.apache.flink.runtime.client.JobSubmissionClientActor$1
call
INFO: Upload jar files to job manager akka.tcp://flink@master:6123/u
ser/jobmanager.
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.blob.BlobClient
uploadJarFiles
INFO: Blob client connecting to akka.tcp://flink@master:6123/user/jobmanager
Aug 30, 2017 7:12:56 PM
org.apache.flink.runtime.client.JobSubmissionClientActor$1
call
INFO: Submit job to the job manager akka.tcp://flink@master:6123/u
ser/jobmanager.
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.client.JobClientActor
terminate
INFO: Terminate JobClientActor.
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.client.JobClientActor
disconnectFromJobManager
INFO: Disconnect from JobManager Actor[akka.tcp://flink@master:
6123/user/jobmanager#-1763674796].
Aug 30, 2017 7:12:56 PM org.apache.flink.runtime.client.JobClient
awaitJobResult
INFO: Job execution failed
Aug 30, 2017 7:12:56 PM akka.event.slf4j.Slf4jLogger$$
anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Shutting down remote daemon.
Aug 30, 2017 7:12:56 PM akka.event.slf4j.Slf4jLogger$$
anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remote daemon shut down; proceeding with flushing remote transports.
Aug 30, 2017 7:12:56 PM akka.event.slf4j.Slf4jLogger$$
anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting shut down.
Aug 30, 2017 7:12:56 PM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Cannot initialize task 'DataSource (at Read(CreateSource)
(org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))':
Deserializing the InputFormat (org.apache.beam.runners.flink
.translation.wrappers.SourceInputFormat@7ef64f) failed: Could not read the
user code wrapper: org.apache.beam.runners.flink.
translation.wrappers.SourceInputFormat
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
at org.apache.flink.client.program.StandaloneClusterClient.subm
itJob(StandaloneClusterClient.java:105)
at 

Re: Classloader issue with UDF's in DataStreamSource

2017-08-30 Thread Edward
In case anyone else runs into this, here's what I discovered:

For whatever reason, the classloader used by
org.apache.flink.api.java.typeutils.TypeExtractor did not have access to the
classes in my udf.jar file. However, if I changed my
KeyedDeserializationSchema implementation to use standard Avro classes (i.e.
GenericRecord rather than a SpecificRecord), the classloader didn't require
any of the generated Avro classes in udf.jar during the ExecutionGraph
stage.

At execution time, my deserializer forced the returned GenericRecord into
the my custom Avro SpecificRecord class, which was available to the
classloader at this point. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Union limit

2017-08-30 Thread Fabian Hueske
Hi b0c1,

This is an limitation in Flink's optimizer.
Internally, all binary unions are merged into a single n-ary union. The
optimizer restricts the number of inputs for an operator to 64.

You can work around this limitation with an identity mapper which prevents
the union operators from merging:

in1\
in2-- Id-Map--- NextOp
...   / / /
in14--/ / /
  / /
in15/ /
...   /
in74/

This is not a super nice solution, but the only way that comes to my mind.

Cheers, Fabian

2017-08-28 23:29 GMT+02:00 boci :

> Hi guys!
>
> I have one input (from mongo) and I split the incoming data to multiple
> datasets (each created dynamically from configuration) and before I write
> back the result I want to merge it to one dataset (there is some common
> transformation).
> so the flow:
>
> DataSet from Mongod =>
> Create Mappers dynamically (currently 74) so I have 74 DataSet =>
> Custom filter and mapping on each dataset =>
> Union dynamically to one (every mapper result is same type) =>
> Some another common transformation =>
> Count the result
>
> but when I want to union more than 64 dataset I got these exception:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Cannot currently handle nodes with more than 64 outputs.
> at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(
> OptimizerNode.java:348)
> at org.apache.flink.optimizer.dag.SingleInputNode.setInput(
> SingleInputNode.java:202)
> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(
> GraphCreatingVisitor.java:268)
> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(
> GraphCreatingVisitor.java:82)
>
> I try to split the incoming (74) list of dataset to split to 60 + 14
>  dataset and create an id mapper and union the result datasets but no
> success:
>
> val listOfDataSet: List[DataSet[...]] = 
>
> listOfDataSet
> .sliding(60,60)
> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)),map(new IdMapper()))
> //There is an iterator of DataSet
> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
> .map(finalDataSet => ... some transformation ...)
> .count()
>
> There is any solution to solve this?
>
> Thanks
> b0c1
>


BlobCache and its functioning

2017-08-30 Thread Federico D'Ambrosio
Hi,

I have a rather simple Flink job which has a KinesisConsumer as a source
and an HBase table as sink, in which I write using writeOutputFormat. I'm
running it on a local machine with a single taskmanager (2 slots, 2G). The
KinesisConsumer works fine and the connection to the HBase table gets
opened fine (i.e. the open method of the class implementing OutputFormat
gets actually called).

I'm running the job at a parallelism of 2, while the sink has a parallelism
of 1. The

Still, looking at the log I see that after opening the connection, the job
gets stuck at lines like this one:

INFO  org.apache.flink.runtime.blob.BlobCache   -
Downloading 8638bdf78b0e540786de6c291f710a8db447a2b4 from
localhost/127.0.0.1:43268

Each following one another, like this:

2017-08-30 14:17:21,318 INFO  org.apache.flink.runtime.blob.BlobCache
 - Created BLOB cache storage directory
/tmp/blobStore-8a2a96af-b836-4c95-b79a-a4b80929126f
2017-08-30 14:17:21,321 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:59937
2017-08-30 14:17:21,323 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:17:21,324 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
3ff486dff4c4eaafdab42b30a877326e62bfca82 from
localhost/127.0.0.1:43268
2017-08-30 14:17:21,324 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
3ff486dff4c4eaafdab42b30a877326e62bfca82 from /127.0.0.1:59938
2017-08-30 14:18:13,708 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:59976
2017-08-30 14:18:13,708 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:18:13,710 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
2f5283326aab77faa047b705cd1d6470035b3b7d from
localhost/127.0.0.1:43268
2017-08-30 14:18:13,710 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
2f5283326aab77faa047b705cd1d6470035b3b7d from /127.0.0.1:59978
2017-08-30 14:19:29,811 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60022
2017-08-30 14:19:29,812 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:19:29,814 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
f91fd7ecec6f90809f52ee189cb48aa1e30b04f6 from
localhost/127.0.0.1:43268
2017-08-30 14:19:29,814 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
f91fd7ecec6f90809f52ee189cb48aa1e30b04f6 from /127.0.0.1:60024
2017-08-30 14:21:42,856 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60110
2017-08-30 14:21:42,856 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:21:42,858 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
8638bdf78b0e540786de6c291f710a8db447a2b4 from
localhost/127.0.0.1:43268
2017-08-30 14:21:42,859 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
8638bdf78b0e540786de6c291f710a8db447a2b4 from /127.0.0.1:60112
2017-08-30 14:26:11,242 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60295
2017-08-30 14:26:11,243 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:26:11,247 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
6d30c88539d511bb9acc13b53bb2a128614f5621 from
localhost/127.0.0.1:43268
2017-08-30 14:26:11,247 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
6d30c88539d511bb9acc13b53bb2a128614f5621 from /127.0.0.1:60297
2017-08-30 14:29:20,942 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60410


My questions are: what is the jobmanager doing here? Why is he taking ages
to do this? How do i speed up this behaviour?

Thank you very much for your attention,

Federico D'Ambrosio


Modify field topics (KafkaConsumer) during runtime

2017-08-30 Thread Jose Miguel Tejedor Fernandez
Hi,

I am using Flink version 1.3.1.

I am wondering if it is possible to add/delete new topics to
FlinkKafkaConsumer during execution of a job? Otherwise, I guess I need to
cancel the job and redeploy the new job.
Cheers

BR


Re: EOFException related to memory segments during run of Beam pipeline on Flink

2017-08-30 Thread Fabian Hueske
Hi Reinier,

this is in fact a bug that you stumbled upon.
In general, Flink works very well with larger data sets and little memory
and gracefully spills data to disk.
The problem in your case is caused by a wrapped exception.

Internally, Flink uses an EOFException to signal that the memory pool is
exhausted (in your case thrown by SimpleCollectingOutputView.nextSegment()).
Unfortunately, Beam's SerializableCoder wraps this exception in a
CoderException which unknown to Flink and won't be detected as EOFException
by NormalizedKeySorter.write() (line 283).

I think it is debatable whether this is an issue of Beam's Flink runner or
Flink itself.
In any case, it would be good if you could open an issue at Beam's JIRA to
track this problem.
A quick solution for your use case should could be to use a custom coder
that forwards the EOFException instead of wrapping it.

Best, Fabian

2017-08-30 14:00 GMT+02:00 Reinier Kip :

> Hi all,
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size
> and the heap memory configuration of the jobmanager and taskmanager, I may
> run into an EOFException, which causes the job to fail. You will find the
> stacktrace near the bottom of this post (data censored).
>
> I would not expect such a sudden failure as the dataset apparently grows
> above a certain size. Doesn’t Flink spill data over to disk when memory
> runs out? How do I deal with this unpredictable behaviour in a production
> situation? I’m running a clean Flink 1.3.2 with heap memory of 768MiB. The
> dataset size is in the tens of megabytes. The same root EOFException
> occurred in Flink 1.2.1. I will gladly provide more information where
> needed.
>
> If this is expected behaviour, I feel it should be documented, meaning a
> more informative exception message, and managing user expectations in the
> guides. (I have not been able to find any information regarding this
> exception.)
>
> Hoping that someone can enlighten me,
>
> Reinier
>
>
> 08/30/2017 13:48:33 GroupReduce (GroupReduce at GroupByKey)(1/1)
> switched to FAILED
> java.lang.Exception: The data preparation for task 'GroupReduce
> (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted
> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
> unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
> at org.apache.flink.runtime.operators.BatchTask.run(
> BatchTask.java:466)
> at org.apache.flink.runtime.operators.BatchTask.invoke(
> BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: unable
> to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1095)
> at org.apache.flink.runtime.operators.GroupReduceDriver.
> prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(
> BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: unable to serialize record
> FakeSerialisableObjectWithStringsAndDoubles{}
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize
> record FakeSerialisableObjectWithStringsAndDoubles{}
> at org.apache.beam.sdk.coders.SerializableCoder.encode(
> SerializableCoder.java:129)
> at org.apache.beam.sdk.coders.SerializableCoder.encode(
> SerializableCoder.java:48)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
> at org.apache.beam.sdk.transforms.join.UnionCoder.
> encode(UnionCoder.java:76)
> at org.apache.beam.sdk.transforms.join.UnionCoder.
> encode(UnionCoder.java:60)
> at org.apache.beam.sdk.transforms.join.UnionCoder.
> encode(UnionCoder.java:33)
> at org.apache.beam.sdk.coders.IterableLikeCoder.encode(
> IterableLikeCoder.java:99)
> at org.apache.beam.sdk.coders.IterableLikeCoder.encode(
> IterableLikeCoder.java:60)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
> at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
> at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
> at org.apache.beam.sdk.util.WindowedValue$
> FullWindowedValueCoder.encode(WindowedValue.java:652)
> at org.apache.beam.sdk.util.WindowedValue$
> FullWindowedValueCoder.encode(WindowedValue.java:641)
> at org.apache.beam.sdk.util.WindowedValue$
> 

EOFException related to memory segments during run of Beam pipeline on Flink

2017-08-30 Thread Reinier Kip
Hi all,

I’ve been running a Beam pipeline on Flink. Depending on the dataset size and 
the heap memory configuration of the jobmanager and taskmanager, I may run into 
an EOFException, which causes the job to fail. You will find the stacktrace 
near the bottom of this post (data censored).

I would not expect such a sudden failure as the dataset apparently grows above 
a certain size. Doesn’t Flink spill data over to disk when memory runs out? How 
do I deal with this unpredictable behaviour in a production situation? I’m 
running a clean Flink 1.3.2 with heap memory of 768MiB. The dataset size is in 
the tens of megabytes. The same root EOFException occurred in Flink 1.2.1. I 
will gladly provide more information where needed.

If this is expected behaviour, I feel it should be documented, meaning a more 
informative exception message, and managing user expectations in the guides. (I 
have not been able to find any information regarding this exception.)

Hoping that someone can enlighten me,

Reinier


08/30/2017 13:48:33 GroupReduce (GroupReduce at GroupByKey)(1/1) switched 
to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at 
GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: unable to serialize 
record FakeSerialisableObjectWithStringsAndDoubles{}
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: unable to serialize 
record FakeSerialisableObjectWithStringsAndDoubles{}
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
due to an exception: unable to serialize record 
FakeSerialisableObjectWithStringsAndDoubles{}
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize 
record FakeSerialisableObjectWithStringsAndDoubles{}
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
at 
org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
at 
org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
at 
org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at 
org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
at 

Re: Flink session on Yarn - ClassNotFoundException

2017-08-30 Thread Federico D'Ambrosio
Hi,
What is your "hadoop version" output? I'm asking because you said your
hadoop distribution is in /usr/hdp so it looks like you're using
Hortonworks HDP, just like myself. So, this would be a third party
distribution and you'd need to build Flink from source according to this:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#vendor-specific-versions

Federico D'Ambrosio

Il 30 ago 2017 13:33, "albert"  ha scritto:

> Hi Chesnay,
>
> Thanks for your reply. I did download the binaries matching my Hadoop
> version (2.7), that's why I was wondering if the issue had something to do
> with the exact hadoop version flink is compiled again, or if there might be
> things that are missing in my environment.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Flink session on Yarn - ClassNotFoundException

2017-08-30 Thread albert
Hi Chesnay,

Thanks for your reply. I did download the binaries matching my Hadoop
version (2.7), that's why I was wondering if the issue had something to do
with the exact hadoop version flink is compiled again, or if there might be
things that are missing in my environment.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Off heap memory issue

2017-08-30 Thread Robert Metzger
Hi Javier,

I'm not aware of such issues with Flink, but if you could give us some more
details on your setup, I might get some more ideas on what to look for.

are you using the RocksDBStateBackend? (RocksDB is doing some JNI
allocations, that could potentially leak memory)
Also, are you passing any special garbage collector options? (Maybe some
classes are not unloaded)
Are you using anything else that is special (such as protobuf or avro
formats, or any other big library)?

Regards,
Robert



On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez 
wrote:

> Hi all,
>
> we are starting a lot of Flink jobs (streaming), and after we have started
> 200 or more jobs we see that the non-heap memory in the taskmanagers
> increases a lot, to the point of killing the instances. We found out that
> every time we start a new job, the committed non-heap memory increases by 5
> to 10MB. Is this an expected behavior? Are there ways to prevent this?
>


Re: Elasticsearch Sink - Error

2017-08-30 Thread Fabian Hueske
That's correct Flavio.
The issue has been reported as
https://issues.apache.org/jira/browse/FLINK-7386

Best, Fabian

2017-08-30 9:21 GMT+02:00 Flavio Pompermaier :

> I also had problems with ES 5.4.3 and I had to modify the connector
> code...I fear that the code is compatible only up to ES 5.2 or similar..
>
> On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar 
> wrote:
>
>> Hi,
>> I am using elasticsearch 5.4.3 version in my flink project(flink version
>> 1.3.1)
>> Details
>> 1. Using Maven build tool.
>> 2. Running from intellij IDE.
>> 3. Elasticsearch is running on the local machine.
>>
>> Have added the following maven dependency
>>
>> 
>> org.apache.flink
>> flink-connector-elasticsearch5_2.10
>> 1.3.1
>> 
>>
>>
>> *code added*
>>
>> Map config = new HashMap<>();
>> config.put("cluster.name", "elasticsearch");
>> config.put("bulk.flush.max.actions", "1");
>>
>> List transportAddresses = new
>> ArrayList<>();
>> transportAddresses.add(new
>> InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>>
>> alerts.addSink(new ElasticsearchSink(config,
>> transportAddresses, new ElasticsearchSinkFunction() {
>> public IndexRequest createIndexRequest(AggResult
>> aggResult){
>> Map json = new HashMap<>();
>> json.put("totalCount", aggResult.getTotalCount());
>>
>> return Requests
>> .indexRequest()
>> .index("logdata")
>> .type("consolidatedStreamData")
>> .source(json);
>>
>> }
>> @Override
>> public void process(AggResult aggResult, RuntimeContext
>> runtimeContext, RequestIndexer requestIndexer) {
>> requestIndexer.add(createIndexRequest(aggResult));
>> }
>> }));
>>
>>
>>
>> *This results in the following error.*
>>
>> Caused by: java.lang.NoSuchMethodError:
>> org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elastic
>> search/action/ActionRequest;)Lorg/elasticsearch/action/
>> bulk/BulkProcessor;
>> at
>> org.apache.flink.streaming.connectors.elasticsearch.BulkProc
>> essorIndexer.add(BulkProcessorIndexer.java:52)
>> at ECSPrototype$2.process(ECSPrototype.java:148)
>> at ECSPrototype$2.process(ECSPrototype.java:134)
>> at
>> org.apache.flink.streaming.connectors.elasticsearch.Elastics
>> earchSinkBase.invoke(ElasticsearchSinkBase.java:282)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processE
>> lement(StreamSink.java:41)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.pushToOperator(OperatorChain.java:528)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:503)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:483)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Broad
>> castingOutputCollector.collect(OperatorChain.java:575)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Broad
>> castingOutputCollector.collect(OperatorChain.java:536)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:891)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:869)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollecto
>> r.collect(TimestampedCollector.java:51)
>> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.ja
>> va:327)
>> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.ja
>> va:303)
>> at
>> org.apache.flink.streaming.api.operators.KeyedProcessOperato
>> r.processElement(KeyedProcessOperator.java:94)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>> rocessInput(StreamInputProcessor.java:206)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>> run(OneInputStreamTask.java:69)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:263)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> Anyidea what is wrong here ?
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908 <+39%200461%20182%203908>
>


Re: Twitter example

2017-08-30 Thread Fabian Hueske
Hi,

print() writes the data to the out files of the TaskManagers. So you need
to go to the machine that runs the TM and check its out file which is
located in the log folder.

Best, Fabian

2017-08-29 23:53 GMT+02:00 Krishnanand Khambadkone :

> I am trying to run the basic twitter example,  it does deploy correctly
> and show as RUNNING both in the command line and in the UI.  However I see
> no output.  I am able to verify that my credentials are correct with
> another small client I have built that does connect to twitter and prints
> out tweets.
>
>
> streamSource = env.addSource(new TwitterSource(props));
>
> streamSource.print();
>
>
>
>
>
> *** TWITTER INPUTS *
>
> qzenHPFVPNb7GaEypSvp3yV3s : XQuQHpOklVnUHEUTmtEbePU7O80Vaj5mwWnvNMrGyBiSsML9dA
> : 78358289-rjJ44rtlFr5BlTMyNPzDjQq6daOMT7PnQVvkGgwgO :
> IAr1eoCbBuJRkaERcKFxv5mFAOu6KPr7Q2EmAZYRXmedl
>
> * FOUND PROPERTIES *
>
> Submitting job with JobID: de6024b484fc1d163b4190011595e4f0. Waiting for
> job completion.
>
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1901137740]
> with leader session id ----.
>
> 08/29/2017 14:25:32 Job execution switched to status RUNNING.
>
> 08/29/2017 14:25:32 Source: Custom Source -> Sink: Unnamed(1/1) switched
> to SCHEDULED
>
> 08/29/2017 14:25:32 Source: Custom Source -> Sink: Unnamed(1/1) switched
> to DEPLOYING
>
> 08/29/2017 14:25:32 Source: Custom Source -> Sink: Unnamed(1/1) switched
> to RUNNING
>
>


Re: Elasticsearch Sink - Error

2017-08-30 Thread Flavio Pompermaier
I also had problems with ES 5.4.3 and I had to modify the connector
code...I fear that the code is compatible only up to ES 5.2 or similar..

On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar 
wrote:

> Hi,
> I am using elasticsearch 5.4.3 version in my flink project(flink version
> 1.3.1)
> Details
> 1. Using Maven build tool.
> 2. Running from intellij IDE.
> 3. Elasticsearch is running on the local machine.
>
> Have added the following maven dependency
>
> 
> org.apache.flink
> flink-connector-elasticsearch5_2.10
> 1.3.1
> 
>
>
> *code added*
>
> Map config = new HashMap<>();
> config.put("cluster.name", "elasticsearch");
> config.put("bulk.flush.max.actions", "1");
>
> List transportAddresses = new ArrayList<>();
> transportAddresses.add(new
> InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>
> alerts.addSink(new ElasticsearchSink(config,
> transportAddresses, new ElasticsearchSinkFunction() {
> public IndexRequest createIndexRequest(AggResult
> aggResult){
> Map json = new HashMap<>();
> json.put("totalCount", aggResult.getTotalCount());
>
> return Requests
> .indexRequest()
> .index("logdata")
> .type("consolidatedStreamData")
> .source(json);
>
> }
> @Override
> public void process(AggResult aggResult, RuntimeContext
> runtimeContext, RequestIndexer requestIndexer) {
> requestIndexer.add(createIndexRequest(aggResult));
> }
> }));
>
>
>
> *This results in the following error.*
>
> Caused by: java.lang.NoSuchMethodError:
> org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/
> ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
> at
> org.apache.flink.streaming.connectors.elasticsearch.
> BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
> at ECSPrototype$2.process(ECSPrototype.java:148)
> at ECSPrototype$2.process(ECSPrototype.java:134)
> at
> org.apache.flink.streaming.connectors.elasticsearch.
> ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
> at
> org.apache.flink.streaming.api.operators.StreamSink.
> processElement(StreamSink.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:483)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:575)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:536)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.
> java:327)
> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.
> java:303)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.
> processElement(KeyedProcessOperator.java:94)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(
> StreamInputProcessor.java:206)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Anyidea what is wrong here ?
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>



-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908