Re: Flink Kafka Producer Exception

2017-12-13 Thread Tzu-Li (Gordon) Tai
Hi Navneeth,

The exception you are getting is a Kafka NetworkException.
From the provided information I can’t really tell much and can only guess, but 
are you sure that the client / broker versions match?
It seems that you are using 0.10; the default client version in the Flink 
Kafka 0.10 connector is 0.10.2.1.

Cheers,
Gordon

On 14 December 2017 at 5:37:31 AM, Navneeth Krishnan (reachnavnee...@gmail.com) 
wrote:

I have a kafka source and sink in my pipeline and when I start my job I get 
this error and the job goes to failed state. I checked the kafka node and 
everything looks good. Any suggestion on what is happening here? Thanks.



Re: How are time-windowed join and over-window aggregation implemented in Flink SQL?

2017-12-13 Thread Xingcan Cui
Hi Yan Zhou,

as you may have noticed, the SQL-level stream join was not built on top of
some join API but was implemented with the low-level CoProcessFunction
(see TimeBoundedStreamInnerJoin.scala). The pipeline is generated in
DataStreamWindowJoin.scala.

Regarding the over-window aggregation, most of the implementations can be
found in this package. The pipeline is generated in DataStreamOverAggregate.scala.

In summary, they use built-in state tools to cache the rows/intermediate
results and clean/fire them when necessary.
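
For illustration, a rough sketch of that pattern could look like the following. This is only a simplified sketch, not the actual TimeBoundedStreamInnerJoin code; events are assumed to be (timestampMillis, payload) pairs on two streams that were already keyed with keyBy before being connected:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Simplified sketch: cache rows from both inputs in keyed state, emit a joined
// result when the two timestamps are at most `bound` ms apart, and prune the
// caches with event-time timers once rows can no longer match.
public class TimeBoundedJoinSketch
        extends CoProcessFunction<Tuple2<Long, String>, Tuple2<Long, String>, String> {

    private final long bound;
    private transient ListState<Tuple2<Long, String>> leftRows;
    private transient ListState<Tuple2<Long, String>> rightRows;

    public TimeBoundedJoinSketch(long bound) {
        this.bound = bound;
    }

    @Override
    public void open(Configuration parameters) {
        TypeInformation<Tuple2<Long, String>> rowType =
                TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {});
        leftRows = getRuntimeContext().getListState(new ListStateDescriptor<>("leftRows", rowType));
        rightRows = getRuntimeContext().getListState(new ListStateDescriptor<>("rightRows", rowType));
    }

    @Override
    public void processElement1(Tuple2<Long, String> left, Context ctx, Collector<String> out) throws Exception {
        leftRows.add(left);
        // once the watermark passes left.f0 + bound, this row can be dropped
        ctx.timerService().registerEventTimeTimer(left.f0 + bound);
        join(left, rightRows, out);
    }

    @Override
    public void processElement2(Tuple2<Long, String> right, Context ctx, Collector<String> out) throws Exception {
        rightRows.add(right);
        ctx.timerService().registerEventTimeTimer(right.f0 + bound);
        join(right, leftRows, out);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        prune(leftRows, timestamp - bound);
        prune(rightRows, timestamp - bound);
    }

    // emit one result per cached row of the other input within the time bound
    private void join(Tuple2<Long, String> row, ListState<Tuple2<Long, String>> otherSide,
                      Collector<String> out) throws Exception {
        Iterable<Tuple2<Long, String>> others = otherSide.get();
        if (others == null) {
            return;
        }
        for (Tuple2<Long, String> other : others) {
            if (Math.abs(row.f0 - other.f0) <= bound) {
                out.collect(row.f1 + " JOIN " + other.f1);
            }
        }
    }

    // drop rows whose timestamp is too old to ever match again
    private static void prune(ListState<Tuple2<Long, String>> rows, long maxDroppableTs) throws Exception {
        Iterable<Tuple2<Long, String>> current = rows.get();
        if (current == null) {
            return;
        }
        List<Tuple2<Long, String>> kept = new ArrayList<>();
        for (Tuple2<Long, String> row : current) {
            if (row.f0 > maxDroppableTs) {   // keep rows that may still find a match
                kept.add(row);
            }
        }
        rows.clear();
        for (Tuple2<Long, String> row : kept) {
            rows.add(row);
        }
    }
}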

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] 
wrote:

> Hi,
>
>
> I am building a data pipeline with a lot of streaming joins and
> over-window aggregations, and Flink SQL has these features supported. However,
> there are no similar DataStream APIs provided (maybe there are and I didn't
> find them; please point out if there are). I got confused because I assume
> that the SQL logical plan will be translated into a graph of operators
> or transformations.
>
>
> Could someone explain how these two SQL queries are implemented or
> translated into low-level code (operators or transformations)? I am asking
> this because I have implemented these features without using SQL and the
> performance looks good. And I certainly would love to migrate to SQL, but I want
> to understand them well first. Any information, hints or links are
> appreciated.
>
>
>
>1. Time-Windowed Join
>
> The DataStream API only provides streaming joins within the same window. But
> the SQL API (time-windowed join) can join two streams within quite
> different time ranges. Below is a sample query listed in the official
> doc, and we can see that *Orders* and *Shipments* have a 4-hour
> difference. Is it implemented by a CoProcessFunction or TwoInputOperator
> which buffers the events for a certain period?
>
>
> SELECT *
> FROM Orders o, Shipments s
> WHERE o.id = s.orderId AND
>   o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>
>
> 2. Over-Window Aggregation
> There is no similar feature in DataStream API. How does this get
> implemented? Does it use keyed state to buffer the previous events, and
> pull the records when there is a need? How does sorting get handled?
>
>
> Best
> Yan
>
>
>
>
>


Re: Flink flick cancel vs stop

2017-12-13 Thread Elias Levy
I am re-upping this thread now that FlinkKafkaProducer011 is out.  The new
producer, when used with the exactly-once semantics, has the rather
troublesome behavior that it will fall back to at-most-once, rather than
at-least-once, if the job is down for longer than the Kafka broker's
transaction.max.timeout.ms setting.

In situations that require extended maintenance downtime, this behavior is
nearly certain to lead to message loss, as canceling a job while taking a
savepoint will not wait for the Kafka transactions to be committed and is
not atomic.

So it seems like there is a need for an atomic stop or cancel with
savepoint that waits for transactional sinks to commit and then immediately
stops any further message processing.


On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski 
wrote:

> I would propose that implementations of NewSource be non-blocking/asynchronous.
> For example something like
>
> public abstract Future<T> getCurrent();
>
> This would allow us to perform certain actions while there is no
> data available to process (for example flush output buffers). Something
> like this came up recently when we were discussing possible future changes
> in the network stack. It wouldn’t complicate the API by a lot, since the default
> implementation could just be:
>
> public Future<T> getCurrent() {
>   return completedFuture(getCurrentBlocking());
> }
>
> Another thing to consider is maybe we would like to leave the door open
> for fetching records in some batches from the source’s input buffers?
> Source functions (like Kafka) have some internal buffers and it would be
> more efficient to read/deserialise all data present in the input buffer
> at once, instead of paying synchronisation/virtual method call/etc. costs
> once per record.
>
> Piotrek
>
> On 22 Sep 2017, at 11:13, Aljoscha Krettek  wrote:
>
> @Eron Yes, that would be the difference in characterisation. I think
> technically all sources could be transformed like that by pushing data into a
> (blocking) queue and having the "getElement()" method pull from that.
>
> On 15. Sep 2017, at 20:17, Elias Levy  wrote:
>
> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright 
> wrote:
>
>> Aljoscha, would it be correct to characterize your idea as a 'pull'
>> source rather than the current 'push'?  It would be interesting to look at
>> the existing connectors to see how hard it would be to reverse their
>> orientation.   e.g. the source might require a buffer pool.
>>
>
> The Kafka client works that way.  As does the QueueingConsumer used by the
> RabbitMQ source.  The Kinesis and NiFi sources also seem to poll. Those
> are all the bundled sources.
>
>
>
>
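
For reference, a minimal sketch of the pull-based source idea discussed above. This is a hypothetical API taken from the discussion (getCurrent/getCurrentBlocking), not part of Flink; the queue-based adapter mirrors the "push into a blocking queue and pull from it" suggestion:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical interface sketched from the thread above -- NOT a Flink API.
public abstract class PullSourceSketch<T> {

    /** Blocking variant: returns the next record, waiting if none is available. */
    public abstract T getCurrentBlocking() throws Exception;

    /** Non-blocking variant proposed in the thread; default just wraps the blocking call. */
    public CompletableFuture<T> getCurrent() throws Exception {
        return CompletableFuture.completedFuture(getCurrentBlocking());
    }

    /** Adapter for push-style sources: the existing source pushes into the queue. */
    public static <T> PullSourceSketch<T> fromQueue(BlockingQueue<T> queue) {
        return new PullSourceSketch<T>() {
            @Override
            public T getCurrentBlocking() throws Exception {
                return queue.take();   // "getElement()" pulls from the blocking queue
            }
        };
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("record-1");
        PullSourceSketch<String> source = PullSourceSketch.fromQueue(queue);
        System.out.println(source.getCurrent().get());   // prints record-1
    }
}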


FlinkKafkaProducer011 and Flink 1.4.0 Kafka docs

2017-12-13 Thread Elias Levy
Looks like the Flink Kafka connector page, in the Producer section,
is missing a section for the new FlinkKafkaProducer011 producer.  Given
that the new producer no longer has a static writeToKafkaWithTimestamps
method, it would be good to add a section that specifies that you must now
use DataStream.addSink.
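
Until the docs catch up, usage looks roughly like the following sketch (broker list and topic are placeholders; there is also a constructor variant taking producer Properties and a Semantic argument for exactly-once):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = env.fromElements("a", "b", "c");   // placeholder input

        // No static writeToKafkaWithTimestamps anymore: build the producer and addSink it.
        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                "localhost:9092",          // broker list (placeholder)
                "my-topic",                // target topic (placeholder)
                new SimpleStringSchema());

        stream.addSink(producer);
        env.execute("FlinkKafkaProducer011 example");
    }
}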


How are time-windowed join and over-window aggregation implemented in Flink SQL?

2017-12-13 Thread Yan Zhou [FDS Science]
Hi,


I am building a data pipeline with a lot of streaming joins and over-window 
aggregations, and Flink SQL has these features supported. However, there are no 
similar DataStream APIs provided (maybe there are and I didn't find them; please 
point out if there are). I got confused because I assume that the SQL logical 
plan will be translated into a graph of operators or transformations.


Could someone explain how these two SQL queries are implemented or translated 
into low-level code (operators or transformations)? I am asking this because I 
have implemented these features without using SQL and the performance looks 
good. And I certainly would love to migrate to SQL, but I want to understand them 
well first. Any information, hints or links are appreciated.


  1.  Time-Windowed Join

The DataStream API only provides streaming joins within the same window. But the SQL 
API (time-windowed join) can join two streams within quite different time 
ranges. Below is a sample query listed in the official doc, and we can see 
that Orders and Shipments have a 4-hour difference. Is it implemented by a 
CoProcessFunction or TwoInputOperator which buffers the events for a certain 
period?


SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
  o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

2. Over-Window Aggregation
There is no similar feature in DataStream API. How does this get implemented? 
Does it use keyed state to buffer the previous events, and pull the records 
when there is a need? How does sorting get handled?


Best
Yan





Re: Flink Kafka Producer Exception

2017-12-13 Thread Navneeth Krishnan
Hi,

I'm receiving this error, due to which I'm not able to run my job. Any
help is greatly appreciated. Thanks.

On Tue, Dec 12, 2017 at 10:21 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi,
>
> I have a kafka source and sink in my pipeline and when I start my job I
> get this error and the job goes to failed state. I checked the kafka node
> and everything looks good. Any suggestion on what is happening here? Thanks.
>
> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
> before a response was received.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at com.transformations.MyProcessor.flatMap(MyProcessor.java:115)
>   at com.transformations.MyProcessor.flatMap(MyProcessor.java:47)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Re: Custom Metrics

2017-12-13 Thread Navneeth Krishnan
Thanks Piotr.

I have a couple more questions related to metrics. I use the InfluxDB reporter
to report Flink metrics and I see a lot of metrics being reported. Is
there a way to select only a subset of the metrics that we need to monitor the
application?

Also, is there a way to specify a custom metrics scope? Basically I register
metrics like below: add a custom metric group and then add a meter per
user. I would like this to be reported as measurement "Users" and tagged with
the user id. This way I can easily visualize the data in Grafana or any other
tool by selecting the measurement and grouping by tag. Is there a way to
report like that instead of host, process_type, tm_id, job_name, task_name
& subtask_index?

metricGroup.addGroup("Users")
.meter(userId, new DropwizardMeterWrapper(new
com.codahale.metrics.Meter()));
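
A sketch of the scope-format keys involved (flink-conf.yaml; the available <variables> depend on the scope level and Flink version, so check the metrics documentation). They control the host/tm_id/job_name/... prefix that the reporter sees; user scopes added via addGroup("Users") are appended after it:

# flink-conf.yaml -- sketch of trimmed-down scope formats
metrics.scope.tm:       <host>.taskmanager
metrics.scope.task:     <job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <job_name>.<operator_name>.<subtask_index>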

Thanks a bunch.

On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski 
wrote:

> Hi,
>
> Reporting once per 10 seconds shouldn’t create problems. Best to try it
> out. Let us know if you get into some troubles :)
>
> Piotrek
>
> On 11 Dec 2017, at 18:23, Navneeth Krishnan 
> wrote:
>
> Thanks Piotr.
>
> Yes, passing the metric group should be sufficient. The subcomponents will
> not be able to provide the list of metrics to register since the metrics
> are created based on incoming data by tenant. Also I am planning to have
> the metrics reported every 10 seconds and hope it shouldn't be a problem.
> We use influx and grafana to plot the metrics.
>
> The option 2 that I had in mind was to collect all metrics and use influx
> db sink to report it directly inside the pipeline. But it seems reporting
> per node might not be possible.
>
>
> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure if I completely understand your issue.
>>
>> 1.
>> - You don’t have to pass RuntimeContext, you can always pass just the
>> MetricGroup or ask your components/subclasses “what metrics do you want to
>> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for
>> Flink, as long as you have a reasonable reporting interval. However keep in
>> mind that Flink only reports your metrics and you still need something to
>> read/handle/process/aggregate your metrics
>> 2.
>> I don’t think that reporting per node/jvm is possible with Flink’s metric
>> system. For that you would need some other solution, like report your
>> metrics using JMX (directly register MBeans from your code)
>>
>> Piotrek
>>
>> > On 10 Dec 2017, at 18:51, Navneeth Krishnan 
>> wrote:
>> >
>> > Hi,
>> >
>> > I have a streaming pipeline running on flink and I need to collect
>> metrics to identify how my algorithm is performing. The entire pipeline is
>> multi-tenanted and I also need metrics per tenant. Let's say there would be
>> around 20 metrics to be captured per tenant. I have the following ideas for
>> implementation, but any suggestions on which one might be better will help.
>> >
>> > 1. Use flink metric group and register a group per tenant at the
>> operator level. The disadvantage of this approach for me is I need the
>> runtimecontext parameter to register a metric and I have various subclasses
>> to which I need to pass this object to limit the metric scope within the
>> operator. Also there will be too many metrics reported if there are higher
>> number of subtasks.
>> > How is everyone accessing flink state/ metrics from other classes where
>> you don't have access to runtimecontext?
>> >
>> > 2. Use a custom singleton metric registry to add and send these metrics
>> using custom sink. Instead of using flink metric group to collect metrics
>> per operator - subtask, collect per JVM and use an influx sink to send the
>> metric data. What I'm not sure about in this case is how to collect only once per
>> node/jvm.
>> >
>> > Thanks a bunch in advance.
>>
>>
>
>


Re: Watermark in broadcast

2017-12-13 Thread Seth Wiesman
Quick follow-up question: is there some way to notify a TimestampAssigner that it 
is consuming from an idle source?


Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007
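
One possible workaround sketch for the idle-source case (this is not a built-in facility in Flink 1.3/1.4; events here are plain (timestampMillis, payload) tuples): a periodic assigner that falls back to processing time once the subtask has seen no elements for a while.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: normal event-time watermarks while records flow; once this subtask has
// been idle for more than idleTimeoutMs, advance based on processing time so
// downstream watermarks keep progressing. Caveat: this can mark records late if
// the idle partition suddenly resumes with old timestamps.
public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple2<Long, String>> {

    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;

    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastActivity = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public long extractTimestamp(Tuple2<Long, String> element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.f0);
        lastActivity = System.currentTimeMillis();
        return element.f0;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastActivity > idleTimeoutMs) {
            return new Watermark(now - maxOutOfOrdernessMs);   // idle: use processing time
        }
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);              // nothing seen yet
        }
        return new Watermark(maxSeenTimestamp - maxOutOfOrdernessMs);
    }
}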




From: Seth Wiesman 
Date: Wednesday, December 13, 2017 at 12:04 PM
To: Timo Walther , "user@flink.apache.org" 

Subject: Re: Watermark in broadcast

Hi Timo,

I think you are correct. This stream is consumed from Kafka and the number of 
partitions is much less than the parallelism of the program, so there would be 
many partitions that never forward watermarks greater than Long.MIN_VALUE.

Thank you for the quick response.


Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007




From: Timo Walther 
Date: Wednesday, December 13, 2017 at 11:46 AM
To: "user@flink.apache.org" 
Subject: Re: Watermark in broadcast

Hi Seth,

are you sure that all partitions of the broadcasted stream send a watermark? 
processWatermark is only called if a minimum watermark arrived from all 
partitions.

Regards,
Timo

Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:
Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected; however, I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark, but that does not seem like an ideal 
solution.

Thank you,

Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007







Re: Watermark in broadcast

2017-12-13 Thread Seth Wiesman
Hi Timo,

I think you are correct. This stream is consumed from Kafka and the number of 
partitions is much less than the parallelism of the program, so there would be 
many partitions that never forward watermarks greater than Long.MIN_VALUE.

Thank you for the quick response.


Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007




From: Timo Walther 
Date: Wednesday, December 13, 2017 at 11:46 AM
To: "user@flink.apache.org" 
Subject: Re: Watermark in broadcast

Hi Seth,

are you sure that all partitions of the broadcasted stream send a watermark? 
processWatermark is only called if a minimum watermark arrived from all 
partitions.

Regards,
Timo

Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:
Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected; however, I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark, but that does not seem like an ideal 
solution.

Thank you,

Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007







Re: Watermark in broadcast

2017-12-13 Thread Timo Walther

Hi Seth,

are you sure that all partitions of the broadcasted stream send a 
watermark? processWatermark is only called if a minimum watermark 
arrived from all partitions.


Regards,
Timo

Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:


Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of 
its inputs. Both streams are assigned timestamps and watermarks before 
being connected; however, I only ever see watermarks from my 
non-broadcast stream. Is this expected behavior? Currently I have 
overridden processWatermark1 to unconditionally call processWatermark, 
but that does not seem like an ideal solution.


Thank you,





*Seth Wiesman *| Software Engineer, Data


4 World Trade Center, 46th Floor, New York, NY 10007






Watermark in broadcast

2017-12-13 Thread Seth Wiesman
Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected; however, I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark, but that does not seem like an ideal 
solution.

Thank you,

Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007





Re: ClassCastException when using RowTypeInfo

2017-12-13 Thread Timo Walther

Hi Madan,

this is definitely a bug. The Row type has mostly been added for the 
Table & SQL API and has not been tested for expression keys. But in general I 
would use a tuple in your case, as tuples are more efficient. The 
`registerType` call is only necessary for generic types serialized with Kryo.


I opened https://issues.apache.org/jira/browse/FLINK-8255. If you would 
like to fix it, I can assign it to you.


Thanks.

Regards,
Timo
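
For reference, a tuple-based variant of the program quoted below could look roughly like this (a sketch using positional keys; field 2 is emitted as a Double to match the aggregation):

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TupleEmployeeJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new EmployeeSource())
                .keyBy(3)      // department
                .sum(2)        // salary
                .print();

        env.execute("tuple-based aggregation");
    }

    public static class EmployeeSource
            implements SourceFunction<Tuple4<Integer, String, Double, String>> {

        private volatile boolean continueRead = true;

        @Override
        public void run(SourceContext<Tuple4<Integer, String, Double, String>> ctx) {
            for (int i = 0; i < 3 && continueRead; i++) {
                ctx.collect(new Tuple4<>(i, "user" + i, 1000.0 * i, "DEV"));
            }
        }

        @Override
        public void cancel() {
            continueRead = false;
        }
    }
}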



Am 12/13/17 um 4:16 PM schrieb madan:

Hi,

Below is sample code I am trying with,

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TypeInformation[] types = new TypeInformation[] {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};

String[] fieldNames = new String[]{"id", "name", "salary", "department"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
env.registerType(RowTypeInfo.class);

env.addSource(new EmployeeSourceFunction(), "samplesource", rowTypeInfo)
    .keyBy("department").sum("salary").addSink(new PrintSinkFunction<>());

public class EmployeeSourceFunction implements SourceFunction<Row> {
    private boolean continueRead = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (continueRead) {
            for (int i = 0; i < 3 && continueRead; i++) {
                Row row = new Row(4);
                row.setField(0, Integer.valueOf(i));
                row.setField(1, String.valueOf("user" + i));
                row.setField(2, 1000 * i);
                row.setField(3, "DEV");
                ctx.collect(row);
            }
            continueRead = false;
        }
    }

    @Override
    public void cancel() {
        continueRead = false;
    }
}

And I am getting below exception

java.lang.ClassCastException: 
org.apache.flink.api.java.typeutils.RowTypeInfo cannot be cast to 
org.apache.flink.api.java.typeutils.TupleTypeInfo
at 
org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:167)



I have checked FieldAccessorFactory.java:167:

if (typeInfo.isTupleType()) {
    TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;

RowTypeInfo returns 'true' for isTupleType() and cannot be cast.


Can someone please tell me: is it that I have done a wrong configuration, 
or is it a bug in the code?



--
Thank you,
Madan.





ClassCastException when using RowTypeInfo

2017-12-13 Thread madan
Hi,

Below is sample code I am trying with,

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


TypeInformation[] types = new TypeInformation[]
{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};

String[] fieldNames = new String[]{"id", "name", "salary", "department"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
env.registerType(RowTypeInfo.class);

env.addSource(new EmployeeSourceFunction(), "samplesource", rowTypeInfo)
.keyBy("department").sum("salary").addSink(new PrintSinkFunction<>());


public class EmployeeSourceFunction implements SourceFunction<Row> {
private boolean continueRead = true;

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (continueRead) {
for (int i = 0; i < 3 && continueRead; i++) {
Row row = new Row(4);
row.setField(0, Integer.valueOf(i));
row.setField(1, String.valueOf("user" + i));
row.setField(2, 1000 * i);
row.setField(3, "DEV");
ctx.collect(row);
}
continueRead = false;
}
}

@Override
public void cancel() {
continueRead = false;
}
}


And I am getting below exception

java.lang.ClassCastException:
org.apache.flink.api.java.typeutils.RowTypeInfo cannot be cast to
org.apache.flink.api.java.typeutils.TupleTypeInfo
at
org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:167)


I have checked FieldAccessorFactory.java:167,

if (typeInfo.isTupleType()) {
   TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;

RowTypeInfo returns 'true' for isTupleType() and cannot be cast.


Can someone please tell me: is it that I have done a wrong configuration, or
is it a bug in the code?


-- 
Thank you,
Madan.


Re: [ANNOUNCE] Apache Flink 1.4.0 released

2017-12-13 Thread Vishal Santoshi
+1

On Wed, Dec 13, 2017 at 8:13 AM, Till Rohrmann 
wrote:

> Thanks a lot Aljoscha for being the release manager and to the whole Flink
> community for this release :-)
>
> @Vishal: flink-cep/flink-connector were deployed to maven central. It just
> might take a while until they show up I guess.
>
> Cheers,
> Till
>
> On Wed, Dec 13, 2017 at 4:23 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Awesome job folks, Congrats.
>>
>>  A query as we shift to 1.4 are the flink_cep/flink-connector-kafka-0.11
>> for the 1.4  out on maven central ?
>>
>> Regards and congrats again.
>>
>> On Tue, Dec 12, 2017 at 9:44 AM, Hao Sun  wrote:
>>
>>> Congratulations! Awesome work.
>>> Two quick questions about the HDFS free feature.
>>> I am using S3 to store checkpoints, savepoints, and I know it is being
>>> done through hadoop-aws.
>>>
>>> - Do I have to include a hadoop-aws jar in my flatjar AND flink's lib
>>> directory to make it work for 1.4? Both or just the lib directory?
>>> - Am I free to choose the latest version of hadoop-aws?
>>>
>>> On Tue, Dec 12, 2017 at 4:43 AM Flavio Pompermaier 
>>> wrote:
>>>
 Thanks Aljoscha! Just one question: is there any upgrade guideline?
 Or is the upgrade from 1.3.1 to 1.4 almost frictionless?

 On Tue, Dec 12, 2017 at 1:39 PM, Fabian Hueske 
 wrote:

> Thank you Aljoscha for managing the release!
>
> 2017-12-12 12:46 GMT+01:00 Aljoscha Krettek :
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.4.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data 
>> streaming
>> applications.
>>
>>
>> The release is available for download at:
>>
>>https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the new
>> features and improvements and the list of contributors:
>>
>>   https://flink.apache.org/news/2017/12/12/release-1.4.0.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12340533
>>
>>
>> I would like to thank all contributors for working very hard on
>> making this release a success!
>>
>> Best,
>> Aljoscha
>
>
>


 --
 Flavio Pompermaier
 Development Department

 OKKAM S.r.l.
 Tel. +(39) 0461 041809 <+39%200461%20041809>

>>>
>>
>


Re: when does the timed window end?

2017-12-13 Thread Aljoscha Krettek
Hi,

Yes, those last two comments about the watermark and window triggering are 
correct. The watermark either has to advance based on events or based on some 
continuous generation.

Best,
Aljoscha

> On 13. Dec 2017, at 06:08, Jinhua Luo  wrote:
> 
> Unless I generate event-time watermarks continuously regardless of elements?
> 
> Just like the doc does, it gives an example how to generate continuous
> watermark based on processing time (TimeLagWatermarkGenerator):
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#with-periodic-watermarks
> 
> But if I use pre-defined event-time watermark generators which are
> purely based on elements, then what I worried about in my last mail is true?
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamp_extractors.html
> 
> 
> 
> 2017-12-13 12:48 GMT+08:00 Jinhua Luo :
>> If the window contains only one element, no more elements come in,
>> then by default (with EventTimeTrigger), the window would be fired by
>> the next element if that element advances the watermark past the end
>> of the window, correct?
>> That is, even if the window ends at 12:30, then if no more elements
>> come in and advance the watermark past 12:30, the window would not be
>> fired; if the next element appears at 13:30, then the window would be
>> fired, although it has been delayed for 1 hour, correct?
>> 
>> 2017-12-12 16:53 GMT+08:00 Fabian Hueske :
>>> No, that's exactly what is mean by "a window is created when the first
>>> element arrives".
>>> Otherwise, you'd have to fire empty windows for all possible keys (in case
>>> of a window operator on a keyed stream) which is obviously not possible.
>>> 
>>> 2017-12-12 9:30 GMT+01:00 Jinhua Luo :
 
 OK, I see.
 
 But what if a window contains no elements? Does it still get fired and
 invoke the window function?
 
 2017-12-12 15:42 GMT+08:00 Fabian Hueske :
> Hi,
> 
> this depends on the window type. Tumbling and Sliding Windows are (by
> default) aligned with the epoch time (1970-01-01 00:00:00).
> For example a tumbling window of 2 hours starts and ends every two hours,
> i.e., from 12:00:00 to 13:59:59.999, from 14:00:00 to 15:59:59.999, etc.
> 
> The documentation says a window is created when an element arrives. This
> does not imply that the start time of the window is the time of the
> first
> element.
> So it might happen that the first element of a 2 hour tumbling window
> arrives at 13:59:59.000 and the window is closed 1 second later.
> 
> However, there are also windows for which the first element defines the
> start time such as the built-in session window.
> You can also define custom windows like that.
> 
> Best, Fabian
> 
> 2017-12-12 7:57 GMT+01:00 Jinhua Luo :
>> 
>> Hi All,
>> 
>> The document said "a window is created as soon as the first element
>> that should belong to this window arrives, and the window is
>> completely removed when the time (event or processing time) passes its
>> end timestamp plus the user-specified allowed lateness (see Allowed
>> Lateness).".
>> 
>> I am still confused.
>> 
>> If the window contains only one element (which triggers the window
>> creation), and no more elements come in during the window size (e.g. 1
>> minute), then when does the window function get invoked? after 1
>> minute?
>> 
>> I mean, the window would finish either when any element indicates the
>> watermark is larger than the window size, or, when the processing time
>> (no matter for event-timed window or process-timed window) pass over
>> the window size since the first element?
> 
> 
>>> 
>>> 
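
To make the epoch alignment described above concrete, here is a small standalone sketch of the start/end computation for a tumbling window with offset 0 (it mirrors the default alignment behavior; it is not the Flink API itself, and assumes non-negative timestamps):

public class TumblingAlignmentSketch {

    /** Start of the tumbling window (offset 0) that contains the given timestamp. */
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    public static void main(String[] args) {
        long twoHours = 2 * 60 * 60 * 1000L;
        // element arriving at 13:59:59.000 (epoch-relative for simplicity)
        long elementTs = 13 * 60 * 60 * 1000L + 59 * 60 * 1000L + 59 * 1000L;

        long start = windowStart(elementTs, twoHours);   // 12:00:00.000
        long end = start + twoHours - 1;                 // 13:59:59.999

        System.out.println("window = [" + start + ", " + end + "]");
        // The window fires only once the watermark passes `end`, i.e. one second
        // later in event time -- or much later if no further elements (or a
        // continuously generated watermark) advance the watermark.
    }
}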



Re: [ANNOUNCE] Apache Flink 1.4.0 released

2017-12-13 Thread Till Rohrmann
Thanks a lot Aljoscha for being the release manager and to the whole Flink
community for this release :-)

@Vishal: flink-cep/flink-connector were deployed to maven central. It just
might take a while until they show up I guess.

Cheers,
Till

On Wed, Dec 13, 2017 at 4:23 AM, Vishal Santoshi 
wrote:

> Awesome job folks, Congrats.
>
>  A query as we shift to 1.4 are the flink_cep/flink-connector-kafka-0.11
> for the 1.4  out on maven central ?
>
> Regards and congrats again.
>
> On Tue, Dec 12, 2017 at 9:44 AM, Hao Sun  wrote:
>
>> Congratulations! Awesome work.
>> Two quick questions about the HDFS free feature.
>> I am using S3 to store checkpoints, savepoints, and I know it is being
>> done through hadoop-aws.
>>
>> - Do I have to include a hadoop-aws jar in my flatjar AND flink's lib
>> directory to make it work for 1.4? Both or just the lib directory?
>> - Am I free to choose the latest version of hadoop-aws?
>>
>> On Tue, Dec 12, 2017 at 4:43 AM Flavio Pompermaier 
>> wrote:
>>
>>> Thanks Aljoscha! Just one question: is there any upgrade guideline?
>>> Or is the upgrade from 1.3.1 to 1.4 almost frictionless?
>>>
>>> On Tue, Dec 12, 2017 at 1:39 PM, Fabian Hueske 
>>> wrote:
>>>
 Thank you Aljoscha for managing the release!

 2017-12-12 12:46 GMT+01:00 Aljoscha Krettek :

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.4.0.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data 
> streaming
> applications.
>
>
> The release is available for download at:
>
>https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the new
> features and improvements and the list of contributors:
>
>   https://flink.apache.org/news/2017/12/12/release-1.4.0.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12340533
>
>
> I would like to thank all contributors for working very hard on making
> this release a success!
>
> Best,
> Aljoscha



>>>
>>>
>>> --
>>> Flavio Pompermaier
>>> Development Department
>>>
>>> OKKAM S.r.l.
>>> Tel. +(39) 0461 041809 <+39%200461%20041809>
>>>
>>
>


Re: netty conflict using lettuce redis client

2017-12-13 Thread Timo Walther

Hi,

we just released Flink 1.4.0 [1]. Maybe it is possible for you to 
upgrade? One of the greatest features is improved classloading and 
better dependency management.


I think this would be the easiest solution for you. Otherwise let us 
know if you still need help.


Regards,
Timo


[1] 
https://flink.apache.org/news/2017/12/12/release-1.4.0.html#a-significantly-improved-dependency-structure-and-reversed-class-loading



Am 12/13/17 um 1:58 PM schrieb Jinhua Luo:

Hi All,

The io.netty package included in flink 1.3.2 is 4.0.23, while the
latest lettuce-core (4.4) depends on netty 4.0.35.

If I include netty 4.0.35 in the app jar, it would throw
java.nio.channels.UnresolvedAddressException. It seems the netty
classes are mixed between versions from app jar and flink runtime.

If I exclude netty from app jar, then the lettuce throws
java.lang.NoSuchMethodError:
io.netty.util.CharsetUtil.encoder(Ljava/nio/charset/Charset;)Ljava/nio/charset/CharsetEncoder,
because it needs that method but flink's netty version does not provide it.

Is it a bug? Does the flink runtime provide an individual class loader
env for the app? That is, is there some clean way for the app to use
packages solely from the app jar?

Now my workaround is to downgrade the lettuce version to 4.2, because it
depends on netty 4.0.20. I also include netty in the app jar, and now
there seems to be no conflict with the flink runtime. It's weird.

Please help, thanks.





Re: [ANNOUNCE] Apache Flink 1.4.0 released

2017-12-13 Thread Aljoscha Krettek
Hi,

@Flavio, I think the update should be frictionless. At the end of the release 
post are some notes and only if you would be affected by this do you have to 
change something.

@Hao Sun Are you using that on EMR? I think in that case you don't have to 
include anything. If not, it might be that you have to include them with your 
program. There is also another (new) alternative of using the new bundled S3 
filesystems of Flink. That's documented here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/filesystems.html#built-in-file-systems
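
As a rough sketch of that alternative (copy flink-s3-fs-hadoop or flink-s3-fs-presto from opt/ to lib/, then point paths at s3:// URIs; the exact key names and whether credentials are needed depend on your setup, so check the linked page):

# flink-conf.yaml (sketch)
state.backend: filesystem
state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints
# only needed when not running with IAM instance credentials
s3.access-key: <access-key>
s3.secret-key: <secret-key>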
 


@Vishal I think all dependencies should be on Maven Central but it can take 
some time for everything to sync to all mirrors.

Best,
Aljoscha

> On 13. Dec 2017, at 04:23, Vishal Santoshi  wrote:
> 
> Awesome job folks, Congrats.
> 
>  A query as we shift to 1.4 are the flink_cep/flink-connector-kafka-0.11 for 
> the 1.4  out on maven central ? 
> 
> Regards and congrats again.
> 
> On Tue, Dec 12, 2017 at 9:44 AM, Hao Sun  > wrote:
> Congratulations! Awesome work.
> Two quick questions about the HDFS free feature. 
> I am using S3 to store checkpoints, savepoints, and I know it is being done 
> through hadoop-aws.
> 
> - Do I have to include a hadoop-aws jar in my flatjar AND flink's lib 
> directory to make it work for 1.4? Both or just the lib directory?
> - Am I free to choose the latest version of hadoop-aws?
> 
> On Tue, Dec 12, 2017 at 4:43 AM Flavio Pompermaier  > wrote:
> Thanks Aljoscha! Just one question: is there any upgrade guideline?
> Or is the upgrade from 1.3.1 to 1.4 almost frictionless?
> 
> On Tue, Dec 12, 2017 at 1:39 PM, Fabian Hueske  > wrote:
> Thank you Aljoscha for managing the release!
> 
> 2017-12-12 12:46 GMT+01:00 Aljoscha Krettek  >:
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.4.0.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
> 
> 
> The release is available for download at:
> 
>https://flink.apache.org/downloads.html 
> 
> 
> Please check out the release blog post for an overview of the new features 
> and improvements and the list of contributors:
> 
>   https://flink.apache.org/news/2017/12/12/release-1.4.0.html 
> 
> 
> The full release notes are available in Jira: 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12340533
>  
> 
> 
> 
> I would like to thank all contributors for working very hard on making this 
> release a success!
> 
> Best,
> Aljoscha
> 
> 
> 
> 
> -- 
> Flavio Pompermaier
> Development Department
> 
> OKKAM S.r.l.
> Tel. +(39) 0461 041809 



netty conflict using lettuce redis client

2017-12-13 Thread Jinhua Luo
Hi All,

The io.netty package included in flink 1.3.2 is 4.0.23, while the
latest lettuce-core (4.4) depends on netty 4.0.35.

If I include netty 4.0.35 in the app jar, it would throw
java.nio.channels.UnresolvedAddressException. It seems the netty
classes are mixed between versions from app jar and flink runtime.

If I exclude netty from app jar, then the lettuce throws
java.lang.NoSuchMethodError:
io.netty.util.CharsetUtil.encoder(Ljava/nio/charset/Charset;)Ljava/nio/charset/CharsetEncoder,
because it needs that method but flink's netty version does not provide it.

Is it a bug? Does the flink runtime provide an individual class loader
env for the app? That is, is there some clean way for the app to use
packages solely from the app jar?

Now my workaround is to downgrade the lettuce version to 4.2, because it
depends on netty 4.0.20. I also include netty in the app jar, and now
there seems to be no conflict with the flink runtime. It's weird.

Please help, thanks.


Re: Kafka topic partition skewness causes watermark not being emitted

2017-12-13 Thread Gerard Garcia
Thanks Gordon.

Don't worry, I'll be careful to not have empty partitions until the next
release.
Also, I'll keep an eye on FLINK-5479 and if at some point I see that there
is a fix and the issue bothers us too much I'll try to apply the patch
myself to the latest stable release.

Gerard

On Wed, Dec 13, 2017 at 10:31 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I've just elevated FLINK-5479 to BLOCKER for 1.5.
>
> Unfortunately, AFAIK there is no easy workaround solution for this issue
> yet in the releases so far.
> The min watermark logic that controls per-partition watermark emission is
> hidden inside the consumer, making it hard to work around it.
>
> One possible solution I can imagine, but perhaps not that trivial, is to
> inject some special marker event into all partitions periodically.
> The watermark assigner should be able to recognize this special marker and
> try to provide some watermark for it.
> Another option is that I can provide some patch you can apply for a custom
> build of the Kafka connector that handles partition idleness properly.
> However, given that we're aiming for a faster release cycle for Flink 1.5
> (proposed release date is Feb. 2018), it might not be worth the extra
> maintenance effort on your side of a custom build.
>
> Best,
> Gordon
>
>
> On Tue, Dec 12, 2017 at 9:28 PM, gerardg  wrote:
>
>> I'm also affected by this behavior. There are no updates in FLINK-5479 but
>> did you manage to find a way to workaround this?
>>
>> Thanks,
>>
>> Gerard
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>


Re: Cannot load user classes

2017-12-13 Thread Chesnay Schepler

Can you show us the dependency section of your pom?

On 13.12.2017 08:25, Soheil Pourbafrani wrote:
Hey, I wrote a job using Flink and create a fat jar using Maven; I 
can run it on a remote cluster without errors. Trying to run it without 
creating a fat jar, directly from the IDE, I get the error "Cannot load 
user class" for non-Flink-core classes. For example `Cannot load user 
class: 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink` or 
`Kafka connector things`. If I pass local jar files to the 
*createRemoteEnvironment* method it's OK, but passing all the extra jar files 
is not the standard way and is time-consuming. I wish I could run 
Flink code directly in the IDE without creating Maven fat jars. Is there 
any way to do that?





Re: Kafka topic partition skewness causes watermark not being emitted

2017-12-13 Thread Tzu-Li (Gordon) Tai
Hi,

I've just elevated FLINK-5479 to BLOCKER for 1.5.

Unfortunately, AFAIK there is no easy workaround solution for this issue
yet in the releases so far.
The min watermark logic that controls per-partition watermark emission is
hidden inside the consumer, making it hard to work around it.

One possible solution I can imagine, but perhaps not that trivial, is to
inject some special marker event into all partitions periodically.
The watermark assigner should be able to recognize this special marker and
try to provide some watermark for it.
Another option is that I can provide some patch you can apply for a custom
build of the Kafka connector that handles partition idleness properly.
However, given that we're aiming for a faster release cycle for Flink 1.5
(proposed release date is Feb. 2018), it might not be worth the extra
maintenance effort on your side of a custom build.

Best,
Gordon
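
A rough sketch of the marker idea described above (the (timestampMillis, isMarker, payload) event shape is purely illustrative; markers would have to be written to every partition and filtered out downstream):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: regular events advance event time as usual; periodically injected
// marker events force a watermark even on otherwise idle partitions.
public class MarkerAwareAssigner
        implements AssignerWithPunctuatedWatermarks<Tuple3<Long, Boolean, String>> {

    @Override
    public long extractTimestamp(Tuple3<Long, Boolean, String> element, long previousTimestamp) {
        return element.f0;
    }

    @Override
    public Watermark checkAndGetNextWatermark(
            Tuple3<Long, Boolean, String> lastElement, long extractedTimestamp) {
        // emit a watermark only for marker events
        return lastElement.f1 ? new Watermark(extractedTimestamp) : null;
    }
}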


On Tue, Dec 12, 2017 at 9:28 PM, gerardg  wrote:

> I'm also affected by this behavior. There are no updates in FLINK-5479 but
> did you manage to find a way to workaround this?
>
> Thanks,
>
> Gerard
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Off heap memory issue

2017-12-13 Thread Piotr Nowojski
Hi,

OOMs from metaspace probably mean that your jars are not releasing some 
resources:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html#unloading-of-dynamically-loaded-classes
 


Regarding second issue (I guess it is probably somehow related to the first 
one). If it’s indeed a heap space OOM, it should be fairly easy to 
analyse/debug. This article describes how to track such issues, Especially 
chapter titled "Using Java VisualVM”:
https://www.toptal.com/java/hunting-memory-leaks-in-java 

It should allow you to pinpoint the owner and the source of the leak.

Piotrek

> On 12 Dec 2017, at 14:47, Javier Lopez  wrote:
> 
> Hi Piotr,
> 
> We found out which one was the problem in the workers. After setting a value 
> for XX:MaxMetaspaceSize we started to get OOM exceptions from the metaspace. 
> We found out how Flink manages the User classes here 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html
> and solved the problem by adding the job's jar file to the /lib of the nodes 
> (master and workers). Now we have a constant memory usage in the workers. 
> 
> Unfortunately, we still have an OOM problem in the master node. We are using 
> the same configuration as in the workers (200MB for MaxMetaspace and 13000MB 
> for Heap) and after ~6000 jobs, the master runs out of memory. The metaspace 
> usage is almost constant, around 50MB and the heap usage grows up to 1MB, 
> then GC does its work and reduces this usage. But we still have the OOM 
> problems. Do you have any other idea of what could cause this problem? Our 
> workaround is to restart the master, but we cannot keep doing this in the 
> long term.
> 
> Thanks for all your support, it has been helpful.
> 
> On 16 November 2017 at 15:27, Javier Lopez  > wrote:
> Hi Piotr,
> 
> Sorry for the late response, I'm out of the office and with limited access to 
> the Internet. I think we are on the right path to solve this problem. Some 
> time ago we did a memory analysis over 3 different cluster we are using, two 
> of them are running jobs 24/7 and the other is the one deploying thousands of 
> jobs. All of those clusters have the same behavior for arrays of Chars and 
> Bytes (as expected), but for this particular Class "java.lang.Class" the 
> clusters that have 24/7 jobs have less than 20K instances of that class, 
> whereas the other cluster has  383,120 
> instances. I don't know if this could be related.
> 
> I hope that we can test this soon, and will let you know if this fixed the 
> problem.
> 
> Thanks.
> 
> 
> On 15 November 2017 at 13:18, Piotr Nowojski  > wrote:
> Hi,
> 
> I have been able to observe some off heap memory “issues” by submitting the Kafka 
> job provided by Javier Lopez (in a different mailing thread). 
> 
> TL;DR;
> 
> There was no memory leak, just memory pool “Metaspace” and “Compressed Class 
> Space” are growing in size over time and are only rarely garbage collected. 
> In my test case they together were wasting up to ~7GB of memory, while my 
> test case could use as little as ~100MB. Connect with for example jconsole to 
> your JVM, check their size and cut their size by half by setting:
> 
> env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M
> 
> In flink-conf.yaml. Everything works fine and memory consumption still too 
> high? Rinse and repeat.
> 
> 
> Long story:
> 
> In default settings, with max heap size of 1GB, off heap memory consumption, 
> memory consumption off non-heap memory pools of “Metaspace” and “Compressed 
> Class Space” was growing in time which seemed like indefinitely, and 
> Metaspace was always around ~6 times larger compared to compressed class 
> space. Default max metaspace size is unlimited, while “Compressed class 
> space” has a default max size of 1GB. 
> 
> When I decreased the CompressedClassSpaceSize down to 100MB, memory 
> consumption grew up to 90MB and then it started bouncing up and down by 
> couple of MB. “Metaspace” was following the same pattern, but using ~600MB. 
> When I decreased down MaxMetaspaceSize to 200MB, memory consumption of both 
> pools was bouncing around ~220MB.
> 
> It seems like there are no general guidelines on how to configure those values, 
> since it’s heavily application dependent. However this seems like the most 
> likely suspect of the apparent OFF HEAP “memory leak” that was reported 
> couple of times in use cases where users are submitting hundreds/thousands of 
> jobs to a Flink cluster.