Flink 1.5.0 no longer allows multiple program arguments?

2018-06-18 Thread Siew Wai Yow
Hi,


I get the following error after upgrading Flink from 1.3.2 to 1.5.0, when using the 
REST API to upload and run a jar.


{"errors":["Expected only one value [--KAFKA_IN PREBLN_O@192.168.56.120:9092, 
192.168.56.121:9092, 192.168.56.122:9092/BHARTI_FL_PREBLN_O_124 --KAFKA_OUT 
FX_AGGR_ASCII@192.168.56.120:9092, 192.168.56.121:9092, 
192.168.56.122:9092/BHARTI_FL_FX_AGGR_ASCII_181;FX_AGGREGATED@192.168.56.120:9092,
 192.168.56.121:9092, 192.168.56.122:9092/BHARTI_FL_FX_AGGREGATED_181 
--KAFKA_REJECT INVALID@192.168.56.120:9092, 192.168.56.121:9092, 
192.168.56.122:9092/BHARTI_FL_INVALID_181 --KAFKA_AUDIT 
AUDIT@192.168.56.120:9092, 192.168.56.121:9092, 
192.168.56.122:9092/BHARTI_FL_AUDIT_181 --KAFKA_REPROCESS 
REPROCESS@192.168.56.120:9092, 192.168.56.121:9092, 
192.168.56.122:9092/BHARTI_FL_REPROCESS_181 --OPTION --JOB_NAME 
POC_BHARTI_FLINK_AGGR  --AUDIT_WINDOWS_INTERVAL_MS 5000 --STATSD_URL 
192.168.56.200:8127  --STATSD_SAMPLING_DURATION 1000 --CHECK_POINT_INTERVAL 
2 --NODEID 181 --LS_SERVER LsServer --LS_TABLES Table1, Table2 --ZK_POLLING 
30 --ZK_URL 192.168.56.130, 192.168.56.131, 192.168.56.132 --LS_HOSTNAME  
--LS_ENABLE 0 --ZK_RETRY_COUNT 0 --ZK_RETRY_INTERVAL 0 
--LS_ABORT_ON_RELOAD_FAILURE 1 --ZK_SESSION_TIMEOUT 6 --ZK_KEEP_CONN_ALIVE 
1]."]}


It seems the program-arguments field no longer allows multiple parameters? But the 
documentation tells a different story:


"From the command line arguments"

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/best_practices.html
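
That section of the best-practices page is about parsing job parameters inside the 
program with ParameterTool. A rough sketch of that pattern (the parameter names below 
are only taken from the error above as placeholders):

    import org.apache.flink.api.java.utils.ParameterTool;

    public class ArgsExample {
        public static void main(String[] args) throws Exception {
            // ParameterTool splits "--KAFKA_IN ... --NODEID 181 ..." style arguments
            // into key/value pairs.
            ParameterTool params = ParameterTool.fromArgs(args);
            String kafkaIn = params.get("KAFKA_IN");
            int nodeId = params.getInt("NODEID", 0);
            // ... build the job using these values
        }
    }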



Any advice please? thank you.


Regards,

Yow




Re: [DISCUSS] Flink 1.6 features

2018-06-18 Thread zhangminglei
Hi, Sagar

Thank you for your review. I will fix it when I am available. 

> 2. Will you be able to add more unit tests in the commit ? Eg. Writing some 
> example data with simple schema which will initialize OrcWriter object and 
> sinking it to local hdfs node ?


Ans: Yes. I will add more unit tests there.

> 3. Are there plans to add support for other data types ?

Ans: Yes. I have been busy these days, but in a couple of days I will add the 
remaining data types and more tests for them.
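
For reference, a richer ORC schema could be written down with the ORC library's 
TypeDescription. This is only an illustrative schema, not the one used in the pull 
request:

    import org.apache.orc.TypeDescription;

    public class OrcSchemaExample {
        public static void main(String[] args) {
            // Illustrative ORC schema covering a few of the additional types mentioned above.
            TypeDescription schema = TypeDescription.fromString(
                    "struct<id:bigint,name:string,price:decimal(10,2),created:timestamp,tags:array<string>>");
            System.out.println(schema);
        }
    }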

Cheers
Zhangminglei


> On Jun 19, 2018, at 9:10 AM, sagar loke wrote:
> 
> Thanks @zhangminglei for replying. 
> 
> I agree, hive on Flink would be a big project. 
> 
> By the way, i looked at the Jira ticket related to ORC format which you 
> shared. 
> 
> Couple of comments/requests about the pull request in the ticket:
> 
> 1. Sorry for nitpicking but meatSchema is misspelled. I think it should be 
> metaSchema. 
> 
> 2. Will you be able to add more unit tests in the commit ? Eg. Writing some 
> example data with simple schema which will initialize OrcWriter object and 
> sinking it to local hdfs node ?
> 
> 3. Are there plans to add support for other data types ?
> 
> Thanks,
> Sagar
> 
> On Sun, Jun 17, 2018 at 6:45 AM zhangminglei <18717838...@163.com> wrote:
> But if we do Hive on Flink, I think it would be a very big project.
> 
> 
> > On Jun 17, 2018, at 9:36 PM, Will Du <will...@gmail.com> wrote:
> > 
> > Agree, two missing pieces I think could make Flink more competitive against 
> > Spark SQL/Stream and Kafka Stream
> > 1. Flink over Hive or Flink SQL hive table source and sink
> > 2. Flink ML on stream
> > 
> > 
> >> On Jun 17, 2018, at 8:34 AM, zhangminglei <18717838...@163.com> wrote:
> >> 
> >> Actually, I have had an idea: how about supporting Hive on Flink? Lots of 
> >> business logic is written in Hive SQL, and users want to move from MapReduce 
> >> to Flink without changing the SQL.
> >> 
> >> Zhangminglei
> >> 
> >> 
> >> 
> >>> On Jun 17, 2018, at 8:11 PM, zhangminglei <18717838...@163.com> wrote:
> >>> 
> >>> Hi, Sagar
> >>> 
> >>> There are already related JIRAs for ORC and Parquet; you can take a look 
> >>> here: 
> >>> 
> >>> https://issues.apache.org/jira/browse/FLINK-9407 and 
> >>> https://issues.apache.org/jira/browse/FLINK-9411 
> >>> 
> >>> For the ORC format, currently only basic data types are supported, such as Long, 
> >>> Boolean, Short, Integer, Float, Double, String. 
> >>> 
> >>> Best
> >>> Zhangminglei
> >>> 
> >>> 
> >>> 
>  On Jun 17, 2018, at 11:11 AM, sagar loke wrote:
>  
>  We are eagerly waiting for 
>  
>  - Extends Streaming Sinks:
>    - Bucketing Sink should support S3 properly (compensate for eventual 
>  consistency), work with Flink's shaded S3 file systems, and efficiently 
>  support formats that compress/index across individual rows (Parquet, 
>  ORC, ...)
>  
>  Especially for ORC and Parquet sinks, since we are planning to use 
>  Kafka-jdbc to move data from RDBMS to HDFS. 
>  
>  Thanks,
>  
>  On Sat, Jun 16, 2018 at 5:08 PM Elias Levy wrote:
>  One more, since we have to deal with it often:
>  
>  - Idling sources (Kafka in particular) and proper watermark propagation: 
>  FLINK-5018 / FLINK-5479
>  
>  On Fri, Jun 8, 2018 at 2:58 PM, Elias Levy wrote:
>  Since wishes are free:
>  
>  - Standalone cluster job isolation: 
>  https://issues.apache.org/jira/browse/FLINK-8886 
>  - Proper sliding window joins (not overlapping hopping window joins): 
>  https://issues.apache.org/jira/browse/FLINK-6243 
>  - Sharing state across operators: 
>  https://issues.apache.org/jira/browse/FLINK-6239 
>  - Synchronizing streams: 
>  

Re: [DISCUSS] Flink 1.6 features

2018-06-18 Thread sagar loke
Thanks @zhangminglei for replying.

I agree, hive on Flink would be a big project.

By the way, i looked at the Jira ticket related to ORC format which you
shared.

Couple of comments/requests about the pull request in the ticket:

1. Sorry for nitpicking but meatSchema is misspelled. I think it should be
metaSchema.

2. Will you be able to add more unit tests in the commit ? Eg. Writing some
example data with simple schema which will initialize OrcWriter object and
sinking it to local hdfs node ?

3. Are there plans to add support for other data types ?

Thanks,
Sagar

On Sun, Jun 17, 2018 at 6:45 AM zhangminglei <18717838...@163.com> wrote:

> But if we do Hive on Flink, I think it would be a very big project.
>
>
> > On Jun 17, 2018, at 9:36 PM, Will Du wrote:
> >
> > Agree, two missing pieces I think could make Flink more competitive
> against Spark SQL/Stream and Kafka Stream
> > 1. Flink over Hive or Flink SQL hive table source and sink
> > 2. Flink ML on stream
> >
> >
> >> On Jun 17, 2018, at 8:34 AM, zhangminglei <18717838...@163.com> wrote:
> >>
> >> Actually, I have had an idea: how about supporting Hive on Flink? Lots of
> business logic is written in Hive SQL, and users want to move from MapReduce
> to Flink without changing the SQL.
> >>
> >> Zhangminglei
> >>
> >>
> >>
> >>> On Jun 17, 2018, at 8:11 PM, zhangminglei <18717838...@163.com> wrote:
> >>>
> >>> Hi, Sagar
> >>>
> >>> There are already related JIRAs for ORC and Parquet; you can take a
> look here:
> >>>
> >>> https://issues.apache.org/jira/browse/FLINK-9407 and
> https://issues.apache.org/jira/browse/FLINK-9411
> >>>
> >>> For the ORC format, currently only basic data types are supported, such as Long,
> Boolean, Short, Integer, Float, Double, String.
> >>>
> >>> Best
> >>> Zhangminglei
> >>>
> >>>
> >>>
>  On Jun 17, 2018, at 11:11 AM, sagar loke wrote:
> 
>  We are eagerly waiting for
> 
>  - Extends Streaming Sinks:
>    - Bucketing Sink should support S3 properly (compensate for
> eventual consistency), work with Flink's shaded S3 file systems, and
> efficiently support formats that compress/index across individual rows
> (Parquet, ORC, ...)
> 
>  Especially for ORC and Parquet sinks, since we are planning to use
> Kafka-jdbc to move data from RDBMS to HDFS.
> 
>  Thanks,
> 
>  On Sat, Jun 16, 2018 at 5:08 PM Elias Levy <fearsome.lucid...@gmail.com> wrote:
>  One more, since we have to deal with it often:
> 
>  - Idling sources (Kafka in particular) and proper watermark
> propagation: FLINK-5018 / FLINK-5479
> 
>  On Fri, Jun 8, 2018 at 2:58 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>  Since wishes are free:
> 
>  - Standalone cluster job isolation:
> https://issues.apache.org/jira/browse/FLINK-8886
>  - Proper sliding window joins (not overlapping hopping window joins):
> https://issues.apache.org/jira/browse/FLINK-6243
>  - Sharing state across operators:
> https://issues.apache.org/jira/browse/FLINK-6239
>  - Synchronizing streams:
> https://issues.apache.org/jira/browse/FLINK-4558
> 
>  Seconded:
>  - Atomic cancel-with-savepoint:
> https://issues.apache.org/jira/browse/FLINK-7634
>  - Support dynamically changing CEP patterns:
> https://issues.apache.org/jira/browse/FLINK-7129
> 
> 
>  On Fri, Jun 8, 2018 at 1:31 PM, Stephan Ewen  > wrote:
>  Hi all!
> 
>  Thanks for the discussion and good input. Many suggestions fit well
> with the proposal above.
> 
>  Please bear in mind that with a time-based release model, we would
> release whatever is mature by end of July.
>  The good thing is we could schedule the next release not too far
> after that, so that the features that did not quite make it will not be
> delayed too long.
>  In some sense, you could read this as a "what to do first" list,
> rather than "this goes in, other things stay out".
> 
>  Some thoughts on some of the suggestions
> 
>  Kubernetes integration: An opaque integration with Kubernetes should
> be supported through the "as a library" mode. For a deeper integration, I
> know that some committers have experimented with some PoC code. I would let
> Till add some thoughts, he has worked the most on the deployment parts
> recently.
> 
>  Per partition watermarks with idleness: Good point, could one
> implement that on the current interface, with a periodic watermark
> extractor?
> 
> 
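
A rough sketch of that idea on the current interface (MyEvent and getEventTime() are
placeholders; note this only helps when the whole source instance goes idle, not for
individual idle Kafka partitions, which is what FLINK-5479 is about):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

        private final long maxOutOfOrderness = 3_000L; // ms
        private final long idleTimeout = 60_000L;      // ms without elements before the source is considered idle

        private long maxTimestamp = 0L;
        private long lastElementProcTime = System.currentTimeMillis();

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            lastElementProcTime = System.currentTimeMillis();
            maxTimestamp = Math.max(maxTimestamp, element.getEventTime());
            return element.getEventTime();
        }

        @Override
        public Watermark getCurrentWatermark() {
            long now = System.currentTimeMillis();
            if (now - lastElementProcTime > idleTimeout) {
                // No elements for a while: advance the watermark with processing time
                // so downstream operators are not held back by this idle source.
                return new Watermark(now - maxOutOfOrderness);
            }
            return new Watermark(maxTimestamp - maxOutOfOrderness);
        }
    }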

Flink 1.5 Yarn Connection unexpectedly closed

2018-06-18 Thread Garrett Barton
Hey all,

 My jobs that I am trying to write in Flink 1.5 are failing after a few
minutes.  I think it's because the idle task managers are shutting down,
which seems to kill the client and the running job. The running job itself
was still going on one of the other task managers.  I get:

org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager ''. This might
indicate that the remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)

Now I happen to have the last part of the flow at parallelism 1 right now
for debugging, so of the 4 task managers that are spun up, 3 of them hit the
timeout period (currently set to 24).  I think as soon as the first one
goes, the client throws up and the whole job dies as a result.

 Is this expected behavior and if so, is there another way around it? Do I
keep increasing the slotmanager.taskmanager-timeout to a really really
large number? I have verified setting the timeout to 84 lets the job
complete without error.
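
For reference, that timeout lives in flink-conf.yaml and is given in milliseconds;
the value below is only an illustrative placeholder:

    # flink-conf.yaml
    slotmanager.taskmanager-timeout: 300000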

Thank you!


Control insert database with dataset

2018-06-18 Thread Dulce Morim
Hello,

I'm trying to catch a BatchUpdateException when inserting a DataSet using the output 
method, because I need to handle inserts with a duplicate key. How can I do this?



[2018-06-18 22:18:56,419] INFO DataSink 
(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@64aad6db) (1/1) 
(00a77c9e18f893cde9c62a3c9ca5c471) switched from RUNNING to FAILED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph)
java.lang.IllegalArgumentException: writeRecord() failed
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:209)
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:194)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Violation of PRIMARY KEY constraint 
'TEST_PK'. Cannot insert duplicate key in object 'TEST'. The duplicate key 
value is (37183).
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2303)
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:205)
... 4 more


I only get a generic exception:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
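
One possible direction, sketched below only as an illustration, is to skip
JDBCOutputFormat and write the rows with a custom RichOutputFormat that performs the
JDBC insert itself, so a duplicate-key violation can be caught and handled per record.
The JDBC URL, column names and tuple type are placeholders:

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    import org.apache.flink.api.common.io.RichOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Sketch: inserts (id, value) tuples one by one and decides what to do on duplicates.
    public class DuplicateTolerantJdbcOutputFormat extends RichOutputFormat<Tuple2<Integer, String>> {

        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void configure(Configuration parameters) {}

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            try {
                connection = DriverManager.getConnection("jdbc:...");  // placeholder URL
                statement = connection.prepareStatement("INSERT INTO TEST (id, val) VALUES (?, ?)");
            } catch (SQLException e) {
                throw new IOException("Could not open JDBC connection", e);
            }
        }

        @Override
        public void writeRecord(Tuple2<Integer, String> record) throws IOException {
            try {
                statement.setInt(1, record.f0);
                statement.setString(2, record.f1);
                statement.executeUpdate();
            } catch (SQLException e) {
                // e.g. primary key violation: decide here whether to skip, log, or redirect the record
            }
        }

        @Override
        public void close() throws IOException {
            try {
                if (statement != null) statement.close();
                if (connection != null) connection.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }

It would then be used via dataSet.output(new DuplicateTolerantJdbcOutputFormat());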


Thanks,
Dulce Morim


Test Bounce

2018-06-18 Thread Abdul Qadeer
Test


Metrics from detached execution mode

2018-06-18 Thread Abdul Qadeer
Hi!

I am trying to fetch metrics provided by Beam SDK via Flink runner in
detached mode, but it looks like it is not supported yet.
I understand from class DetachedJobExecutionResult that metrics are not
supported to be extracted in detached mode job execution. Is this a
limitation of Flink as a runner or is it a feature not yet implemented? Is
there any way to get metrics out while the job is running? Any help
appreciated.
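
One thing that may help while the job is running, assuming the Flink runner maps Beam
metrics onto Flink's metric system: configure a Flink metrics reporter in
flink-conf.yaml so metrics are exposed continuously, independent of the job result
object. Illustrative JMX reporter setup (reporter name is arbitrary):

    # flink-conf.yaml
    metrics.reporters: my_jmx
    metrics.reporter.my_jmx.class: org.apache.flink.metrics.jmx.JMXReporter
    metrics.reporter.my_jmx.port: 9020-9040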


Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Garvit Sharma
Yes, it is.

On Mon, Jun 18, 2018 at 7:54 PM Till Rohrmann  wrote:

> Is `/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar` a link to `
> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar`?
>
> On Mon, Jun 18, 2018 at 4:02 PM Garvit Sharma  wrote:
>
>> I don't think I can access core-default as it comes with Hadoop jar
>>
>> On Mon, 18 Jun 2018 at 7:30 PM, Till Rohrmann 
>> wrote:
>>
>>> Hmm, could you check whether core-default.xml contains any suspicious
>>> entries? Apparently xerces:2.9.1 cannot read it.
>>>
>>> On Mon, Jun 18, 2018 at 3:40 PM Garvit Sharma 
>>> wrote:
>>>
 Hi,

 After putting the following log in my code, I can see that the Xerces
 version is - Xerces version : Xerces-J 2.9.1

 log.info("Xerces version : {}", 
 org.apache.xerces.impl.Version.getVersion());

 Also, following is the response of *$* *locate xerces* command on the
 server -


 /usr/hdp/2.6.1.0-129/falcon/client/lib/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl.jar

 /usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/hbase/lib/xercesImpl-2.9.1.jar


 /usr/hdp/2.6.1.0-129/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/livy/jars/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/livy2/jars/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/oozie/lib/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.1.0-129/oozie/libserver/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.1.0-129/oozie/libtools/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.1.0-129/slider/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/spark2/jars/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.1.0-129/zookeeper/lib/xercesMinimal-1.9.6.2.jar

 /usr/hdp/2.6.3.0-235/falcon/client/lib/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar

 /usr/hdp/2.6.3.0-235/hadoop-hdfs/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/hbase/lib/xercesImpl-2.9.1.jar


 /usr/hdp/2.6.3.0-235/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/livy/jars/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/livy2/jars/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/oozie/lib/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.3.0-235/oozie/libserver/xercesImpl-2.10.0.jar

 /usr/hdp/2.6.3.0-235/oozie/libtools/xercesImpl-2.10.0.jar


 /usr/hdp/2.6.3.0-235/ranger-admin/ews/webapp/WEB-INF/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/slider/lib/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/spark2/jars/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar

 /usr/hdp/2.6.3.0-235/zookeeper/lib/xercesMinimal-1.9.6.2.jar

 /usr/hdp/share/hst/hst-common/lib/xercesImpl-2.9.1.jar

 Now, I can say that the version of xerces are same.


 So, what is causing this issue if Xerces version is in sync?


 I am very excited to discover the issue :)


 Thanks,

 On Mon, Jun 18, 2018 at 6:27 PM Till Rohrmann 
 wrote:

> Could you check which xerces version you have on your classpath?
> Apparently, it cannot read core-default.xml as Ted pointed out. This might
> be the root cause for the failure.
>
> Cheers,
> Till
>
> On Mon, Jun 18, 2018 at 1:31 PM Garvit Sharma 
> wrote:
>
>> Hi,
>>
>> Sorry for the confusion, but the yarn is running on Hadoop version
>> 2.7 only and hence I am using Flink 1.5 Hadoop 2.7 binary.
>>
>> Below are the details provided by Yarn version command :
>>
>> Hadoop 2.7.3.2.6.3.0-235
>> Subversion g...@github.com:hortonworks/hadoop.git -r
>> 45bfd33bba8acadfa0e6024c80981c023b28d454
>> Compiled by jenkins on 2017-10-30T02:31Z
>> Compiled with protoc 2.5.0
>> From source with checksum cd1a4a466ef450f547c279989f3aa3
>> This command was run using
>> /usr/hdp/2.6.3.0-235/hadoop/hadoop-common-2.7.3.2.6.3.0-235.jar
>>
>> Please let me know if you have found the resolution to my issue :)
>>
>> Thanks,
>>
>>
>> On Mon, Jun 18, 2018 at 4:50 PM Till Rohrmann 
>> wrote:
>>
>>> Which Hadoop version have you installed? It looks as if Flink has
>>> been build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class
>>> path. If you want to run Flink on Hadoop 2.6, then try to use the Hadoop
>>> free Flink binaries or the one built for Hadoop 2.6.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jun 18, 2018 at 10:48 AM Garvit Sharma 
>>> wrote:
>>>
 Ok, I have attached the 

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-18 Thread ashish pok
Right, that's where I am headed now but was wondering if there are any "gotchas" I 
am missing before I try and dig into a few gigs of heap dump. 

Thanks, Ashish

Sent from Yahoo Mail for iPhone


On Monday, June 18, 2018, 3:37 AM, Stefan Richter  
wrote:

Hi,
can you take a heap dump from a JVM that runs into the problem and share it 
with us? That would make finding the cause a lot easier.
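
For reference, a heap dump of a running TaskManager JVM can be taken with the JDK's
jmap tool, for example:

    jmap -dump:live,format=b,file=taskmanager-heap.hprof <taskmanager-pid>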
Best,
Stefan


On 15.06.2018 at 23:01, ashish pok wrote:
All,
I have another slow Memory Leak situation using basic TimeSession Window 
(earlier it was GlobalWindow related that Fabian helped clarify). 
I have a very simple data pipeline:
DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");
I initially didn't even have ProcessingTimePurgeTrigger and it was using the default 
Trigger. In an effort to fix this issue, I created my own Trigger from the default 
ProcessingTimeTrigger with a simple override of the onProcessingTime method 
(essentially replacing FIRE with FIRE_AND_PURGE):
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}
This seems to have done nothing (it may have delayed the issue by a couple of hours - 
not certain). But I still see heap utilization creep up slowly until it eventually 
reaches a point where GC starts to take too long, and then the dreaded OOM. 
For completeness, here is my window function (still using the old function 
interface). It creates a few metrics for reporting and applies logic by looping 
over the Iterable. NO state is explicitly kept in this function; I needed a 
RichWindowFunction basically just to generate metrics.


public class IPSLAMetricWindowFn extends RichWindowFunction {

    private static final long serialVersionUID = 1L;

    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}



Appreciate any pointers on what could be causing leaks here. This seems pretty 
straight-forward.
Thanks, Ashish







Passing records between two jobs

2018-06-18 Thread Avihai Berkovitz
Hello,

We are planning a system that will be comprised of 3 different jobs:

  1.  Getting a stream of events, adding some metadata to the events, and 
outputting them to a temporary message queue.
  2.  Performing some calculations on the events we got from job 1, as required 
for product A.
  3.  Performing a different set of calculations of the events from job 1, for 
product B.

All 3 jobs will be developed by different teams, so we don't want to create one 
massive job that does everything.
The problem is that every message queuing sink only provides at-least-once 
guarantee. If job 1 crashes and recovers, we will get the same events in the 
queue and jobs 2 and 3 will process events twice. This is obviously a problem, 
and I guess we are not the first to stumble upon it.

Did anyone else had this issue? It seems to me like a fundamental problem of 
passing data between jobs, so hopefully there are known solutions and best 
practices. It would be great if you can share any solution.
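
One commonly used mitigation, assuming every event carries a stable unique ID, is to
deduplicate replayed events in the consuming jobs with keyed state. A minimal sketch
(Event, getId() and the surrounding job are placeholders, and in practice the state
needs an expiry strategy so it does not grow forever):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Sketch: drop events whose ID was already seen for this key.
    public class DeduplicateFn extends RichFlatMapFunction<Event, Event> {

        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public void flatMap(Event event, Collector<Event> out) throws Exception {
            if (seen.value() == null) {
                seen.update(true);
                out.collect(event);
            }
            // else: duplicate caused by job 1 replaying after recovery -> drop it
        }
    }

    // Usage (hypothetical): events.keyBy(Event::getId).flatMap(new DeduplicateFn());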

Thanks,
Avihai



Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Siew Wai Yow
Thanks @Fabian for your confirmation about the better performance when scaling 
happens on the same TM machine. But it is funny that it gives the impression of "the 
more I scale, the less I get" when the performance drops with more TMs in play.

@Ovidiu's question is interesting to know too. @Till, do you mind sharing your 
thoughts?

Thank you guys!


From: Ovidiu-Cristian MARCU 
Sent: Monday, June 18, 2018 6:28 PM
To: Fabian Hueske
Cc: Siew Wai Yow; Jörn Franke; user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

Hi all,

Allow me to add some comments/questions on this issue that is very interesting.
According to documentation [1] the pipeline example assumes the source is 
running with the same parallelism as successive map operator and the workflow 
optimizes to collocate source and map tasks if possible.

For an application configuring the source with different parallelism, assuming 
N task managers each with m slots, if I configure
the source operator with parallelism m, then all of the source's tasks could be 
scheduled on the first task manager?
I think the same story holds for sinks tasks.
So, in general is there any control over scheduling of source and sink tasks?
Would it be possible to enforce scheduling of source tasks to be balanced 
across task managers? Not sure if this is the default.
If the source creates a non-keyed stream, can we enforce the source to push 
records to local map tasks?

For Siew’s example, after source#map a keyBy complicates further things since 
each key can be possibly processed on another task manager.
At least the keyBy operator should run with the same parallelism as source and 
map and be pipelined on same slot (maybe shared slot configuration could 
enforce that).

DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator(markerFactory.getMarker(), inputlink))
    .name("JsonRecTranslator")
    .setParallelism(pJ2R)
    .keyBy(new KeySelector() {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY();
        }
    })
    .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(), markerFactory.getMarker()))
    .setParallelism(pAggr)
    .name("AggregationDuration: " + aggrDuration + "ms");


Thanks,
Ovidiu

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html


On 18 Jun 2018, at 10:05, Fabian Hueske <fhue...@gmail.com> wrote:

Not sure if TM local assignment is explicitly designed in 1.5.0, but it might 
be an artifact of how slots are registered in the resource manager.
Till (in CC) should know how that works.

Anyway, tasks that run in the same TM exchange data via in-memory channels 
which is of course much faster than going over the network.
So yes, a performance drop when tasks are scheduled to different TMs is not 
unexpected IMO.
You can check that by starting multiple TMs with a single slot each and running 
your job on that setup.
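
For that experiment, the slot count per TaskManager is controlled in flink-conf.yaml, e.g.:

    # flink-conf.yaml
    taskmanager.numberOfTaskSlots: 1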

Best, Fabian



2018-06-18 9:57 GMT+02:00 Siew Wai Yow <wai_...@hotmail.com>:
Hi Fabian,

We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?

"Hence, applications might scale better until tasks are scheduled to different 
machines."

This seems to be the case. We have 32 vCPUs and 16 slots in one TM machine. So the scaling 
works perfectly for 1-2-4-8-16 because everything happens in the same TM. When scaling to 32, 
the performance drops and is not even on par with the case of parallelism 16. Is this 
something expected? Thank you.

Regards,
Yow


From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, June 18, 2018 3:47 PM
To: Siew Wai Yow
Cc: Jörn Franke; user@flink.apache.org

Subject: Re: Flink application does not scale as expected, please help!

Hi,

Which Flink version are you using?
Did you try to analyze the bottleneck of the application, i.e., is it CPU, disk 
IO, or network bound?

Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to 
schedule tasks on the same machine to reduce the amount of network transfer.
Hence, applications might scale better until tasks are scheduled to different 
machines.

Fabian

2018-06-16 12:20 GMT+02:00 Siew Wai Yow <wai_...@hotmail.com>:
Hi Jorn, Please find the source 

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Till Rohrmann
Is `/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar` a link to `
/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar`?

On Mon, Jun 18, 2018 at 4:02 PM Garvit Sharma  wrote:

> I don't think I can access core-default as it comes with Hadoop jar
>
> On Mon, 18 Jun 2018 at 7:30 PM, Till Rohrmann 
> wrote:
>
>> Hmm, could you check whether core-default.xml contains any suspicious
>> entries? Apparently xerces:2.9.1 cannot read it.
>>
>> On Mon, Jun 18, 2018 at 3:40 PM Garvit Sharma 
>> wrote:
>>
>>> Hi,
>>>
>>> After putting the following log in my code, I can see that the Xerces
>>> version is - Xerces version : Xerces-J 2.9.1
>>>
>>> log.info("Xerces version : {}", 
>>> org.apache.xerces.impl.Version.getVersion());
>>>
>>> Also, following is the response of *$* *locate xerces* command on the
>>> server -
>>>
>>>
>>> /usr/hdp/2.6.1.0-129/falcon/client/lib/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl.jar
>>>
>>> /usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/hbase/lib/xercesImpl-2.9.1.jar
>>>
>>>
>>> /usr/hdp/2.6.1.0-129/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/livy/jars/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/livy2/jars/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/oozie/lib/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.1.0-129/oozie/libserver/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.1.0-129/oozie/libtools/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.1.0-129/slider/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/spark2/jars/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.1.0-129/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>>>
>>> /usr/hdp/2.6.3.0-235/falcon/client/lib/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar
>>>
>>> /usr/hdp/2.6.3.0-235/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/hbase/lib/xercesImpl-2.9.1.jar
>>>
>>>
>>> /usr/hdp/2.6.3.0-235/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/livy/jars/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/livy2/jars/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/oozie/lib/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.3.0-235/oozie/libserver/xercesImpl-2.10.0.jar
>>>
>>> /usr/hdp/2.6.3.0-235/oozie/libtools/xercesImpl-2.10.0.jar
>>>
>>>
>>> /usr/hdp/2.6.3.0-235/ranger-admin/ews/webapp/WEB-INF/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/slider/lib/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/spark2/jars/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>>>
>>> /usr/hdp/2.6.3.0-235/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>>>
>>> /usr/hdp/share/hst/hst-common/lib/xercesImpl-2.9.1.jar
>>>
>>> Now, I can say that the version of xerces are same.
>>>
>>>
>>> So, what is causing this issue if Xerces version is in sync?
>>>
>>>
>>> I am very excited to discover the issue :)
>>>
>>>
>>> Thanks,
>>>
>>> On Mon, Jun 18, 2018 at 6:27 PM Till Rohrmann 
>>> wrote:
>>>
 Could you check which xerces version you have on your classpath?
 Apparently, it cannot read core-default.xml as Ted pointed out. This might
 be the root cause for the failure.

 Cheers,
 Till

 On Mon, Jun 18, 2018 at 1:31 PM Garvit Sharma 
 wrote:

> Hi,
>
> Sorry for the confusion, but the yarn is running on Hadoop version 2.7
> only and hence I am using Flink 1.5 Hadoop 2.7 binary.
>
> Below are the details provided by Yarn version command :
>
> Hadoop 2.7.3.2.6.3.0-235
> Subversion g...@github.com:hortonworks/hadoop.git -r
> 45bfd33bba8acadfa0e6024c80981c023b28d454
> Compiled by jenkins on 2017-10-30T02:31Z
> Compiled with protoc 2.5.0
> From source with checksum cd1a4a466ef450f547c279989f3aa3
> This command was run using
> /usr/hdp/2.6.3.0-235/hadoop/hadoop-common-2.7.3.2.6.3.0-235.jar
>
> Please let me know if you have found the resolution to my issue :)
>
> Thanks,
>
>
> On Mon, Jun 18, 2018 at 4:50 PM Till Rohrmann 
> wrote:
>
>> Which Hadoop version have you installed? It looks as if Flink has
>> been build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class
>> path. If you want to run Flink on Hadoop 2.6, then try to use the Hadoop
>> free Flink binaries or the one built for Hadoop 2.6.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 18, 2018 at 10:48 AM Garvit Sharma 
>> wrote:
>>
>>> Ok, I have attached the log file.
>>>
>>> Please check and let me know.
>>>
>>> Thanks,
>>>
>>> On Mon, Jun 18, 2018 at 2:07 PM Amit Jain 
>>> wrote:
>>>
 Hi Gravit,

 I think Till is interested 

Debug job execution from savepoint

2018-06-18 Thread Haddadi Manuel
Hi all,


I would like to test my checkpointing implementation by doing step-by-step 
debugging in an IDE.


Is there a way to restore a job from a local savepoint in a local stream 
environment, like the command "flink run -s :savepointpath" would do?


Thanks,


Manuel


Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Garvit Sharma
I don't think I can access core-default as it comes with Hadoop jar

On Mon, 18 Jun 2018 at 7:30 PM, Till Rohrmann  wrote:

> Hmm, could you check whether core-default.xml contains any suspicious
> entries? Apparently xerces:2.9.1 cannot read it.
>
> On Mon, Jun 18, 2018 at 3:40 PM Garvit Sharma  wrote:
>
>> Hi,
>>
>> After putting the following log in my code, I can see that the Xerces
>> version is - Xerces version : Xerces-J 2.9.1
>>
>> log.info("Xerces version : {}", org.apache.xerces.impl.Version.getVersion());
>>
>> Also, following is the response of *$* *locate xerces* command on the
>> server -
>>
>>
>> /usr/hdp/2.6.1.0-129/falcon/client/lib/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl.jar
>>
>> /usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/hbase/lib/xercesImpl-2.9.1.jar
>>
>>
>> /usr/hdp/2.6.1.0-129/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/livy/jars/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/livy2/jars/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/oozie/lib/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.1.0-129/oozie/libserver/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.1.0-129/oozie/libtools/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.1.0-129/slider/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/spark2/jars/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.1.0-129/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>>
>> /usr/hdp/2.6.3.0-235/falcon/client/lib/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar
>>
>> /usr/hdp/2.6.3.0-235/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/hbase/lib/xercesImpl-2.9.1.jar
>>
>>
>> /usr/hdp/2.6.3.0-235/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/livy/jars/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/livy2/jars/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/oozie/lib/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.3.0-235/oozie/libserver/xercesImpl-2.10.0.jar
>>
>> /usr/hdp/2.6.3.0-235/oozie/libtools/xercesImpl-2.10.0.jar
>>
>>
>> /usr/hdp/2.6.3.0-235/ranger-admin/ews/webapp/WEB-INF/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/slider/lib/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/spark2/jars/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>>
>> /usr/hdp/2.6.3.0-235/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>>
>> /usr/hdp/share/hst/hst-common/lib/xercesImpl-2.9.1.jar
>>
>> Now, I can say that the version of xerces are same.
>>
>>
>> So, what is causing this issue if Xerces version is in sync?
>>
>>
>> I am very excited to discover the issue :)
>>
>>
>> Thanks,
>>
>> On Mon, Jun 18, 2018 at 6:27 PM Till Rohrmann 
>> wrote:
>>
>>> Could you check which xerces version you have on your classpath?
>>> Apparently, it cannot read core-default.xml as Ted pointed out. This might
>>> be the root cause for the failure.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jun 18, 2018 at 1:31 PM Garvit Sharma 
>>> wrote:
>>>
 Hi,

 Sorry for the confusion, but the yarn is running on Hadoop version 2.7
 only and hence I am using Flink 1.5 Hadoop 2.7 binary.

 Below are the details provided by Yarn version command :

 Hadoop 2.7.3.2.6.3.0-235
 Subversion g...@github.com:hortonworks/hadoop.git -r
 45bfd33bba8acadfa0e6024c80981c023b28d454
 Compiled by jenkins on 2017-10-30T02:31Z
 Compiled with protoc 2.5.0
 From source with checksum cd1a4a466ef450f547c279989f3aa3
 This command was run using
 /usr/hdp/2.6.3.0-235/hadoop/hadoop-common-2.7.3.2.6.3.0-235.jar

 Please let me know if you have found the resolution to my issue :)

 Thanks,


 On Mon, Jun 18, 2018 at 4:50 PM Till Rohrmann 
 wrote:

> Which Hadoop version have you installed? It looks as if Flink has been
> build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class path. If
> you want to run Flink on Hadoop 2.6, then try to use the Hadoop free Flink
> binaries or the one built for Hadoop 2.6.
>
> Cheers,
> Till
>
> On Mon, Jun 18, 2018 at 10:48 AM Garvit Sharma 
> wrote:
>
>> Ok, I have attached the log file.
>>
>> Please check and let me know.
>>
>> Thanks,
>>
>> On Mon, Jun 18, 2018 at 2:07 PM Amit Jain  wrote:
>>
>>> Hi Gravit,
>>>
>>> I think Till is interested to know about classpath details present
>>> at the start of JM and TM logs e.g. following logs provide classpath
>>> details used by TM in our case.
>>>
>>> 2018-06-17 19:01:30,656 INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>>> 

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Till Rohrmann
Hmm, could you check whether core-default.xml contains any suspicious
entries? Apparently xerces:2.9.1 cannot read it.

On Mon, Jun 18, 2018 at 3:40 PM Garvit Sharma  wrote:

> Hi,
>
> After putting the following log in my code, I can see that the Xerces
> version is - Xerces version : Xerces-J 2.9.1
>
> log.info("Xerces version : {}", org.apache.xerces.impl.Version.getVersion());
>
> Also, following is the response of *$* *locate xerces* command on the
> server -
>
>
> /usr/hdp/2.6.1.0-129/falcon/client/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl.jar
>
> /usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/hbase/lib/xercesImpl-2.9.1.jar
>
>
> /usr/hdp/2.6.1.0-129/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/livy/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/livy2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/oozie/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/oozie/libserver/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/oozie/libtools/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/slider/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/spark2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>
> /usr/hdp/2.6.3.0-235/falcon/client/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar
>
> /usr/hdp/2.6.3.0-235/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/hbase/lib/xercesImpl-2.9.1.jar
>
>
> /usr/hdp/2.6.3.0-235/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/livy/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/livy2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/oozie/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.3.0-235/oozie/libserver/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.3.0-235/oozie/libtools/xercesImpl-2.10.0.jar
>
>
> /usr/hdp/2.6.3.0-235/ranger-admin/ews/webapp/WEB-INF/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/slider/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/spark2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>
> /usr/hdp/share/hst/hst-common/lib/xercesImpl-2.9.1.jar
>
> Now, I can say that the version of xerces are same.
>
>
> So, what is causing this issue if Xerces version is in sync?
>
>
> I am very excited to discover the issue :)
>
>
> Thanks,
>
> On Mon, Jun 18, 2018 at 6:27 PM Till Rohrmann 
> wrote:
>
>> Could you check which xerces version you have on your classpath?
>> Apparently, it cannot read core-default.xml as Ted pointed out. This might
>> be the root cause for the failure.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 18, 2018 at 1:31 PM Garvit Sharma 
>> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the confusion, but the yarn is running on Hadoop version 2.7
>>> only and hence I am using Flink 1.5 Hadoop 2.7 binary.
>>>
>>> Below are the details provided by Yarn version command :
>>>
>>> Hadoop 2.7.3.2.6.3.0-235
>>> Subversion g...@github.com:hortonworks/hadoop.git -r
>>> 45bfd33bba8acadfa0e6024c80981c023b28d454
>>> Compiled by jenkins on 2017-10-30T02:31Z
>>> Compiled with protoc 2.5.0
>>> From source with checksum cd1a4a466ef450f547c279989f3aa3
>>> This command was run using
>>> /usr/hdp/2.6.3.0-235/hadoop/hadoop-common-2.7.3.2.6.3.0-235.jar
>>>
>>> Please let me know if you have found the resolution to my issue :)
>>>
>>> Thanks,
>>>
>>>
>>> On Mon, Jun 18, 2018 at 4:50 PM Till Rohrmann 
>>> wrote:
>>>
 Which Hadoop version have you installed? It looks as if Flink has been
 build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class path. If
 you want to run Flink on Hadoop 2.6, then try to use the Hadoop free Flink
 binaries or the one built for Hadoop 2.6.

 Cheers,
 Till

 On Mon, Jun 18, 2018 at 10:48 AM Garvit Sharma 
 wrote:

> Ok, I have attached the log file.
>
> Please check and let me know.
>
> Thanks,
>
> On Mon, Jun 18, 2018 at 2:07 PM Amit Jain  wrote:
>
>> Hi Gravit,
>>
>> I think Till is interested to know about classpath details present at
>> the start of JM and TM logs e.g. following logs provide classpath details
>> used by TM in our case.
>>
>> 2018-06-17 19:01:30,656 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> 
>> 2018-06-17 19:01:30,658 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  
>> Starting
>> YARN TaskExecutor runner (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @
>> 14:54:44 UTC)
>> 2018-06-17 19:01:30,659 INFO
>>  

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Till Rohrmann
Could you check which xerces version you have on your classpath?
Apparently, it cannot read core-default.xml as Ted pointed out. This might
be the root cause for the failure.
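
A quick way to check which jar those classes actually come from at runtime is to print
their code source, for example from a small test program or from the job's main():

    public class XercesLocation {
        public static void main(String[] args) throws Exception {
            // Which jar provides the Xerces parser that is actually loaded?
            Class<?> xerces = Class.forName("org.apache.xerces.jaxp.DocumentBuilderFactoryImpl");
            System.out.println(xerces.getProtectionDomain().getCodeSource().getLocation());
            System.out.println("Xerces version: " + org.apache.xerces.impl.Version.getVersion());
        }
    }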

Cheers,
Till

On Mon, Jun 18, 2018 at 1:31 PM Garvit Sharma  wrote:

> Hi,
>
> Sorry for the confusion, but the yarn is running on Hadoop version 2.7
> only and hence I am using Flink 1.5 Hadoop 2.7 binary.
>
> Below are the details provided by Yarn version command :
>
> Hadoop 2.7.3.2.6.3.0-235
> Subversion g...@github.com:hortonworks/hadoop.git -r
> 45bfd33bba8acadfa0e6024c80981c023b28d454
> Compiled by jenkins on 2017-10-30T02:31Z
> Compiled with protoc 2.5.0
> From source with checksum cd1a4a466ef450f547c279989f3aa3
> This command was run using
> /usr/hdp/2.6.3.0-235/hadoop/hadoop-common-2.7.3.2.6.3.0-235.jar
>
> Please let me know if you have found the resolution to my issue :)
>
> Thanks,
>
>
> On Mon, Jun 18, 2018 at 4:50 PM Till Rohrmann 
> wrote:
>
>> Which Hadoop version have you installed? It looks as if Flink has been
>> build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class path. If
>> you want to run Flink on Hadoop 2.6, then try to use the Hadoop free Flink
>> binaries or the one built for Hadoop 2.6.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 18, 2018 at 10:48 AM Garvit Sharma 
>> wrote:
>>
>>> Ok, I have attached the log file.
>>>
>>> Please check and let me know.
>>>
>>> Thanks,
>>>
>>> On Mon, Jun 18, 2018 at 2:07 PM Amit Jain  wrote:
>>>
 Hi Gravit,

 I think Till is interested to know about classpath details present at
 the start of JM and TM logs e.g. following logs provide classpath details
 used by TM in our case.

 2018-06-17 19:01:30,656 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 
 2018-06-17 19:01:30,658 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Starting
 YARN TaskExecutor runner (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @
 14:54:44 UTC)
 2018-06-17 19:01:30,659 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  OS
 current user: yarn
 2018-06-17 19:01:31,662 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Current
 Hadoop/Kerberos user: hadoop
 2018-06-17 19:01:31,663 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JVM:
 OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
 2018-06-17 19:01:31,663 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Maximum
 heap size: 6647 MiBytes
 2018-06-17 19:01:31,663 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
  JAVA_HOME: /usr/lib/jvm/java-openjdk
 2018-06-17 19:01:31,664 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Hadoop
 version: 2.8.3
 2018-06-17 19:01:31,664 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JVM
 Options:
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 -Xms6936m
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 -Xmx6936m
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 -XX:MaxDirectMemorySize=4072m
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 -Dlog.file=/var/log/hadoop-yarn/containers/application_1528342246614_0002/container_1528342246614_0002_01_282649/taskmanager.log
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 -Dlogback.configurationFile=file:./logback.xml
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 -Dlog4j.configuration=file:./log4j.properties
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Program
 Arguments:
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
 --configDir
 2018-06-17 19:01:31,665 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  - .
 *2018-06-17 19:01:31,666 INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  -
  Classpath:
 

Re: flink and akka HTTP

2018-06-18 Thread Till Rohrmann
Try adding akka-remote:2.5.11 to see whether it resolves the issues. The
problem is that you have akka-actor as a dependency which is also required
by Flink. If Flink loads akka-actor:2.5.11 and then tries to use
akka-remote:2.4.x, this might cause the problem.

Cheers,
Till

On Mon, Jun 18, 2018 at 2:17 PM Gäckler Martin <
martin.gaeck...@esolutions.de> wrote:

> Here are the dependencies:
>
>
>
> runtimeClasspath - Runtime classpath of source set 'main'.
>
> +--- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2
>
> |+--- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2
>
> ||+--- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2
>
> |||+--- org.apache.flink:flink-connector-kafka-base_2.11:1.4.2
>
> ||||\--- org.apache.flink:force-shading:1.4.2
>
> |||+--- org.apache.kafka:kafka-clients:0.9.0.1 -> 0.11.0.2
>
> ||||+--- net.jpountz.lz4:lz4:1.3.0
>
> ||||+--- org.xerial.snappy:snappy-java:1.1.2.6
>
> ||||\--- org.slf4j:slf4j-api:1.7.25
>
> |||\--- org.apache.flink:force-shading:1.4.2
>
> ||+--- org.apache.kafka:kafka-clients:0.10.2.1 -> 0.11.0.2 (*)
>
> ||\--- org.apache.flink:force-shading:1.4.2
>
> |+--- org.apache.kafka:kafka-clients:0.11.0.2 (*)
>
> |\--- org.apache.flink:force-shading:1.4.2
>
> +--- openlr:map:1.4.2
>
> |+--- commons-configuration:commons-configuration:1.9 -> 1.10
>
> ||+--- commons-lang:commons-lang:2.6
>
> ||\--- commons-logging:commons-logging:1.1.1 -> 1.2
>
> |+--- log4j:log4j:1.2.17
>
> |\--- commons-lang:commons-lang:2.6
>
> +--- com.twitter:chill-protobuf:0.9.2
>
> |+--- com.twitter:chill-java:0.9.2
>
> ||\--- com.esotericsoftware:kryo-shaded:4.0.0
>
> || +--- com.esotericsoftware:minlog:1.3.0
>
> || \--- org.objenesis:objenesis:2.2
>
> |\--- com.esotericsoftware:kryo-shaded:4.0.0 (*)
>
> +--- de.eso.swarm:platform-sdk-java-core:latest.integration ->
> 0.0.1-SNAPSHOT
>
> |+--- org.apache.logging.log4j:log4j-api:2.7 -> 2.11.0
>
> |\--- org.apache.logging.log4j:log4j-core:2.7 -> 2.11.0
>
> | \--- org.apache.logging.log4j:log4j-api:2.11.0
>
> +--- de.eso.swarm:ncfs-openlrmap-sdk-java:latest.integration ->
> 0.0.1-SNAPSHOT
>
> |+--- de.eso.swarm:platform-sdk-java-core:latest.integration ->
> 0.0.1-SNAPSHOT (*)
>
> |+--- mysql:mysql-connector-java:5.1.39
>
> |+--- org.apache.logging.log4j:log4j-1.2-api:2.7 -> 2.11.0
>
> ||+--- org.apache.logging.log4j:log4j-api:2.11.0
>
> ||\--- org.apache.logging.log4j:log4j-core:2.11.0 (*)
>
> |+--- org.apache.logging.log4j:log4j-api:2.7 -> 2.11.0
>
> |+--- org.apache.logging.log4j:log4j-core:2.7 -> 2.11.0 (*)
>
> |+--- commons-configuration:commons-configuration:1.10 (*)
>
> |\--- openlr:map:1.4.2 (*)
>
> +--- de.eso:tpeg-sdk-java:latest.integration -> 0.0.1-SNAPSHOT
>
> |+--- org.apache.logging.log4j:log4j-1.2-api:latest.integration ->
> 2.11.0 (*)
>
> |+--- org.apache.logging.log4j:log4j-api:latest.integration -> 2.11.0
>
> |+--- org.apache.logging.log4j:log4j-core:latest.integration -> 2.11.0
> (*)
>
> |+--- openlr:map:1.4.2 (*)
>
> |+--- openlr:encoder:1.4.2
>
> ||+--- commons-lang:commons-lang:2.6
>
> ||+--- openlr:map:1.4.2 (*)
>
> ||+--- openlr:data:1.4.2
>
> |||+--- openlr:map:1.4.2 (*)
>
> |||+--- commons-lang:commons-lang:2.6
>
> |||+--- commons-configuration:commons-configuration:1.9 ->
> 1.10 (*)
>
> |||\--- log4j:log4j:1.2.17
>
> ||+--- log4j:log4j:1.2.17
>
> ||\--- commons-configuration:commons-configuration:1.9 -> 1.10 (*)
>
> |+--- openlr:decoder:1.4.2
>
> ||+--- log4j:log4j:1.2.17
>
> ||+--- openlr:map:1.4.2 (*)
>
> ||+--- openlr:data:1.4.2 (*)
>
> ||\--- commons-configuration:commons-configuration:1.9 -> 1.10 (*)
>
> |\--- openlr:data:1.4.2 (*)
>
> +--- de.eso.swarm.aqp:aqp-sdk-java:latest.integration -> 0.0.1-SNAPSHOT
>
> |+--- org.apache.avro:avro:1.8.2
>
> ||+--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> ||+--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
>
> |||\--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> ||+--- com.thoughtworks.paranamer:paranamer:2.7
>
> ||+--- org.xerial.snappy:snappy-java:1.1.1.3 -> 1.1.2.6
>
> ||+--- org.apache.commons:commons-compress:1.8.1
>
> ||+--- org.tukaani:xz:1.5
>
> ||\--- org.slf4j:slf4j-api:1.7.7 -> 1.7.25
>
> +--- org.apache.commons:commons-lang3:3.7
>
> |+--- com.google.protobuf:protobuf-java:3.5.1
>
> |+--- de.eso.swarm:platform-sdk-java-core:latest.integration ->
> 0.0.1-SNAPSHOT (*)
>
> |\--- xfcd.codec:xfcd-sdk-java-codec:latest.integration ->
> 0.0.1-SNAPSHOT
>
> | \--- com.google.protobuf:protobuf-java:3.2.0 -> 3.5.1
>
> +---
> 

[ANNOUNCE] Weekly community update #25.

2018-06-18 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #25. Please post any news and
updates you want to share with the community to this thread.

# Making Flink Table Scala free

The community started discussing whether and how to make flink-table Scala
free [1]. This change would be an incremental process where more and more
classes are replaced by Java implementations. If you have any experience
with transitioning from a Scala code base to a Java code base, then the
community would be happy to hear it.

# Static code analysis for Flink project

There is a discussion about the benefits of static code analysis for Apache
Flink [2]. If you want to share your opinion, then please join the thread.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Long-term-goal-of-making-flink-table-Scala-free-td22761.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Static-code-analysis-for-Flink-project-td22752.html

Cheers,
Till


RE: flink and akka HTTP

2018-06-18 Thread Gäckler Martin
Here are the dependencies:

runtimeClasspath - Runtime classpath of source set 'main'.
+--- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2
|+--- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2
||+--- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2
|||+--- org.apache.flink:flink-connector-kafka-base_2.11:1.4.2
||||\--- org.apache.flink:force-shading:1.4.2
|||+--- org.apache.kafka:kafka-clients:0.9.0.1 -> 0.11.0.2
||||+--- net.jpountz.lz4:lz4:1.3.0
||||+--- org.xerial.snappy:snappy-java:1.1.2.6
||||\--- org.slf4j:slf4j-api:1.7.25
|||\--- org.apache.flink:force-shading:1.4.2
||+--- org.apache.kafka:kafka-clients:0.10.2.1 -> 0.11.0.2 (*)
||\--- org.apache.flink:force-shading:1.4.2
|+--- org.apache.kafka:kafka-clients:0.11.0.2 (*)
|\--- org.apache.flink:force-shading:1.4.2
+--- openlr:map:1.4.2
|+--- commons-configuration:commons-configuration:1.9 -> 1.10
||+--- commons-lang:commons-lang:2.6
||\--- commons-logging:commons-logging:1.1.1 -> 1.2
|+--- log4j:log4j:1.2.17
|\--- commons-lang:commons-lang:2.6
+--- com.twitter:chill-protobuf:0.9.2
|+--- com.twitter:chill-java:0.9.2
||\--- com.esotericsoftware:kryo-shaded:4.0.0
|| +--- com.esotericsoftware:minlog:1.3.0
|| \--- org.objenesis:objenesis:2.2
|\--- com.esotericsoftware:kryo-shaded:4.0.0 (*)
+--- de.eso.swarm:platform-sdk-java-core:latest.integration -> 0.0.1-SNAPSHOT
|+--- org.apache.logging.log4j:log4j-api:2.7 -> 2.11.0
|\--- org.apache.logging.log4j:log4j-core:2.7 -> 2.11.0
| \--- org.apache.logging.log4j:log4j-api:2.11.0
+--- de.eso.swarm:ncfs-openlrmap-sdk-java:latest.integration -> 0.0.1-SNAPSHOT
|+--- de.eso.swarm:platform-sdk-java-core:latest.integration -> 
0.0.1-SNAPSHOT (*)
|+--- mysql:mysql-connector-java:5.1.39
|+--- org.apache.logging.log4j:log4j-1.2-api:2.7 -> 2.11.0
||+--- org.apache.logging.log4j:log4j-api:2.11.0
||\--- org.apache.logging.log4j:log4j-core:2.11.0 (*)
|+--- org.apache.logging.log4j:log4j-api:2.7 -> 2.11.0
|+--- org.apache.logging.log4j:log4j-core:2.7 -> 2.11.0 (*)
|+--- commons-configuration:commons-configuration:1.10 (*)
|\--- openlr:map:1.4.2 (*)
+--- de.eso:tpeg-sdk-java:latest.integration -> 0.0.1-SNAPSHOT
|+--- org.apache.logging.log4j:log4j-1.2-api:latest.integration -> 2.11.0 
(*)
|+--- org.apache.logging.log4j:log4j-api:latest.integration -> 2.11.0
|+--- org.apache.logging.log4j:log4j-core:latest.integration -> 2.11.0 (*)
|+--- openlr:map:1.4.2 (*)
|+--- openlr:encoder:1.4.2
||+--- commons-lang:commons-lang:2.6
||+--- openlr:map:1.4.2 (*)
||+--- openlr:data:1.4.2
|||+--- openlr:map:1.4.2 (*)
|||+--- commons-lang:commons-lang:2.6
|||+--- commons-configuration:commons-configuration:1.9 -> 1.10 (*)
|||\--- log4j:log4j:1.2.17
||+--- log4j:log4j:1.2.17
||\--- commons-configuration:commons-configuration:1.9 -> 1.10 (*)
|+--- openlr:decoder:1.4.2
||+--- log4j:log4j:1.2.17
||+--- openlr:map:1.4.2 (*)
||+--- openlr:data:1.4.2 (*)
||\--- commons-configuration:commons-configuration:1.9 -> 1.10 (*)
|\--- openlr:data:1.4.2 (*)
+--- de.eso.swarm.aqp:aqp-sdk-java:latest.integration -> 0.0.1-SNAPSHOT
|+--- org.apache.avro:avro:1.8.2
||+--- org.codehaus.jackson:jackson-core-asl:1.9.13
||+--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
|||\--- org.codehaus.jackson:jackson-core-asl:1.9.13
||+--- com.thoughtworks.paranamer:paranamer:2.7
||+--- org.xerial.snappy:snappy-java:1.1.1.3 -> 1.1.2.6
||+--- org.apache.commons:commons-compress:1.8.1
||+--- org.tukaani:xz:1.5
||\--- org.slf4j:slf4j-api:1.7.7 -> 1.7.25
+--- org.apache.commons:commons-lang3:3.7
|+--- com.google.protobuf:protobuf-java:3.5.1
|+--- de.eso.swarm:platform-sdk-java-core:latest.integration -> 
0.0.1-SNAPSHOT (*)
|\--- xfcd.codec:xfcd-sdk-java-codec:latest.integration -> 0.0.1-SNAPSHOT
| \--- com.google.protobuf:protobuf-java:3.2.0 -> 3.5.1
+--- com.here:here-data-ingestion-sdk-java-hazard-events:latest.integration -> 
0.0.1-SNAPSHOT
|\--- com.google.protobuf:protobuf-java:3.2.0 -> 3.5.1
\--- de.eso.swarm:here-data-ingestion-sdk-java-client:latest.integration -> 
0.0.1-SNAPSHOT
 +--- com.here.account:here-oauth-client:latest.integration -> 0.4.13
 |+--- org.ini4j:ini4j:0.5.1
 |+--- commons-codec:commons-codec:1.10
 |+--- com.fasterxml.jackson.core:jackson-databind:2.8.1
 ||+--- com.fasterxml.jackson.core:jackson-annotations:2.8.0
 ||\--- com.fasterxml.jackson.core:jackson-core:2.8.1
 |\--- org.apache.httpcomponents:httpclient:4.5.2
 | +--- org.apache.httpcomponents:httpcore:4.4.4

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Till Rohrmann
Which Hadoop version have you installed? It looks as if Flink has been
build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class path. If
you want to run Flink on Hadoop 2.6, then try to use the Hadoop free Flink
binaries or the one built for Hadoop 2.6.

Cheers,
Till

On Mon, Jun 18, 2018 at 10:48 AM Garvit Sharma  wrote:

> Ok, I have attached the log file.
>
> Please check and let me know.
>
> Thanks,
>
> On Mon, Jun 18, 2018 at 2:07 PM Amit Jain  wrote:
>
>> Hi Gravit,
>>
>> I think Till is interested to know about classpath details present at the
>> start of JM and TM logs e.g. following logs provide classpath details used
>> by TM in our case.
>>
>> 2018-06-17 19:01:30,656 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> 
>> 2018-06-17 19:01:30,658 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Starting
>> YARN TaskExecutor runner (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @
>> 14:54:44 UTC)
>> 2018-06-17 19:01:30,659 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  OS
>> current user: yarn
>> 2018-06-17 19:01:31,662 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Current
>> Hadoop/Kerberos user: hadoop
>> 2018-06-17 19:01:31,663 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JVM:
>> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
>> 2018-06-17 19:01:31,663 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Maximum
>> heap size: 6647 MiBytes
>> 2018-06-17 19:01:31,663 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>>  JAVA_HOME: /usr/lib/jvm/java-openjdk
>> 2018-06-17 19:01:31,664 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Hadoop
>> version: 2.8.3
>> 2018-06-17 19:01:31,664 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JVM
>> Options:
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> -Xms6936m
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> -Xmx6936m
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> -XX:MaxDirectMemorySize=4072m
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_1528342246614_0002/container_1528342246614_0002_01_282649/taskmanager.log
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> -Dlogback.configurationFile=file:./logback.xml
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> -Dlog4j.configuration=file:./log4j.properties
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Program
>> Arguments:
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>> --configDir
>> 2018-06-17 19:01:31,665 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - .
>> *2018-06-17 19:01:31,666 INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>>  Classpath:
>> lib/flink-dist_2.11-1.5.0.jar:lib/flink-python_2.11-1.5.0.jar:lib/flink-shaded-hadoop2-uber-1.5.0.jar:lib/flink-shaded-include-yarn-0.9.1.jar:lib/guava-18.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.7.jar:log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/lib/hadoop/hadoop-archives-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-azure-datalake-2.8.3-amzn-0.jar.*
>>
>> --
>> Thanks,
>> Amit
>>
>> On Mon, Jun 18, 2018 at 2:00 PM, Garvit Sharma 
>> wrote:
>>
>>> Hi,
>>>
>>> Please refer to my previous mail for complete logs.
>>>
>>> Thanks,
>>>
>>> On Mon, Jun 18, 2018 at 1:17 PM Till Rohrmann 
>>> wrote:
>>>
 Could you also please share the complete log file with us.

 Cheers,
 Till

 On Sat, Jun 16, 2018 at 5:22 PM Ted Yu  wrote:

> The error for core-default.xml is interesting.
>
> Flink doesn't have this file. Probably it came with Yarn. Please check
> the hadoop version Flink was built with versus the hadoop version in your
> cluster.
>
> Thanks
>
>  Original message 
> From: Garvit Sharma 
> Date: 6/16/18 7:23 AM (GMT-08:00)
> To: trohrm...@apache.org
> Cc: Chesnay Schepler , user@flink.apache.org
> Subject: Re: Exception while submitting jobs through Yarn
>
> I am not able to figure out, got stuck badly in this since last 1
> week. Any little help would be appreciated.
>
>
> 

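A quick way to act on the version check suggested above is to print, on the same classpath that the
Flink client or TaskManager uses, which Hadoop version is actually picked up and from which jar it
comes. The sketch below is illustrative only (the class name is made up); it relies on the standard
org.apache.hadoop.util.VersionInfo API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.VersionInfo;

public class HadoopVersionCheck {
    public static void main(String[] args) {
        // Hadoop version compiled into the hadoop-common jar that wins on the classpath
        System.out.println("Hadoop version on classpath: " + VersionInfo.getVersion());
        // Jar the Configuration class is loaded from (shows whether the Flink-bundled
        // Hadoop or the cluster's Hadoop, e.g. /usr/hdp/..., is being used)
        System.out.println("hadoop-common loaded from: "
                + Configuration.class.getProtectionDomain().getCodeSource().getLocation());
    }
}
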
Re: flink and akka HTTP

2018-06-18 Thread Till Rohrmann
Just for clarification: The user jar contains the required Akka dependency
(including akka-remote and akka-actor)? Which version of Akka do you
require?

On Mon, Jun 18, 2018 at 1:07 PM Gäckler Martin <
martin.gaeck...@esolutions.de> wrote:

> No, without including the flink runtime to my own jar, nothing works.
>
>
>
> Regards
>
>
>
> Martin
>
>
>
>
>
>
>
>
> *--*
>
> *Martin Gäckler *
>
> Entwicklung Schwarmdienste
>
>
>
> Im Auftrag der
>
> e.solutions GmbH
>
> Despagstr. 4a
>
> 85055 Ingolstadt
>
> Germany
>
>
>
> Registered Office:
>
> Despagstr. 4a
>
> 85055 Ingolstadt
>
> Germany
>
>
>
> Phone  +49 8458 3332 145
>
>
>
> e.solutions GmbH
>
> Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
>
> Register Court Ingolstadt HRB 5221
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Montag, 18. Juni 2018 13:05
> *To:* Gäckler Martin 
> *Cc:* user 
> *Subject:* Re: flink and akka HTTP
>
>
>
> Is it then working or not?
>
>
>
> On Mon, Jun 18, 2018, 11:52 Gäckler Martin 
> wrote:
>
> Good morning,
>
>
>
> Thanks for the hint. AKKA streams and http are added to our user jar. If
> not, our HTTP client won't work. According to the flink documentation here:
> https://flink.apache.org/news/2017/12/12/release-1.4.0.html#changes-to-dynamic-class-loading-of-user-code
>  child
> first class loading is enabled by default and we did not change any
> settings here.
>
>
>
> Cheers
>
> Martin
>
>
>
>
> *--*
>
> *Martin Gäckler *
>
> Entwicklung Schwarmdienste
>
>
>
> Im Auftrag der
>
> e.solutions GmbH
>
> Despagstr. 4a
>
> 85055 Ingolstadt
>
> Germany
>
>
>
> Registered Office:
>
> Despagstr. 4a
>
> 85055 Ingolstadt
>
> Germany
>
>
>
> Phone  +49 8458 3332 145
>
>
>
> e.solutions GmbH
>
> Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
>
> Register Court Ingolstadt HRB 5221
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Montag, 18. Juni 2018 09:25
> *To:* Gäckler Martin 
> *Cc:* user 
> *Subject:* Re: flink and akka HTTP
>
>
>
> Hi,
>
>
>
> I assume that you have an Akka dependency conflict. By adding the Akka
> dependency version to your user jar and enabling child first class loading
> you should be able to control which Akka version is loaded. The only thing
> you have to check is whether Flink works with a newer version of Akka.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Jun 15, 2018 at 8:15 PM Gäckler Martin <
> martin.gaeck...@esolutions.de> wrote:
>
> Good evening,
>
>
>
> According to Flink's documentation I have excluded the Flink runtime
> library from the runtime dependencies of my project:
>
>
>
> dependencies {
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-core',  version: '1.4.2'
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-java',  version: '1.4.2'
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-streaming-java_2.11',   version: '1.4.2'
>
> implementation  group: 'org.apache.flink',  name:
> 'flink-connector-kafka-0.11_2.11', version: '1.4.2'
>
> ...
>
> }
>
>
>
> Unfortunately I get the following error:
>
>
>
> Caused by: java.lang.ClassCastException: interface
> akka.serialization.Serializer is not assignable from class
> akka.remote.serialization.MiscMessageSerializer
>
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:23)
>
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
>
>  at scala.util.Try$.apply(Try.scala:192)
>
>  at
> akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
>
>  at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:38)
>
>  at
> akka.serialization.Serialization.serializerOf(Serialization.scala:301)
>
>  at
> akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
>
>  at
> akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
>
>  at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
>
>  at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>
>  at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>
>  at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
>
>  at akka.serialization.Serialization.<init>(Serialization.scala:327)
>
>  at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
>
>  at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
>
>  at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>
>  at
> 

RE: flink and akka HTTP

2018-06-18 Thread Gäckler Martin
No, without including the flink runtime to my own jar, nothing works.

Regards

Martin



--
Martin Gäckler
Entwicklung Schwarmdienste

Im Auftrag der
e.solutions GmbH
Despagstr. 4a
85055 Ingolstadt
Germany

Registered Office:
Despagstr. 4a
85055 Ingolstadt
Germany

Phone  +49 8458 3332 145

e.solutions GmbH
Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
Register Court Ingolstadt HRB 5221

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Montag, 18. Juni 2018 13:05
To: Gäckler Martin 
Cc: user 
Subject: Re: flink and akka HTTP

Is it then working or not?

On Mon, Jun 18, 2018, 11:52 Gäckler Martin 
mailto:martin.gaeck...@esolutions.de>> wrote:
Good morning,

Thanks for the hint. AKKA streams and http are added to our user jar. If not, 
our HTTP client won't work. According to the flink documentation here: 
https://flink.apache.org/news/2017/12/12/release-1.4.0.html#changes-to-dynamic-class-loading-of-user-code
 child first class loading is enabled by default and we did not change any 
settings here.

Cheers
Martin

--
Martin Gäckler
Entwicklung Schwarmdienste

Im Auftrag der
e.solutions GmbH
Despagstr. 4a
85055 Ingolstadt
Germany

Registered Office:
Despagstr. 4a
85055 Ingolstadt
Germany

Phone  +49 8458 3332 145

e.solutions GmbH
Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
Register Court Ingolstadt HRB 5221

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Montag, 18. Juni 2018 09:25
To: Gäckler Martin 
mailto:martin.gaeck...@esolutions.de>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: flink and akka HTTP

Hi,

I assume that you have an Akka dependency conflict. By adding the Akka 
dependency version to your user jar and enabling child first class loading you 
should be able to control which Akka version is loaded. The only thing you have 
to check is whether Flink works with a newer version of Akka.

Cheers,
Till

On Fri, Jun 15, 2018 at 8:15 PM Gäckler Martin 
mailto:martin.gaeck...@esolutions.de>> wrote:
Good evening,

According to Flink's documentation I have excluded the Flink runtime library 
from the runtime dependencies of my project:

dependencies {
compileOnly group: 'org.apache.flink',  name: 'flink-core', 
 version: '1.4.2'
compileOnly group: 'org.apache.flink',  name: 'flink-java', 
 version: '1.4.2'
compileOnly group: 'org.apache.flink',  name: 
'flink-streaming-java_2.11',   version: '1.4.2'
implementation  group: 'org.apache.flink',  name: 
'flink-connector-kafka-0.11_2.11', version: '1.4.2'
...
}

Unfortunately I get the following error:

Caused by: java.lang.ClassCastException: interface 
akka.serialization.Serializer is not assignable from class 
akka.remote.serialization.MiscMessageSerializer
 at 
akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:23)
 at 
akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
 at scala.util.Try$.apply(Try.scala:192)
 at 
akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
 at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:38)
 at akka.serialization.Serialization.serializerOf(Serialization.scala:301)
 at 
akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
 at 
akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
 at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
 at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
 at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
 at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
 at akka.serialization.Serialization.<init>(Serialization.scala:327)
 at 
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
 at 
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
 at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
 at 
akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:946)
 at 
akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:944)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
 at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
 at 

Re: flink and akka HTTP

2018-06-18 Thread Till Rohrmann
Is it then working or not?

On Mon, Jun 18, 2018, 11:52 Gäckler Martin 
wrote:

> Good morning,
>
>
>
> Thanks for the hint. AKKA streams and http are added to our user jar. If
> not, our HTTP client won't work. According to the flink documentation here:
> https://flink.apache.org/news/2017/12/12/release-1.4.0.html#changes-to-dynamic-class-loading-of-user-code
>  child
> first class loading is enabled by default and we did not change any
> settings here.
>
>
>
> Cheers
>
> Martin
>
>
>
>
> *--*
>
> *Martin Gäckler *
>
> Entwicklung Schwarmdienste
>
>
>
> Im Auftrag der
>
> e.solutions GmbH
>
> Despagstr. 4a
>
> 85055 Ingolstadt
>
> Germany
>
>
>
> Registered Office:
>
> Despagstr. 4a
>
> 85055 Ingolstadt
>
> Germany
>
>
>
> Phone  +49 8458 3332 145
>
>
>
> e.solutions GmbH
>
> Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
>
> Register Court Ingolstadt HRB 5221
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Montag, 18. Juni 2018 09:25
> *To:* Gäckler Martin 
> *Cc:* user 
> *Subject:* Re: flink and akka HTTP
>
>
>
> Hi,
>
>
>
> I assume that you have an Akka dependency conflict. By adding the Akka
> dependency version to your user jar and enabling child first class loading
> you should be able to control which Akka version is loaded. The only thing
> you have to check is whether Flink works with a newer version of Akka.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Jun 15, 2018 at 8:15 PM Gäckler Martin <
> martin.gaeck...@esolutions.de> wrote:
>
> Good evening,
>
>
>
> According to Flink's documentation I have excluded the Flink runtime
> library from the runtime dependencies of my project:
>
>
>
> dependencies {
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-core',  version: '1.4.2'
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-java',  version: '1.4.2'
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-streaming-java_2.11',   version: '1.4.2'
>
> implementation  group: 'org.apache.flink',  name:
> 'flink-connector-kafka-0.11_2.11', version: '1.4.2'
>
> ...
>
> }
>
>
>
> Unfortunately I get the following error:
>
>
>
> Caused by: java.lang.ClassCastException: interface
> akka.serialization.Serializer is not assignable from class
> akka.remote.serialization.MiscMessageSerializer
>
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:23)
>
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
>
>  at scala.util.Try$.apply(Try.scala:192)
>
>  at
> akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
>
>  at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:38)
>
>  at
> akka.serialization.Serialization.serializerOf(Serialization.scala:301)
>
>  at
> akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
>
>  at
> akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
>
>  at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
>
>  at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>
>  at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>
>  at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
>
>  at akka.serialization.Serialization.<init>(Serialization.scala:327)
>
>  at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
>
>  at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
>
>  at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>
>  at
> akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:946)
>
>  at
> akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:944)
>
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
>  at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
>
>  at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
>
>  at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
>
>  at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
>
>  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
>
>  at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:246)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:289)
>
>  at 

Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Ovidiu-Cristian MARCU
Hi all,

Allow me to add some comments/questions on this very interesting issue.
According to the documentation [1], the pipeline example assumes the source is
running with the same parallelism as the successive map operator, and the workflow
optimizes to collocate source and map tasks if possible.

For an application configuring the source with a different parallelism, assuming
N task managers each with m slots, if I configure
the source operator with parallelism m, could all of the source's tasks be
scheduled on the first task manager?
I think the same story holds for sink tasks.
So, in general, is there any control over the scheduling of source and sink tasks?
Would it be possible to enforce that the scheduling of source tasks is balanced
across task managers? Not sure if this is the default.
If the source creates a non-keyed stream, can we force the source to push
records to local map tasks?

For Siew’s example, after source#map, a keyBy complicates things further since
each key can possibly be processed on another task manager.
At least the keyBy operator should run with the same parallelism as the source and
map and be pipelined in the same slot (maybe a shared-slot configuration could
enforce that).

DataStream AggregatedRecordWithAuditStream = sourceStringStream
.map(new JsonToRecordTranslator(markerFactory.getMarker(), 
inputlink)).name("JsonRecTranslator").setParallelism(pJ2R) 
.keyBy(new KeySelector() {
private static final long serialVersionUID = 1L;

@Override
public String getKey(Record r) throws Exception {
return r.getUNIQUE_KEY(); 
}
}) 
.process(new ProcessAggregation(aggrDuration, 
markerFactory.getMarker(), markerFactory.getMarker())).setParallelism(pAggr)
.name("AggregationDuration: " + aggrDuration +"ms");


Thanks,
Ovidiu

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
 


> On 18 Jun 2018, at 10:05, Fabian Hueske  wrote:
> 
> Not sure if TM local assignment is explicitly designed in 1.5.0, but it might 
> be an artifact of how slots are registered in the resource manager. 
> Till (in CC) should know how that works.
> 
> Anyway, tasks that run in the same TM exchange data via in-memory channels 
> which is of course much faster than going over the network.
> So yes, a performance drop when tasks are scheduled to different TMs is not 
> unexpected IMO.
> You can check that by starting multiple TMs with a single slot each and 
> running your job on that setup.
> 
> Best, Fabian
> 
> 
> 
> 2018-06-18 9:57 GMT+02:00 Siew Wai Yow  >:
> Hi Fabian,
> 
> We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?
> 
> "Hence, applications might scale better until tasks are scheduled to 
> different machines."
> 
> This seems to be the case. We have 32 vCPUs and 16 slots in one TM machine. So
> scaling works perfectly for 1-2-4-8-16 because everything happens in the same TM. When we
> scale to 32, the performance drops and is not even on par with the parallelism-16 case. Is
> this expected? Thank you.
> 
> Regards,
> Yow
> 
> From: Fabian Hueske mailto:fhue...@gmail.com>>
> Sent: Monday, June 18, 2018 3:47 PM
> To: Siew Wai Yow
> Cc: Jörn Franke; user@flink.apache.org 
> 
> Subject: Re: Flink application does not scale as expected, please help!
>  
> Hi,
> 
> Which Flink version are you using?
> Did you try to analyze the bottleneck of the application, i.e., is it CPU, 
> disk IO, or network bound?
> 
> Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to 
> schedule tasks on the same machine to reduce the amount of network transfer.
> Hence, applications might scale better until tasks are scheduled to different 
> machines.
> 
> Fabian
> 
> 2018-06-16 12:20 GMT+02:00 Siew Wai Yow  >:
> Hi Jorn, Please find the source @https://github.com/swyow/flink_sample_git 
> 
> Thank you!
> 
> From: Jörn Franke mailto:jornfra...@gmail.com>>
> Sent: Saturday, June 16, 2018 6:03 PM
> 
> To: Siew Wai Yow
> Cc: user@flink.apache.org 
> Subject: Re: Flink application does not scale as expected, please help!
>  
> Can you share the app source on gitlab, github or bitbucket etc? 
> 
> On 16. Jun 2018, at 11:46, Siew Wai Yow  > wrote:
> 
>> Hi, there is an interesting finding: the reason low parallelism works much
>> better is that all tasks run in the same TM; once we scale more, the
>> tasks are distributed to different TMs and the performance is worse than in the low
>> parallelism case. Is this expected? The more I scale, the less I
>> get?
>> 
>> From: Siew Wai Yow mailto:wai_...@hotmail.com>>
>> Sent: Saturday, June 16, 2018 5:09 PM
>> To: Jörn Franke
>> Cc: user@flink.apache.org 

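There is no API to pin a subtask to a particular TaskManager, but parallelism and slot-sharing
groups do influence where the scheduler can place tasks. Below is a minimal, illustrative sketch
(class and operator names are stand-ins, not code from this thread) that keeps a source, a map, and
a keyed operator at the same parallelism and in one slot-sharing group so that they can be chained
or at least co-located where possible. Note that a slot-sharing group only constrains which subtasks
may share a slot; it does not guarantee that all parallel subtasks land on the same TaskManager.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int p = 16; // example value; corresponds to psrc/pJ2R/pAggr in the thread

        DataStream<String> source = env
                .socketTextStream("localhost", 9999)       // stand-in for the real source
                .slotSharingGroup("pipeline");

        source
                .map(new MapFunction<String, String>() {   // stand-in for JsonToRecordTranslator
                    @Override
                    public String map(String value) {
                        return value.trim();
                    }
                })
                .setParallelism(p)
                .slotSharingGroup("pipeline")
                .keyBy(new KeySelector<String, String>() { // stand-in for r.getUNIQUE_KEY()
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .print()                                   // stand-in for the aggregation/sink
                .setParallelism(p);

        env.execute("slot-sharing sketch");
    }
}
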
Re: A question about Kryo and Window State

2018-06-18 Thread Vishal Santoshi
Any more insight?

On Wed, Jun 13, 2018, 3:34 PM Vishal Santoshi 
wrote:

> Any ideas on the standard way ( or any roundabout way ) of doing a version
> upgrade that looks back ward compatible.
> The  @FieldSerializer.Optional("0") actually does  ignore the field (
> even if reset ) giving it the default value if kyro is used. It has to do
> with the FieldSerializer behaves  .  There is another Serializer (
> Composite I believe ) that allows for such back ward compatible changes.
>
>
> I know some work is being done in 1.6 to allow for above use case and I
> think Google Data Flow does provide some avenues.
>
> Thanks much
>
> Vishal
>
>
>
> On Tue, Jun 12, 2018 at 11:30 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I have a running pipe with Window State in a class say
>>
>> Class A{
>>  long a;
>> }
>>
>> It uses the default KryoSerializer
>>
>> I want to add a field to
>>
>> Class A {
>>   long a;
>>   long b;
>> }
>>
>> I need to suspend with SP and resume with the new version of Class A
>>
>>
>> Is there a definite way to do this. I tried
>>
>> Class A {
>>   long a;
>>@FieldSerializer.Optional("0")
>>   long b;
>> }
>>
>> but that seems to default to 0 , even when the Aggregation is putting in
>> values.
>>
>> Could somebody give pointers as to how to solve this
>>
>> Thanks a ton.
>>
>>
>>
>>
>

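One roundabout option in this direction (not confirmed in this thread, and only effective if it is
registered before the first savepoint is taken, since switching serializers is itself a breaking
change) is to register Kryo's CompatibleFieldSerializer for the state class. It writes field names
into the stream and therefore tolerates added fields. A minimal sketch:

import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoCompatibilitySketch {

    // The state class from the question, with the newly added field.
    public static class A {
        public long a;
        public long b; // added in the new version
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // CompatibleFieldSerializer stores field names, so an instance of A written
        // without 'b' is restored with the default value for 'b' instead of failing.
        env.getConfig().registerTypeWithKryoSerializer(A.class, CompatibleFieldSerializer.class);
    }
}
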

RE: flink and akka HTTP

2018-06-18 Thread Gäckler Martin
Good morning,

Thanks for the hint. AKKA streams and http are added to our user jar. If not, 
our HTTP client won't work. According to the flink documentation here: 
https://flink.apache.org/news/2017/12/12/release-1.4.0.html#changes-to-dynamic-class-loading-of-user-code
 child first class loading is enabled by default and we did not change any 
settings here.

Cheers
Martin

--
Martin Gäckler
Entwicklung Schwarmdienste

Im Auftrag der
e.solutions GmbH
Despagstr. 4a
85055 Ingolstadt
Germany

Registered Office:
Despagstr. 4a
85055 Ingolstadt
Germany

Phone  +49 8458 3332 145

e.solutions GmbH
Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
Register Court Ingolstadt HRB 5221

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Montag, 18. Juni 2018 09:25
To: Gäckler Martin 
Cc: user 
Subject: Re: flink and akka HTTP

Hi,

I assume that you have an Akka dependency conflict. By adding the Akka 
dependency version to your user jar and enabling child first class loading you 
should be able to control which Akka version is loaded. The only thing you have 
to check is whether Flink works with a newer version of Akka.

Cheers,
Till

On Fri, Jun 15, 2018 at 8:15 PM Gäckler Martin 
mailto:martin.gaeck...@esolutions.de>> wrote:
Good evening,

According to Flink's documentation I have excluded the Flink runtime library 
from the runtime dependencies of my project:

dependencies {
compileOnly group: 'org.apache.flink',  name: 'flink-core', 
 version: '1.4.2'
compileOnly group: 'org.apache.flink',  name: 'flink-java', 
 version: '1.4.2'
compileOnly group: 'org.apache.flink',  name: 
'flink-streaming-java_2.11',   version: '1.4.2'
implementation  group: 'org.apache.flink',  name: 
'flink-connector-kafka-0.11_2.11', version: '1.4.2'
...
}

Unfortunately I get the following error:

Caused by: java.lang.ClassCastException: interface 
akka.serialization.Serializer is not assignable from class 
akka.remote.serialization.MiscMessageSerializer
 at 
akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:23)
 at 
akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
 at scala.util.Try$.apply(Try.scala:192)
 at 
akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
 at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:38)
 at akka.serialization.Serialization.serializerOf(Serialization.scala:301)
 at 
akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
 at 
akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
 at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
 at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
 at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
 at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
 at akka.serialization.Serialization.<init>(Serialization.scala:327)
 at 
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
 at 
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
 at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
 at 
akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:946)
 at 
akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:944)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
 at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
 at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
 at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
 at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:246)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:289)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:234)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:225)
 at akka.actor.ActorSystem$.create(ActorSystem.scala:160)
 at akka.actor.ActorSystem.create(ActorSystem.scala)
 at 
de.eso.swarm.rest.client.akka.AkkaRestClient.<init>(AkkaRestClient.java:43)
 ... 12 more

My application needs to initialize the AKKA ActorSystem because it uses an HTTP 
client that I have developed using akka-http and 

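One direction worth trying for this kind of "Serializer is not assignable" ClassCastException (a
sketch, not a confirmed fix from this thread) is to hand the user-code classloader explicitly to
Akka when the ActorSystem is created, so that ReflectiveDynamicAccess resolves serializer classes
through the same loader that provides the Akka jars bundled in the fat jar. The system name is an
arbitrary example.

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class UserClassLoaderActorSystem {

    public static ActorSystem create() {
        // With child-first classloading, the context classloader inside user code is
        // Flink's user-code classloader for the job jar.
        ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
        // Load Akka's reference.conf / application.conf through the same loader.
        Config config = ConfigFactory.load(userClassLoader);
        // "http-client" is an arbitrary example name.
        return ActorSystem.create("http-client", config, userClassLoader);
    }
}
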
Re: Stream Join With Early firings

2018-06-18 Thread Johannes Schulte
Hi Fabian,

thanks for the hints, though I somehow got the feeling that I am on the
wrong track, given how much code I would need to write to implement a
"blueprint" use case.

Would a join be simpler using the Table API? In the end it's the
classical Order & OrderPosition example, where the output is an
upsert stream. Would I get the expected behaviour (output elements on every
update on either side of the input stream)? I realize that my session
window approach wasn't driven by the requirements but by operational
aspects (state size), so using a concept like idle state retention time
would be a more natural fit.

Thanks,

Johannes

On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske  wrote:

> Hi Johannes,
>
> EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default
> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>
> I would take the EventTimeTrigger and extend it with early firing
> functionality.
> However, there are a few things to consider
> * you need to be aware that session window can be merged, i.e., two
> session windows A, B with gap 10: A [20,25), B [37, 45), will be merged
> when a record at 32 is received.
> * windows store all records in a list. For every firing, you need to
> iterate the full list and also track which records you joined already to
> avoid duplicates. Maybe you can migrate records from the window state into
> a custom state defined in a ProcessWindowFunction.
>
> Best, Fabian
>
>
>
>
>
> 2018-06-13 13:43 GMT+02:00 Johannes Schulte :
>
>> Hi,
>>
>> I am joining two streams with a session window and want to emit a joined
>> (early) result for every element arriving on one of the streams.
>>
>> Currently the code looks like this:
>>
>> s1.join(s2)
>> .where(s1.id).equalTo(s2.id)
>> .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
>> // trigger(?)
>> .apply(...custom code..)
>>
>> What I am missing is the right trigger ala "withEarlyFiring" - do I need
>> to implement my own trigger for this and if yes, what kind of functionality
>> must be present to not break the session window semantics?
>>
>> Thanks in advance,
>>
>> Johannes
>>
>>
>


Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Garvit Sharma
Ok, I have attached the log file.

Please check and let me know.

Thanks,

On Mon, Jun 18, 2018 at 2:07 PM Amit Jain  wrote:

> Hi Garvit,
>
> I think Till is interested to know about classpath details present at the
> start of JM and TM logs e.g. following logs provide classpath details used
> by TM in our case.
>
> 2018-06-17 19:01:30,656 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -
> 
> 2018-06-17 19:01:30,658 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  Starting YARN TaskExecutor runner (Version: 1.5.0,
> Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
> 2018-06-17 19:01:30,659 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  OS current user: yarn
> 2018-06-17 19:01:31,662 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  Current Hadoop/Kerberos user: hadoop
> 2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
> 1.8/25.171-b10
> 2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  Maximum heap size: 6647 MiBytes
> 2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  JAVA_HOME: /usr/lib/jvm/java-openjdk
> 2018-06-17 19:01:31,664 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  Hadoop version: 2.8.3
> 2018-06-17 19:01:31,664 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  JVM Options:
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - -Xms6936m
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - -Xmx6936m
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - -XX:MaxDirectMemorySize=4072m
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1528342246614_0002/container_1528342246614_0002_01_282649/taskmanager.log
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - -Dlogback.configurationFile=file:./logback.xml
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - -Dlog4j.configuration=file:./log4j.properties
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  -  Program Arguments:
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - --configDir
> 2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - .
> *2018-06-17 19:01:31,666 INFO
>  org.apache.flink.yarn.YarnTaskExecutorRunner  -
>  Classpath:
> lib/flink-dist_2.11-1.5.0.jar:lib/flink-python_2.11-1.5.0.jar:lib/flink-shaded-hadoop2-uber-1.5.0.jar:lib/flink-shaded-include-yarn-0.9.1.jar:lib/guava-18.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.7.jar:log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/lib/hadoop/hadoop-archives-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-azure-datalake-2.8.3-amzn-0.jar.*
>
> --
> Thanks,
> Amit
>
> On Mon, Jun 18, 2018 at 2:00 PM, Garvit Sharma 
> wrote:
>
>> Hi,
>>
>> Please refer to my previous mail for complete logs.
>>
>> Thanks,
>>
>> On Mon, Jun 18, 2018 at 1:17 PM Till Rohrmann 
>> wrote:
>>
>>> Could you also please share the complete log file with us.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Jun 16, 2018 at 5:22 PM Ted Yu  wrote:
>>>
 The error for core-default.xml is interesting.

 Flink doesn't have this file. Probably it came with Yarn. Please check
 the hadoop version Flink was built with versus the hadoop version in your
 cluster.

 Thanks

  Original message 
 From: Garvit Sharma 
 Date: 6/16/18 7:23 AM (GMT-08:00)
 To: trohrm...@apache.org
 Cc: Chesnay Schepler , user@flink.apache.org
 Subject: Re: Exception while submitting jobs through Yarn

 I am not able to figure out, got stuck badly in this since last 1 week.
 Any little help would be appreciated.


 2018-06-16 19:25:10,523 DEBUG
 org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
 Parallelism set: 1 for 8

 2018-06-16 19:25:10,578 DEBUG
 org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
 Parallelism set: 1 for 1

 2018-06-16 19:25:10,588 DEBUG
 org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
 CONNECTED: KeyGroupStreamPartitioner - 1 -> 8

 2018-06-16 

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Amit Jain
Hi Garvit,

I think Till is interested to know about classpath details present at the
start of JM and TM logs e.g. following logs provide classpath details used
by TM in our case.

2018-06-17 19:01:30,656 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -

2018-06-17 19:01:30,658 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  Starting YARN TaskExecutor runner (Version: 1.5.0,
Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-06-17 19:01:30,659 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  OS current user: yarn
2018-06-17 19:01:31,662 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  Current Hadoop/Kerberos user: hadoop
2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
1.8/25.171-b10
2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  Maximum heap size: 6647 MiBytes
2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  JAVA_HOME: /usr/lib/jvm/java-openjdk
2018-06-17 19:01:31,664 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  Hadoop version: 2.8.3
2018-06-17 19:01:31,664 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  JVM Options:
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - -Xms6936m
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - -Xmx6936m
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - -XX:MaxDirectMemorySize=4072m
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -
-Dlog.file=/var/log/hadoop-yarn/containers/application_1528342246614_0002/container_1528342246614_0002_01_282649/taskmanager.log
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - -Dlogback.configurationFile=file:./logback.xml
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - -Dlog4j.configuration=file:./log4j.properties
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  Program Arguments:
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - --configDir
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - .
*2018-06-17 19:01:31,666 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 -  Classpath:
lib/flink-dist_2.11-1.5.0.jar:lib/flink-python_2.11-1.5.0.jar:lib/flink-shaded-hadoop2-uber-1.5.0.jar:lib/flink-shaded-include-yarn-0.9.1.jar:lib/guava-18.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.7.jar:log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/lib/hadoop/hadoop-archives-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-azure-datalake-2.8.3-amzn-0.jar.*

--
Thanks,
Amit

On Mon, Jun 18, 2018 at 2:00 PM, Garvit Sharma  wrote:

> Hi,
>
> Please refer to my previous mail for complete logs.
>
> Thanks,
>
> On Mon, Jun 18, 2018 at 1:17 PM Till Rohrmann 
> wrote:
>
>> Could you also please share the complete log file with us.
>>
>> Cheers,
>> Till
>>
>> On Sat, Jun 16, 2018 at 5:22 PM Ted Yu  wrote:
>>
>>> The error for core-default.xml is interesting.
>>>
>>> Flink doesn't have this file. Probably it came with Yarn. Please check
>>> the hadoop version Flink was built with versus the hadoop version in your
>>> cluster.
>>>
>>> Thanks
>>>
>>>  Original message 
>>> From: Garvit Sharma 
>>> Date: 6/16/18 7:23 AM (GMT-08:00)
>>> To: trohrm...@apache.org
>>> Cc: Chesnay Schepler , user@flink.apache.org
>>> Subject: Re: Exception while submitting jobs through Yarn
>>>
>>> I am not able to figure out, got stuck badly in this since last 1 week.
>>> Any little help would be appreciated.
>>>
>>>
>>> 2018-06-16 19:25:10,523 DEBUG org.apache.flink.streaming.api.graph.
>>> StreamingJobGraphGenerator  - Parallelism set: 1 for 8
>>>
>>> 2018-06-16 19:25:10,578 DEBUG org.apache.flink.streaming.api.graph.
>>> StreamingJobGraphGenerator  - Parallelism set: 1 for 1
>>>
>>> 2018-06-16 19:25:10,588 DEBUG org.apache.flink.streaming.api.graph.
>>> StreamingJobGraphGenerator  - CONNECTED: KeyGroupStreamPartitioner - 1
>>> -> 8
>>>
>>> 2018-06-16 19:25:10,591 DEBUG org.apache.flink.streaming.api.graph.
>>> StreamingJobGraphGenerator  - Parallelism set: 1 for 5
>>>
>>> 2018-06-16 19:25:10,597 DEBUG org.apache.flink.streaming.api.graph.
>>> StreamingJobGraphGenerator  - CONNECTED: KeyGroupStreamPartitioner - 5
>>> -> 8
>>>
>>> 

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Garvit Sharma
Hi,

Please refer to my previous mail for complete logs.

Thanks,

On Mon, Jun 18, 2018 at 1:17 PM Till Rohrmann  wrote:

> Could you also please share the complete log file with us.
>
> Cheers,
> Till
>
> On Sat, Jun 16, 2018 at 5:22 PM Ted Yu  wrote:
>
>> The error for core-default.xml is interesting.
>>
>> Flink doesn't have this file. Probably it came with Yarn. Please check
>> the hadoop version Flink was built with versus the hadoop version in your
>> cluster.
>>
>> Thanks
>>
>>  Original message 
>> From: Garvit Sharma 
>> Date: 6/16/18 7:23 AM (GMT-08:00)
>> To: trohrm...@apache.org
>> Cc: Chesnay Schepler , user@flink.apache.org
>> Subject: Re: Exception while submitting jobs through Yarn
>>
>> I am not able to figure out, got stuck badly in this since last 1 week.
>> Any little help would be appreciated.
>>
>>
>> 2018-06-16 19:25:10,523 DEBUG
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
>> Parallelism set: 1 for 8
>>
>> 2018-06-16 19:25:10,578 DEBUG
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
>> Parallelism set: 1 for 1
>>
>> 2018-06-16 19:25:10,588 DEBUG
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
>> CONNECTED: KeyGroupStreamPartitioner - 1 -> 8
>>
>> 2018-06-16 19:25:10,591 DEBUG
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
>> Parallelism set: 1 for 5
>>
>> 2018-06-16 19:25:10,597 DEBUG
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
>> CONNECTED: KeyGroupStreamPartitioner - 5 -> 8
>>
>> 2018-06-16 19:25:10,618 FATAL org.apache.hadoop.conf.Configuration
>> - error parsing conf core-default.xml
>>
>> javax.xml.parsers.ParserConfigurationException: Feature '
>> http://apache.org/xml/features/xinclude' is not recognized.
>>
>> at
>> org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown
>> Source)
>>
>> at
>> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2482)
>>
>> at
>> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2444)
>>
>> at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2361)
>>
>> at org.apache.hadoop.conf.Configuration.get(Configuration.java:1188)
>>
>> at
>> org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider.getRecordFactory(RecordFactoryProvider.java:49)
>>
>> at org.apache.hadoop.yarn.util.Records.<clinit>(Records.java:32)
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getQueueInfoRequest(YarnClientImpl.java:495)
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getAllQueues(YarnClientImpl.java:525)
>>
>> at
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.checkYarnQueues(AbstractYarnClusterDescriptor.java:658)
>>
>> at
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:486)
>>
>> at
>> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:235)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>>
>> 2018-06-16 19:25:10,620 WARN  
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>>   - Error while getting queue information from YARN: null
>>
>> 2018-06-16 19:25:10,621 DEBUG
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Error
>> details
>>
>> java.lang.ExceptionInInitializerError
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getQueueInfoRequest(YarnClientImpl.java:495)
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getAllQueues(YarnClientImpl.java:525)
>>
>> at
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.checkYarnQueues(AbstractYarnClusterDescriptor.java:658)
>>
>> at
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:486)
>>
>> at
>> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:235)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>>
>

-- 

Garvit Sharma

Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Fabian Hueske
Not sure if TM local assignment is explicitly designed in 1.5.0, but it
might be an artifact of how slots are registered in the resource manager.
Till (in CC) should know how that works.

Anyway, tasks that run in the same TM exchange data via in-memory channels
which is of course much faster than going over the network.
So yes, a performance drop when tasks are scheduled to different TMs is not
unexpected IMO.
You can check that by starting multiple TMs with a single slot each and
running your job on that setup.

Best, Fabian



2018-06-18 9:57 GMT+02:00 Siew Wai Yow :

> Hi Fabian,
>
>
> We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?
>
> *"Hence, applications might scale better until tasks are scheduled to
> different machines."*
>
> This seems to be the case. We have 32 vCPUs and 16 slots in one TM machine. So
> scaling works perfectly for 1-2-4-8-16 because everything happens in the same TM. When we
> scale to 32, the performance drops and is not even on par with
> parallelism 16. Is this expected? Thank you.
>
> Regards,
> Yow
>
> --
> *From:* Fabian Hueske 
> *Sent:* Monday, June 18, 2018 3:47 PM
> *To:* Siew Wai Yow
> *Cc:* Jörn Franke; user@flink.apache.org
>
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> Hi,
>
> Which Flink version are you using?
> Did you try to analyze the bottleneck of the application, i.e., is it CPU,
> disk IO, or network bound?
>
> Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to
> schedule tasks on the same machine to reduce the amount of network transfer.
> Hence, applications might scale better until tasks are scheduled to
> different machines.
>
> Fabian
>
> 2018-06-16 12:20 GMT+02:00 Siew Wai Yow :
>
> Hi Jorn, Please find the source @https://github.com/swyow/flink_sample_git
>
> Thank you!
>
> --
> *From:* Jörn Franke 
> *Sent:* Saturday, June 16, 2018 6:03 PM
>
> *To:* Siew Wai Yow
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> Can you share the app source on gitlab, github or bitbucket etc?
>
> On 16. Jun 2018, at 11:46, Siew Wai Yow  wrote:
>
> Hi, there is an interesting finding: the reason low parallelism works
> much better is that all tasks run in the same TM; once we scale more,
> the tasks are distributed to different TMs and the performance is worse than in the
> low parallelism case. Is this expected? The more I scale, the less
> I get?
>
> --
> *From:* Siew Wai Yow 
> *Sent:* Saturday, June 16, 2018 5:09 PM
> *To:* Jörn Franke
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
>
> Hi Jorn, the input data is 1 KB per record; in production there will be 10
> billion records per day, and it will increase, so scalability is quite
> important for us to handle more data. Unfortunately this does not work as
> expected even with only 10 million records of test data. The test application
> is just a simple Jackson map + an empty process. CPU and memory are not an
> issue as we have 32 vCPUs + 100 GB RAM per TM. The network should be fine as
> well, as the total TX+RX peak is around 800 Mbps while we have 1000 Mbps. Do you
> mind sharing your thoughts? Or testing the attached application in your
> lab?
>
>
> To run the program, sample parameters,
>
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=
> do36.mycompany.com:8127"
>
>- aggrinterval: time in ms for timer to trigger
>- loop: how many row of data to feed
>- statsd: to send result to statsd
>- psrc: source parallelism
>- pJ2R: parallelism of map operator(JsonRecTranslator)
>- pAggr: parallelism of process+timer operator(AggregationDuration)
>
>
> Thank you!
>
> Yow
>
> --
> *From:* Jörn Franke 
> *Sent:* Saturday, June 16, 2018 4:46 PM
> *To:* Siew Wai Yow
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> How large is the input data? If the input data is very small then it does
> not make sense to scale it even more. The larger the data is the more
> parallelism you will have. You can modify this behavior of course by
> changing the partition on the Dataset.
>
> On 16. Jun 2018, at 10:41, Siew Wai Yow  wrote:
>
> Hi,
>
>
> We found that our Flink application with simple logic, which uses a process
> function, is not scalable beyond parallelism 8 even though it has
> sufficient resources. Below is the result, which is capped at ~250k
> TPS. No matter how we tune the parallelism of the operators it just does not
> scale; the same happens when we increase source parallelism.
>
>
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 

Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Siew Wai Yow
*additional info in bold.


From: Siew Wai Yow 
Sent: Monday, June 18, 2018 3:57 PM
To: Fabian Hueske
Cc: Jörn Franke; user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!


Hi Fabian,


We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?

"Hence, applications might scale better until tasks are scheduled to different 
machines."

This seems to be the case. We have 32 vCPUs and 16 slots in one TM machine, 5 TMs in total. So
scaling works perfectly for 1-2-4-8-16 because everything happens in the same TM. When we
scale to 32 (which spans different machines), the performance drops and is not even on par
with the parallelism-16 case. Is this expected? Thank you.

Regards,
Yow


From: Fabian Hueske 
Sent: Monday, June 18, 2018 3:47 PM
To: Siew Wai Yow
Cc: Jörn Franke; user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

Hi,

Which Flink version are you using?
Did you try to analyze the bottleneck of the application, i.e., is it CPU, disk 
IO, or network bound?

Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to 
schedule tasks on the same machine to reduce the amount of network transfer.
Hence, applications might scale better until tasks are scheduled to different 
machines.

Fabian

2018-06-16 12:20 GMT+02:00 Siew Wai Yow 
mailto:wai_...@hotmail.com>>:

Hi Jorn, Please find the source @https://github.com/swyow/flink_sample_git

Thank you!


From: Jörn Franke mailto:jornfra...@gmail.com>>
Sent: Saturday, June 16, 2018 6:03 PM

To: Siew Wai Yow
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

Can you share the app source on gitlab, github or bitbucket etc?

On 16. Jun 2018, at 11:46, Siew Wai Yow 
mailto:wai_...@hotmail.com>> wrote:


Hi, there is an interesting finding: the reason low parallelism works much
better is that all tasks run in the same TM; once we scale more, the tasks
are distributed to different TMs and the performance is worse than in the low
parallelism case. Is this expected? The more I scale, the less I get?


From: Siew Wai Yow mailto:wai_...@hotmail.com>>
Sent: Saturday, June 16, 2018 5:09 PM
To: Jörn Franke
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!


Hi Jorn, the input data is 1 KB per record; in production there will be 10
billion records per day, and it will increase, so scalability is quite
important for us to handle more data. Unfortunately this does not work as expected
even with only 10 million records of test data. The test application is just a
simple Jackson map + an empty process. CPU and memory are not an issue as we
have 32 vCPUs + 100 GB RAM per TM. The network should be fine as well, as the total TX+RX
peak is around 800 Mbps while we have 1000 Mbps. Do you mind sharing your
thoughts? Or testing the attached application in your lab?


To run the program, sample parameters,

"aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
URL=do36.mycompany.com:8127"

  *   aggrinterval: time in ms for timer to trigger
  *   loop: how many row of data to feed
  *   statsd: to send result to statsd
  *   psrc: source parallelism
  *   pJ2R: parallelism of map operator(JsonRecTranslator)
  *   pAggr: parallelism of process+timer operator(AggregationDuration)


Thank you!

Yow


From: Jörn Franke mailto:jornfra...@gmail.com>>
Sent: Saturday, June 16, 2018 4:46 PM
To: Siew Wai Yow
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

How large is the input data? If the input data is very small then it does not 
make sense to scale it even more. The larger the data is the more parallelism 
you will have. You can modify this behavior of course by changing the partition 
on the Dataset.

On 16. Jun 2018, at 10:41, Siew Wai Yow 
mailto:wai_...@hotmail.com>> wrote:


Hi,


We found that our Flink application with simple logic, which uses a process
function, is not scalable beyond parallelism 8 even though it has
sufficient resources. Below is the result, which is capped at ~250k TPS. No
matter how we tune the parallelism of the operators it just does not scale; the same happens when we
increase source parallelism.


Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other 

Re: Stream Join With Early firings

2018-06-18 Thread Fabian Hueske
Hi Johannes,

EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default trigger
(see EventTimeSessionWindows.getDefaultTrigger()).

I would take the EventTimeTrigger and extend it with early firing
functionality.
However, there are a few things to consider
* you need to be aware that session window can be merged, i.e., two session
windows A, B with gap 10: A [20,25), B [37, 45), will be merged when a
record at 32 is received.
* windows store all records in a list. For every firing, you need to
iterate the full list and also track which records you joined already to
avoid duplicates. Maybe you can migrate records from the window state into
a custom state defined in a ProcessWindowFunction.

Best, Fabian





2018-06-13 13:43 GMT+02:00 Johannes Schulte :

> Hi,
>
> I am joining two streams with a session window and want to emit a joined
> (early) result for every element arriving on one of the streams.
>
> Currently the code looks like this:
>
> s1.join(s2)
> .where(s1.id).equalTo(s2.id)
> .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
> // trigger(?)
> .apply(...custom code..)
>
> What I am missing is the right trigger ala "withEarlyFiring" - do I need
> to implement my on trigger for this and if yes, what kind of functionality
> must be present to not break the session window semantics?
>
> Thanks in advance,
>
> Johannes
>
>

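A minimal sketch of the trigger Fabian describes: it mirrors the EventTimeTrigger but returns FIRE
for every arriving element, and keeps canMerge()/onMerge() so it still works with merging session
windows. The class name is made up, and the caveat above about re-emitting already-joined records on
each early firing still applies. In the join it would be plugged in via
.window(EventTimeSessionWindows.withGap(Time.minutes(15))).trigger(new ElementEarlyFiringEventTimeTrigger()).

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ElementEarlyFiringEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // the window is already complete
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // early firing: emit the current (partial) result for every element
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // final firing when the watermark passes the end of the session
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true; // required because session windows merge
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // re-register the timer for the merged window if it is still open
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }
}
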

Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Siew Wai Yow
Hi Fabian,


We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?

"Hence, applications might scale better until tasks are scheduled to different 
machines."

This seems to be the case. We have 32 vCPUs and 16 slots in one TM machine. So scaling
works perfectly for 1-2-4-8-16 because everything happens in the same TM. When we scale to 32,
the performance drops and is not even on par with the parallelism-16 case. Is this
expected? Thank you.

Regards,
Yow


From: Fabian Hueske 
Sent: Monday, June 18, 2018 3:47 PM
To: Siew Wai Yow
Cc: Jörn Franke; user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

Hi,

Which Flink version are you using?
Did you try to analyze the bottleneck of the application, i.e., is it CPU, disk 
IO, or network bound?

Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to 
schedule tasks on the same machine to reduce the amount of network transfer.
Hence, applications might scale better until tasks are scheduled to different 
machines.

Fabian

2018-06-16 12:20 GMT+02:00 Siew Wai Yow <wai_...@hotmail.com>:

Hi Jorn, Please find the source @https://github.com/swyow/flink_sample_git

Thank you!


From: Jörn Franke <jornfra...@gmail.com>
Sent: Saturday, June 16, 2018 6:03 PM

To: Siew Wai Yow
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

Can you share the app source on gitlab, github or bitbucket etc?

On 16. Jun 2018, at 11:46, Siew Wai Yow <wai_...@hotmail.com> wrote:


Hi, there is an interesting finding: the reason low parallelism works much 
better is that all tasks run in the same TM. Once we scale further, the tasks 
are distributed across different TMs and the performance is worse than in the 
low-parallelism case. Is this something expected? The more I scale, the less I get?


From: Siew Wai Yow <wai_...@hotmail.com>
Sent: Saturday, June 16, 2018 5:09 PM
To: Jörn Franke
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!


Hi Jorn, the input data is 1 kb per record; in production there will be 10 
billion records per day and growing, so scalability is quite important for us 
to handle more data. Unfortunately this does not work as expected even with 
only 10 million records of test data. The test application is just a simple 
Jackson map + an empty process function. CPU and memory are not an issue as we 
have 32 vCPUs + 100 GB RAM per TM. The network should be fine as well, as the 
total TX+RX peak is around 800 Mbps while we have 1000 Mbps. Would you mind 
sharing your thoughts, or testing the attached application in your lab?


To run the program, sample parameters,

"aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
URL=do36.mycompany.com:8127"

  *   aggrinterval: time in ms for timer to trigger
  *   loop: how many row of data to feed
  *   statsd: to send result to statsd
  *   psrc: source parallelism
  *   pJ2R: parallelism of map operator(JsonRecTranslator)
  *   pAggr: parallelism of process+timer operator(AggregationDuration)


Thank you!

Yow


From: Jörn Franke <jornfra...@gmail.com>
Sent: Saturday, June 16, 2018 4:46 PM
To: Siew Wai Yow
Cc: user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

How large is the input data? If the input data is very small then it does not 
make sense to scale it even more. The larger the data is the more parallelism 
you will have. You can modify this behavior of course by changing the partition 
on the Dataset.

On 16. Jun 2018, at 10:41, Siew Wai Yow <wai_...@hotmail.com> wrote:


Hi,


We found that our Flink application, which has simple logic using a process 
function, does not scale beyond a parallelism of 8 even with sufficient 
resources. Below is the result, which is capped at ~250k TPS. No matter how we 
tune the parallelism of the operators it just does not scale, and increasing 
the source parallelism does not help either.


Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64 performance worse 
than parallelism 32.


Sample source code is attached (flink_app_parser_git.zip). It is a simple 
program that parses JSON records into objects and passes them to an empty Flink 
process function. RocksDB is in use, and the source data is generated by the 
program itself, so this can be reproduced easily.
Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-18 Thread Rinat
I’ve created a JIRA issue https://issues.apache.org/jira/browse/FLINK-9603 and added a proposal with a PR.

Thx

> On 16 Jun 2018, at 17:21, Rinat  wrote:
> 
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It’s very useful when it’s necessary to set a 
> specific file extension.
> 
> During usage, I’ve found an issue: when a new part file is created, it 
> has the same part index as the just-closed file. 
> So, when Flink tries to move it into its final state, we get a 
> FileAlreadyExistsException.
> 
> This problem is related to the following code:
> Here we try to find the first part-file index that doesn’t exist in the 
> bucket directory. The problem is that the partSuffix is not involved in the 
> path assembly, which means the probed path never exists
> and partCounter is never incremented.
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>bucketState.partCounter++;
>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> 
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> 
> Before creating the writer, the partSuffix is appended here, but it should 
> already have been appended before the index checks:
> if (partSuffix != null) {
>partPath = partPath.suffix(partSuffix);
> }
> I’ll create an issue and try to submit a fix
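> 
> A hedged sketch of that reordering (not necessarily the exact patch attached
> to FLINK-9603; assemblePartPath is a hypothetical helper introduced only for
> illustration): build the candidate path including the suffix before probing
> for existing files, so the existence check sees the same name that will
> eventually be written.
> 
> Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>    bucketState.partCounter++;
>    partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
> }
> 
> private Path assemblePartPath(Path bucketPath, int subtaskIndex, int partCounter) {
>    String suffix = partSuffix == null ? "" : partSuffix;
>    return new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + partCounter + suffix);
> }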
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Flink application does not scale as expected, please help!

2018-06-18 Thread Fabian Hueske
Hi,

Which Flink version are you using?
Did you try to analyze the bottleneck of the application, i.e., is it CPU,
disk IO, or network bound?

Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to
schedule tasks on the same machine to reduce the amount of network transfer.
Hence, applications might scale better until tasks are scheduled to
different machines.

Fabian

2018-06-16 12:20 GMT+02:00 Siew Wai Yow :

> Hi Jorn, Please find the source @https://github.com/swyow/flink_sample_git
>
> Thank you!
>
> --
> *From:* Jörn Franke 
> *Sent:* Saturday, June 16, 2018 6:03 PM
>
> *To:* Siew Wai Yow
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> Can you share the app source on gitlab, github or bitbucket etc?
>
> On 16. Jun 2018, at 11:46, Siew Wai Yow  wrote:
>
> Hi, there is an interesting finding: the reason low parallelism works much
> better is that all tasks run in the same TM. Once we scale further, the
> tasks are distributed across different TMs and the performance is worse than
> in the low-parallelism case. Is this something expected? The more I scale,
> the less I get?
>
> --
> *From:* Siew Wai Yow 
> *Sent:* Saturday, June 16, 2018 5:09 PM
> *To:* Jörn Franke
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
>
> Hi Jorn, the input data is 1 kb per record; in production there will be 10
> billion records per day and growing, so scalability is quite important for
> us to handle more data. Unfortunately this does not work as expected even
> with only 10 million records of test data. The test application is just a
> simple Jackson map + an empty process function. CPU and memory are not an
> issue as we have 32 vCPUs + 100 GB RAM per TM. The network should be fine as
> well, as the total TX+RX peak is around 800 Mbps while we have 1000 Mbps.
> Would you mind sharing your thoughts, or testing the attached application in
> your lab?
>
>
> To run the program, sample parameters,
>
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=
> do36.mycompany.com:8127"
>
>- aggrinterval: time in ms for timer to trigger
>- loop: how many row of data to feed
>- statsd: to send result to statsd
>- psrc: source parallelism
>- pJ2R: parallelism of map operator(JsonRecTranslator)
>- pAggr: parallelism of process+timer operator(AggregationDuration)
>
>
> Thank you!
>
> Yow
>
> --
> *From:* Jörn Franke 
> *Sent:* Saturday, June 16, 2018 4:46 PM
> *To:* Siew Wai Yow
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> How large is the input data? If the input data is very small then it does
> not make sense to scale it even more. The larger the data is the more
> parallelism you will have. You can modify this behavior of course by
> changing the partition on the Dataset.
>
> On 16. Jun 2018, at 10:41, Siew Wai Yow  wrote:
>
> Hi,
>
>
> We found that our Flink application, which has simple logic using a process
> function, does not scale beyond a parallelism of 8 even with sufficient
> resources. Below is the result, which is capped at ~250k TPS. No matter how
> we tune the parallelism of the operators it just does not scale, and
> increasing the source parallelism does not help either.
>
>
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64 performance
> worse than parallelism 32.
>
>
> Sample source code is attached (flink_app_parser_git.zip). It is a simple
> program that parses JSON records into objects and passes them to an empty
> Flink process function. RocksDB is in use, and the source data is generated
> by the program itself. This can be reproduced easily.
>
>
> We chose Flink because of its scalability, but that is not what we are
> seeing now. We would appreciate it if anyone could help, as this is
> impacting our projects! Thank you.
>
>
> To run the program, sample parameters,
>
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=
> do36.mycompany.com:8127"
>
>- aggrinterval: time in ms for timer to trigger
>- loop: how many row of data to feed
>- statsd: to send result to statsd
>- psrc: source parallelism
>- pJ2R: parallelism of map operator(JsonRecTranslator)
>- pAggr: parallelism of process+timer operator(AggregationDuration)
>
> We are running in VMWare, 5 Task Managers and each has 32 slots.
>
>
> Architecture: x86_64
> CPU op-mode(s): 32-bit, 64-bit
> Byte Order: Little Endian
> 

Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Till Rohrmann
Could you also please share the complete log file with us.

Cheers,
Till

On Sat, Jun 16, 2018 at 5:22 PM Ted Yu  wrote:

> The error for core-default.xml is interesting.
>
> Flink doesn't have this file. Probably it came with Yarn. Please check the
> hadoop version Flink was built with versus the hadoop version in your
> cluster.
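>
> (To compare the two, assuming a standard setup: running "hadoop version" on
> the cluster prints the cluster's Hadoop version, and the Flink 1.5 binaries
> are published as Hadoop-version-specific bundles, so the bundle you
> downloaded, e.g. "Flink 1.5.0 with Hadoop 2.7", indicates which Hadoop
> version Flink was built against.)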
>
> Thanks
>
>  Original message 
> From: Garvit Sharma 
> Date: 6/16/18 7:23 AM (GMT-08:00)
> To: trohrm...@apache.org
> Cc: Chesnay Schepler , user@flink.apache.org
> Subject: Re: Exception while submitting jobs through Yarn
>
> I am not able to figure this out and have been stuck badly on it for the
> last week. Any little help would be appreciated.
>
>
> 2018-06-16 19:25:10,523 DEBUG
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
> Parallelism set: 1 for 8
>
> 2018-06-16 19:25:10,578 DEBUG
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
> Parallelism set: 1 for 1
>
> 2018-06-16 19:25:10,588 DEBUG
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
> CONNECTED: KeyGroupStreamPartitioner - 1 -> 8
>
> 2018-06-16 19:25:10,591 DEBUG
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
> Parallelism set: 1 for 5
>
> 2018-06-16 19:25:10,597 DEBUG
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  -
> CONNECTED: KeyGroupStreamPartitioner - 5 -> 8
>
> 2018-06-16 19:25:10,618 FATAL org.apache.hadoop.conf.Configuration
>   - error parsing conf core-default.xml
>
> javax.xml.parsers.ParserConfigurationException: Feature '
> http://apache.org/xml/features/xinclude' is not recognized.
>
> at
> org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown
> Source)
>
> at
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2482)
>
> at
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2444)
>
> at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2361)
>
> at org.apache.hadoop.conf.Configuration.get(Configuration.java:1188)
>
> at
> org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider.getRecordFactory(RecordFactoryProvider.java:49)
>
> at org.apache.hadoop.yarn.util.Records.(Records.java:32)
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getQueueInfoRequest(YarnClientImpl.java:495)
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getAllQueues(YarnClientImpl.java:525)
>
> at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.checkYarnQueues(AbstractYarnClusterDescriptor.java:658)
>
> at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:486)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:235)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>
> 2018-06-16 19:25:10,620 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>   - Error while getting queue information from YARN: null
>
> 2018-06-16 19:25:10,621 DEBUG
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Error
> details
>
> java.lang.ExceptionInInitializerError
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getQueueInfoRequest(YarnClientImpl.java:495)
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getAllQueues(YarnClientImpl.java:525)
>
> at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.checkYarnQueues(AbstractYarnClusterDescriptor.java:658)
>
> at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:486)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:235)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>


Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-18 Thread Stefan Richter
Hi,

can you take a heap dump from a JVM that runs into the problem and share it 
with us? That would make finding the cause a lot easier.
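One way to capture one, assuming a plain JVM setup, is jmap against the affected 
TaskManager process, e.g. "jmap -dump:live,format=b,file=/tmp/tm-heap.hprof <taskmanager-pid>" 
(path and pid are placeholders); the resulting .hprof file can then be inspected 
with Eclipse MAT or VisualVM.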

Best,
Stefan

> On 15.06.2018, at 23:01, ashish pok wrote:
> 
> All,
> 
> I have another slow Memory Leak situation using basic TimeSession Window 
> (earlier it was GlobalWindow related that Fabian helped clarify). 
> 
> I have a very simple data pipeline:
> 
> DataStream processedData = rawTuples
>   
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
>   .trigger(new ProcessingTimePurgeTrigger())
>   .apply(new IPSLAMetricWindowFn())
>   .name("windowFunctionTuple")
>   .map(new TupleToPlatformEventMapFn())
>   .name("mapTupleEvent")
>   ;
>   
> 
> I initially didnt even have ProcessingTmePurgeTrigger and it was using 
> default Trigger. In an effort to fix this issue, I created my own Trigger 
> from default ProcessingTimeTrigger with simple override to onProcessingTime 
> method (essentially replacing FIRE with FIRE_AND_PURGE)
> 
> @Override
>   public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) {
>   return TriggerResult.FIRE_AND_PURGE;
>   }
> 
> This seems to have done nothing (may have delayed issue by couple of hours - 
> not certain). But, I still see heap utilization creep up slowly and 
> eventually reaches a point when GC starts to take too long and then the 
> dreaded OOM. 
> 
> For completeness, here is my window function (still using the old function 
> interface). It creates a few metrics for reporting and applies its logic by 
> looping over the Iterable. NO state is explicitly kept in this function; the 
> RichWindowFunction is basically only needed to register the metrics.
> 
> public class IPSLAMetricWindowFn extends RichWindowFunction BasicFactTuple, String, TimeWindow> {
> 
>   private static final long serialVersionUID = 1L;
>   
>   private static Logger logger = 
> LoggerFactory.getLogger(IPSLAMetricWindowFn.class);
>   
>   private Meter in;
>   
>   private Meter out;
> 
>   private Meter error;
>   
>   @Override
>   public void open(Configuration conf) throws Exception {
>   this.in = getRuntimeContext()
> .getMetricGroup()
> .addGroup(AppConstants.APP_METRICS.PROCESS)
> .meter(AppConstants.APP_METRICS.IN, new 
> MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>   this.out = getRuntimeContext()
> .getMetricGroup()
> .addGroup(AppConstants.APP_METRICS.PROCESS)
> .meter(AppConstants.APP_METRICS.OUT, new 
> MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>   this.error = getRuntimeContext()
> .getMetricGroup()
> .addGroup(AppConstants.APP_METRICS.PROCESS)
> .meter(AppConstants.APP_METRICS.ERROR, new 
> MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>   super.open(conf);
>   }
> 
>   @Override
>   public void apply(String key, TimeWindow window, 
> Iterable events, Collector collector) 
> throws Exception {
>   }
> }
> 
> 
> Appreciate any pointers on what could be causing leaks here. This seems 
> pretty straight-forward.
> 
> Thanks, Ashish
> 



Re: IoT Use Case, Problem and Thoughts

2018-06-18 Thread Till Rohrmann
Hi Ashish,

the atomic stop with savepoint is going to be implemented for all
state backends.

Cheers,
Till

On Sat, Jun 16, 2018 at 4:29 AM Ashish Pokharel  wrote:

> Hi Till, Fabian,
>
> Thanks for your responses again.
>
> Till, you have nailed it. I will comment on them individually. But first,
> I feel like I am still not stating it well enough to illustrate the need.
> May be I am overthinking :)
>
> So let me try one more time with a preface that we are talking about
> millions of sensors reporting logs/metrics. So in my cluster we can
> potentially have 10s if not 100s of such apps for variety of data. I
> currently have 1 app in Prod so I can do a lot testing :) Just as a test, I
> enabled RocksDB State Backend and Checkpointing every 5 seconds with
> Graphite metrics enabled. On an average I could see almost 25GB of total
> state being written across couple of hundred slots based on Graphite
> numbers - it is setup with incremental and async Checkpoints. I am assuming
> main reason being states are transient and deltas are essentially entire
> set of new states. Our main concern is real-time processing vs no data loss
> or even possibly a few duplicates. To Fabian’s point, at least once vs
> exactly once semantics are also not of utmost concern at this point. Now,
> bottom line is I have Checkpointing disabled and use MemoryStateBackend
> with the thought that writing massive states to persistence every few
> seconds didn’t seem like best use of resources - I’d rather fit in more of
> these apps in cluster and use stateful processing for apps we really need
> them on. However, this leads to 2 main issues:
>
> 1- If an operator fails (let’s say Kafka timeouts), the entire job graph
> restarts, which leads to a larger-than-desirable gap in the data (lost states
> across 100s of operators) as obviously there is no recoverable state
> 2- The same issue happens on planned restarts
>
> Per Fabian’s suggestion, I am going to try RocksDB State Backend with
> local drives and run some baseline tests - hoping states are kept in memory
> for the most part unless spillover is needed. This should at least allow us
> with decent solution of (2). I am still not convinced we should enable
> periodic Checkpointing (perhaps I am wrong here but again I have
> highlighted my reasons above).
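>
> For reference, a minimal sketch of the setup being described (the class name,
> checkpoint path and the 5 s interval are placeholders based on the test
> above, not the actual job):
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class IncrementalCheckpointSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // RocksDB state backend; the second argument enables incremental checkpoints
>         env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
>         // checkpoint every 5 seconds; RocksDB snapshots are taken asynchronously
>         env.enableCheckpointing(5000);
>         // ... build the pipeline and call env.execute(...)
>     }
> }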
>
> "
>
> Just for my own understanding: What do you want to do with event-based
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
> partial snapshot which should then be used for recovery? Technically, this
> is possible but I'm not sure how much value this would add for Flink users.
> A partial snapshot could also be completely empty (equivalent of disabling
> checkpointing).
>
>
> I can see the point of making the checkpoint triggering more flexible and
> giving some control to the user. In contrast to savepoints, checkpoints are
> considered for recovery. My question here would be, what would be the
> triggering condition in your case (other than time)?
>
> "
> I’d think the trigger condition would be based on a life-cycle hook like RESTART
> (or perhaps even an external message, maybe, once FLINK-6131 is available).
> A partial (best possible) snapshot is exactly what it would be - states from
> failing operators obviously cannot be expected to be recoverable.
>
> What the community will add very soon is an atomic stop with savepoint
> call which will take a savepoint of your job's state when and shut it down.
>
>
> Very nice! Would this also have the same need to use the Fs or RocksDB State
> Backend? It shouldn’t be an issue for us as long as my tests above turn out
> to be decent.
>
> Thanks again guys for your advice and feedback. Really appreciate it.
>
> — Ashish
>
>
>
> On Jun 15, 2018, at 5:43 AM, Till Rohrmann  wrote:
>
> Hi,
>
> ideally we would not have to cancel all tasks and only redeploy the whole
> job in case of a restart. Instead we should do what you've outlined:
> Redeploy the failed tasks and reset the state of all other running tasks.
> At the moment, this is, however, not yet possible. While improving Flink's
> recovery behavior this should be addressed eventually.
>
> Just for my own understanding: What do you want to do with event-based
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
> partial snapshot which should then be used for recovery? Technically, this
> is possible but I'm not sure how much value this would add for Flink users.
> A partial snapshot could also be completely empty (equivalent of disabling
> checkpointing).
>
> I can see the point of making the checkpoint triggering more flexible and
> giving some control to the user. In contrast to savepoints, checkpoints are
> considered for recovery. My question here would be, what would be the
> triggering condition in your case (other than time)?
>
> What the community will add very soon is an atomic stop with savepoint
> call which will take a savepoint of your job's state when and shut it down.
>
> Cheers,
> Till
>
> 

Re: flink and akka HTTP

2018-06-18 Thread Till Rohrmann
Hi,

I assume that you have an Akka dependency conflict. By adding the Akka
dependency version to your user jar and enabling child first class loading
you should be able to control which Akka version is loaded. The only thing
you have to check is whether Flink works with a newer version of Akka.
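
For illustration (assuming a Flink 1.4+ setup; the concrete Akka/akka-http
versions to bundle are something you need to verify yourself): declare the
Akka artifacts your client needs as normal dependencies rather than
compileOnly, so they end up in the fat jar, and make sure user-code
classloading is resolved child-first in flink-conf.yaml:

classloader.resolve-order: child-first

Child-first resolution is the default since Flink 1.4, so this usually only
needs to be set if it was changed; with it, the Akka classes from your jar
take precedence over the ones shipped with Flink.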

Cheers,
Till

On Fri, Jun 15, 2018 at 8:15 PM Gäckler Martin <
martin.gaeck...@esolutions.de> wrote:

> Good evening,
>
>
>
> According to Flink's documentation I have excluded the Flink runtime
> library from the runtime dependencies of my project:
>
>
>
> dependencies {
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-core',  version: '1.4.2'
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-java',  version: '1.4.2'
>
> compileOnly group: 'org.apache.flink',  name:
> 'flink-streaming-java_2.11',   version: '1.4.2'
>
> implementation  group: 'org.apache.flink',  name:
> 'flink-connector-kafka-0.11_2.11', version: '1.4.2'
>
> ...
>
> }
>
>
>
> Unfortunately I get the following error:
>
>
>
> Caused by: java.lang.ClassCastException: interface
> akka.serialization.Serializer is not assignable from class
> akka.remote.serialization.MiscMessageSerializer
>
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:23)
>
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
>
>  at scala.util.Try$.apply(Try.scala:192)
>
>  at
> akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
>
>  at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:38)
>
>  at
> akka.serialization.Serialization.serializerOf(Serialization.scala:301)
>
>  at
> akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
>
>  at
> akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
>
>  at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
>
>  at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>
>  at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>
>  at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
>
>  at akka.serialization.Serialization.(Serialization.scala:327)
>
>  at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
>
>  at
> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
>
>  at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>
>  at
> akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:946)
>
>  at
> akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:944)
>
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
>  at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
>
>  at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
>
>  at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
>
>  at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
>
>  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
>
>  at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:246)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:289)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:234)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:225)
>
>  at akka.actor.ActorSystem$.create(ActorSystem.scala:160)
>
>  at akka.actor.ActorSystem.create(ActorSystem.scala)
>
>  at
> de.eso.swarm.rest.client.akka.AkkaRestClient.(AkkaRestClient.java:43)
>
>  ... 12 more
>
>
>
> My application needs to initialize the AKKA ActorSystem because it uses an
> HTTP client that I have developed using akka-http and akka-stream. Here are
> the dependencies of my HTTP client:
>
>
>
> dependencies {
>
> compile project(':platform-sdk-java-core')
>
> testCompile project(':platform-sdk-java-testing')
>
>
>
> implementation group: 'com.typesafe.akka', name:
> 'akka-http_2.11',  version: '10.1.2'
>
> implementation group: 'com.typesafe.akka', name:
> 'akka-stream_2.11',version: '2.5.11'
>
>
>
> implementation group: 'com.google.code.gson',  name:
> 'gson',version: '2.8.4'
>
> implementation group: 'com.google.protobuf',   name:
> 'protobuf-java',   version: '3.5.1'
>
>
>
> testImplementation group: 'junit', name: 'junit', version: '4.12'
>
> }
>