Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jörn Franke
Increase the replication factor and/or use the HDFS centralized cache:
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
Also try to reduce the size of the jar; e.g., the Flink libraries do not need to be 
included (they can be marked as provided so they are not bundled into the fat jar).
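
For illustration, a minimal Java sketch of the HDFS side of both suggestions, using 
the standard Hadoop client API. The jar path, the replication value of 20 and the 
cache pool name are assumptions; adding a cache pool normally needs HDFS admin 
rights and can equally be done with the hdfs cacheadmin CLI:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

public class UberjarDistribution {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path uberjar = new Path("/user/flink/jobs/my-job-uberjar.jar"); // assumed path

        FileSystem fs = FileSystem.get(conf);
        // Raise the replication factor of the jar itself, so that 800 node managers
        // read from many datanodes instead of the default 3 replicas.
        fs.setReplication(uberjar, (short) 20);

        // Optionally pin the jar into the HDFS centralized cache, so reads during the
        // localization storm are served from datanode memory.
        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            dfs.addCachePool(new CachePoolInfo("flink-jobs")); // fails if the pool already exists
            dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
                    .setPath(uberjar)
                    .setPool("flink-jobs")
                    .setReplication((short) 20)
                    .build());
        }
    }
}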

> On 30.08.2019 at 01:09, Elkhan Dadashov wrote:
> 
> Dear Flink developers,
> 
> I am having difficulty getting a Flink job started.
> 
> The job's uberjar/fat jar is around 400 MB, and I need to launch 800+ 
> containers.
> 
> The default HDFS replication factor is 3.
> 
> The YARN queue is empty, and the 800 containers are allocated almost 
> immediately by the YARN RM.
> 
> It takes a very long time until all 800 nodes (node managers) have downloaded 
> the uberjar from HDFS to their local machines.
> 
> Q1:
> 
> a) Do all those 800 nodes download in batches of 3 at a time? (batch size 
> = HDFS replication factor)
> 
> b) Or can Flink TMs replicate the jar from each other, i.e. do already-started 
> TMs replicate it to not-yet-started nodes?
> 
> Most probably the answer is (a), but I want to confirm.
> 
> Q2:
> 
> What is the recommended way of handling a 400 MB+ uberjar with 800+ 
> containers?
> 
> Are there any specific parameters to tune?
> 
> Thanks.
> 
> Because downloading the uberjar takes a really long time, I am facing this 
> exception around 15 minutes after the job is kicked off:
> 
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
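
The "token is expired" error above is commonly tied to the ResourceManager's 
container-allocation expiry: a container token issued at allocation time must be 
used to start the container within 
yarn.resourcemanager.rm.container-allocation.expiry-interval-ms (600000 ms, i.e. 
10 minutes, by default), so a localization phase longer than that makes the 
start-container request fail. A small sketch that only reads the effective value 
from the client's Hadoop configuration; raising it has to happen in yarn-site.xml 
on the ResourceManager, and the property name is the one from yarn-default.xml:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ContainerTokenExpiryCheck {
    public static void main(String[] args) {
        // Loads yarn-site.xml / yarn-default.xml from the classpath and prints the
        // effective container-allocation expiry. If localizing the 400 MB uberjar on
        // all nodes takes longer than this, the NM rejects the start-container
        // request with "This token is expired".
        YarnConfiguration conf = new YarnConfiguration();
        long expiryMs = conf.getLong(
                "yarn.resourcemanager.rm.container-allocation.expiry-interval-ms",
                600_000L);
        System.out.println("Container allocation expiry interval: " + expiryMs + " ms");
    }
}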


Re: [DISCUSS] Contributing Chinese website and docs to Apache Flink

2019-01-29 Thread Jörn Franke
Also keep in mind the other direction: e.g., a new or modified version of the Chinese 
documentation needs to be reflected in the English one.

> On 29.01.2019 at 06:35, SteNicholas wrote:
> 
> Hi Jark, 
> 
> Thank you for starting this discussion. I am very willing to participate in
> the Flink documentation translation. 
> 
> Best, 
> Nicholas
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-12 Thread Jörn Franke
Thank you, very nice. I fully agree with that. 

> On 11.10.2018 at 19:31, Zhang, Xuefu wrote:
> 
> Hi Jörn,
> 
> Thanks for your feedback. Yes, I think Hive on Flink makes sense and in fact 
> it is one of the two approaches that I named in the beginning of the thread. 
> As also pointed out there, this isn't mutually exclusive with the work we 
> proposed inside Flink; they target different user groups and use cases. 
> Further, what we proposed to do in Flink should be a good showcase that 
> demonstrates Flink's capabilities in batch processing and convinces the Hive 
> community of the worth of a new engine. As you might know, the idea 
> encountered some doubt and resistance. Nevertheless, we do have a solid plan 
> for Hive on Flink, which we will execute once Flink SQL is in good shape.
> 
> I also agree with you that Flink SQL shouldn't be closely coupled with Hive. 
> While we mentioned Hive in many of the proposed items, most of them are 
> coupled only in concepts and functionality rather than code or libraries. We 
> are taking advantage of the connector framework in Flink. The only thing 
> that might be exceptional is support for Hive built-in UDFs, which we may not 
> make work out of the box to avoid the coupling. We could, for example, 
> require users to bring in the Hive library and register the UDFs themselves. 
> This is subject to further discussion.
> 
> #11 is about Flink runtime enhancements that are meant to make task failures 
> more tolerable (so that the job doesn't have to start from the beginning in 
> case of task failures) and to make task scheduling more resource-efficient. 
> Flink's current design in those two aspects leans more towards stream 
> processing, which may not be good enough for batch processing. We will 
> provide a more detailed design when we get to them.
> 
> Please let me know if you have further thoughts or feedback.
> 
> Thanks,
> Xuefu
> 
> 
> --
> Sender:Jörn Franke 
> Sent at:2018 Oct 11 (Thu) 13:54
> Recipient:Xuefu 
> Cc:vino yang ; Fabian Hueske ; dev 
> ; user 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Would it maybe make sense to provide Flink as an engine on Hive 
> ("flink-on-Hive")? E.g. to address 4, 5, 6, 8, 9 and 10; this could be more loosely 
> coupled than integrating Hive in all possible Flink core modules and thus 
> introducing a very tight dependency on Hive in the core.
> 1, 2 and 3 could be achieved via a connector based on the Flink Table API.
> Just as a proposal: start this endeavour as independent projects (Hive 
> engine, connector) to avoid too tight a coupling with Flink. Maybe in a more 
> distant future, if the Hive integration is heavily demanded, one could then 
> integrate it more tightly if needed. 
> 
> What is meant by 11?
> On 11.10.2018 at 05:01, Zhang, Xuefu wrote:
> 
> Hi Fabian/Vino,
> 
> Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
> Fabian's email until I read Vino's response just now. (Somehow Fabian's went 
> to the spam folder.)
> 
> My proposal contains long-term and short-term goals. Nevertheless, the 
> effort will focus on the following areas, including Fabian's list:
> 
> 1. Hive metastore connectivity - This covers both read and write access, which 
> means Flink can make full use of Hive's metastore as its catalog (at least 
> for batch, but this can be extended to streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.) 
> created by Hive can be understood by Flink, and the reverse direction holds 
> as well.
> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed 
> by Flink and vice versa.
> 4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
> own implementation or makes Hive's implementation work in Flink. Further, for 
> user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
> to import them into Flink without any code change required.
> 5. Data types - Flink SQL should support all data types that are available 
> in Hive.
> 6. SQL language - Flink SQL should support the SQL standard (such as SQL:2003) 
> with extensions to support Hive's syntax and language features, around DDL, 
> DML, and SELECT queries.
> 7. SQL CLI - this is currently being developed in Flink but more effort is needed.
> 8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
> Thrift APIs, such that HiveServer2 users can reuse their existing clients 
> (such as Beeline) but connect to Flink's Thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
> applications to use to connect to its Thrift server.
> 10. Support other user customizations in Hive, such as Hive SerDes, storage 
> handlers, etc.
> 11. Better task failure tolerance and task scheduling at Flink runtime.
> 
> As you can see, achieving all of those requires significant effort across 
> all layers of Flink. 
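
As an aside for readers of this archive: items 1-3 later materialized in Flink as 
the HiveCatalog in flink-connector-hive. A minimal sketch of wiring it up through 
the Table API, with class and method names taken from later Flink releases; the 
three-argument constructor and the hive-site.xml location are assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        // Batch Table environment; planner details omitted for brevity.
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Assumed arguments: catalog name, default database, directory containing hive-site.xml.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf");

        // Register the catalog and make it current; Hive databases and tables are then
        // visible to Flink SQL, covering metastore, metadata and data access.
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        tableEnv.executeSql("SHOW TABLES").print();
    }
}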

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-10 Thread Jörn Franke
Would it maybe make sense to provide Flink as an engine on Hive 
("flink-on-Hive")? E.g. to address 4, 5, 6, 8, 9 and 10; this could be more loosely 
coupled than integrating Hive in all possible Flink core modules and thus 
introducing a very tight dependency on Hive in the core.
1, 2 and 3 could be achieved via a connector based on the Flink Table API.
Just as a proposal: start this endeavour as independent projects (Hive 
engine, connector) to avoid too tight a coupling with Flink. Maybe in a more 
distant future, if the Hive integration is heavily demanded, one could then 
integrate it more tightly if needed. 

What is meant by 11?
> On 11.10.2018 at 05:01, Zhang, Xuefu wrote:
> 
> Hi Fabian/Vino,
> 
> Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
> Fabian's email until I read Vino's response just now. (Somehow Fabian's went 
> to the spam folder.)
> 
> My proposal contains long-term and short-term goals. Nevertheless, the 
> effort will focus on the following areas, including Fabian's list:
> 
> 1. Hive metastore connectivity - This covers both read and write access, which 
> means Flink can make full use of Hive's metastore as its catalog (at least 
> for batch, but this can be extended to streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.) 
> created by Hive can be understood by Flink, and the reverse direction holds 
> as well.
> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed 
> by Flink and vice versa.
> 4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
> own implementation or makes Hive's implementation work in Flink. Further, for 
> user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
> to import them into Flink without any code change required.
> 5. Data types - Flink SQL should support all data types that are available 
> in Hive.
> 6. SQL language - Flink SQL should support the SQL standard (such as SQL:2003) 
> with extensions to support Hive's syntax and language features, around DDL, 
> DML, and SELECT queries.
> 7. SQL CLI - this is currently being developed in Flink but more effort is needed.
> 8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
> Thrift APIs, such that HiveServer2 users can reuse their existing clients 
> (such as Beeline) but connect to Flink's Thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
> applications to use to connect to its Thrift server.
> 10. Support other user customizations in Hive, such as Hive SerDes, storage 
> handlers, etc.
> 11. Better task failure tolerance and task scheduling at Flink runtime.
> 
> As you can see, achieving all of those requires significant effort across 
> all layers of Flink. However, a short-term goal could include only core 
> areas (such as 1, 2, 4, 5, 6, 7) or start at a smaller scope (such as #3 and 
> #6).
> 
> Please share your further thoughts. If we generally agree that this is the 
> right direction, I could come up with a formal proposal quickly and then we 
> can follow up with broader discussions.
> 
> Thanks,
> Xuefu
> 
> 
> 
> --
> Sender:vino yang 
> Sent at:2018 Oct 11 (Thu) 09:45
> Recipient:Fabian Hueske 
> Cc:dev ; Xuefu ; user 
> 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Hi Xuefu,
> 
> I appreciate this proposal and, like Fabian, think it would be better if you 
> could give more details of the plan.
> 
> Thanks, vino.
> 
> Fabian Hueske wrote on Wednesday, 10 October 2018 at 17:27:
> Hi Xuefu,
> 
> Welcome to the Flink community and thanks for starting this discussion! 
> Better Hive integration would be really great!
> Can you go into details of what you are proposing? I can think of a couple 
> ways to improve Flink in that regard:
> 
> * Support for Hive UDFs
> * Support for Hive metadata catalog
> * Support for HiveQL syntax
> * ???
> 
> Best, Fabian
> 
> On Tue, 9 Oct 2018 at 19:22, Zhang, Xuefu wrote:
> Hi all,
> 
> Along with the community's effort, inside Alibaba we have explored Flink's 
> potential as an execution engine not just for stream processing but also for 
> batch processing. We are encouraged by our findings and have initiated our 
> effort to make Flink's SQL capabilities full-fledged. When comparing what's 
> available in Flink to the offerings from competitive data processing engines, 
> we identified a major gap in Flink: good integration with the Hive ecosystem. 
> This is crucial to the success of Flink SQL and batch processing due to the 
> well-established data ecosystem around Hive. Therefore, we have done some 
> initial work along this direction, but there is still a lot of effort needed.
> 
> We have two strategies in mind. The first one is to make Flink SQL 
> full-fledged and well-integrated with the Hive ecosystem. This is a similar 
> approach to what Spark SQL adopted. The second strategy is to make Hive 
> itself 

Re: Reading a single input file in parallel?

2018-02-18 Thread Jörn Franke
AFAIK Flink has a similar notion of splittability to Hadoop's. Furthermore, for 
custom FileInputFormats you can set the attribute unsplittable = true if your 
file format cannot be split.
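
A minimal sketch of what this looks like for a custom format, assuming Flink's 
FileInputFormat hierarchy and extending TextInputFormat here; the protected 
unsplittable flag makes createInputSplits() emit one split per file:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

// A text input format whose files must not be split, e.g. because the
// underlying compression codec is not splittable.
public class UnsplittableTextInputFormat extends TextInputFormat {

    public UnsplittableTextInputFormat(Path filePath) {
        super(filePath);
        // Protected flag inherited from FileInputFormat: when true, each file
        // becomes exactly one input split and is read by a single parallel instance.
        this.unsplittable = true;
    }
}

Such a format can then be handed to the environment like any other FileInputFormat, 
e.g. via readFile(...) or createInput(...).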

> On 18. Feb 2018, at 13:28, Niels Basjes  wrote:
> 
> Hi,
> 
> In Hadoop MapReduce there is the notion of "splittable" in the
> FileInputFormat. This has the effect that a single input file can be fed
> into multiple separate instances of the mapper that read the data.
> A lot has been documented (e.g. text is splittable per line, gzipped text
> is not) and designed into the various file formats (like Avro and Parquet)
> to allow splittability.
> 
> The goal is that reading and parsing files can be done by multiple
> cpus/systems in parallel.
> 
> How is this handled in Flink?
> Can Flink read a single file in parallel?
> How does Flink administrate/handle the possibilities regarding the various
> file formats?
> 
> 
> The reason I ask is because I want to see if I can port this (now Hadoop
> specific) hobby project of mine to work with Flink:
> https://github.com/nielsbasjes/splittablegzip
> 
> Thanks.
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes


Re: Submitting jobs via maven coordinates?

2017-11-14 Thread Jörn Franke
I have seen no official script, but what you describe should be easily done with 
a build tool such as Gradle. The advantage would be that a build tool is 
already tested and you do not have to maintain scripts for downloading etc.
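
Absent a build tool, a bare-bones sketch of the same idea in plain Java, assuming 
the standard Maven repository layout; the repository URL and coordinates are 
placeholders, the flink CLI is assumed to be on the PATH, and a real setup would 
also need authentication and transitive dependencies, which is exactly what Gradle 
gives you for free:

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class SubmitByCoordinates {
    public static void main(String[] args) throws Exception {
        // Hypothetical coordinates and repository URL; adjust to your Artifactory.
        String repo = "https://artifactory.example.com/libs-release";
        String group = "com.example", artifact = "my-flink-job", version = "1.2.3";

        // Standard Maven repository layout: groupId with dots replaced by slashes.
        String url = String.format("%s/%s/%s/%s/%s-%s.jar",
                repo, group.replace('.', '/'), artifact, version, artifact, version);

        // Download the jar into the working directory.
        Path jar = Paths.get(artifact + "-" + version + ".jar");
        try (InputStream in = new URL(url).openStream()) {
            Files.copy(in, jar, StandardCopyOption.REPLACE_EXISTING);
        }

        // Hand the downloaded jar to the Flink CLI.
        new ProcessBuilder("flink", "run", jar.toString())
                .inheritIO()
                .start()
                .waitFor();
    }
}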

> On 14. Nov 2017, at 07:46, Ron Crocker  wrote:
> 
> Internally we use Artifactory to store our jars. I have a team that would 
> like to submit their Flink jobs via their Maven coordinates, resulting in 
> pulling the specified jar from the Artifactory repository. 
> 
> Has anyone else seen a need for this? Has it already been accomplished? 
> 
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
> 


Re: Switch to Scala 2.11 as a default build profile

2017-06-29 Thread Jörn Franke
EMR has a Flink package. Just go to the advanced options and tick the checkbox for 
Flink. No need to build it yourself.

> On 29. Jun 2017, at 05:56, Bowen Li  wrote:
> 
> +1.
> 
> The AWS EMR ecosystem is using Scala 2.11 and breaks with Scala 2.10. We had
> to build several Flink components (e.g. flink-kinesis-connector) ourselves
> in order to run on EMR. Defaulting to Scala 2.11 will greatly reduce the
> adoption cost for Flink on EMR.
> 
> 
>> On Wed, Jun 28, 2017 at 9:34 AM, Till Rohrmann  wrote:
>> 
>> I'm +1 for changing the profile and for starting a discussion to drop Scala
>> 2.10.
>> 
>> Scala 2.10 is already quite old and the current stable version is 2.12. I
>> would be surprised to see many people still using Scala 2.10.
>> 
>> Cheers,
>> Till
>> 
>>> On Wed, Jun 28, 2017 at 4:53 PM, Ted Yu  wrote:
>>> 
>>> Here is the KIP that drops support for Scala 2.10 in Kafka 0.11:
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-119%3A+Drop+Support+for+Scala+2.10+in+Kafka+0.11
>>> 
>>> FYI
>>> 
>>> On Wed, Jun 28, 2017 at 7:23 AM, Piotr Nowojski wrote:
>>> 
 Yes, I know and I’m proposing to change this in the parent pom by default to
 scala-2.11.
 
 Changing the parent pom every time anyone wants to touch/build the Kafka 0.11
 connector in IntelliJ is not a great idea. This would require a developer to
 constantly stash those changes or commit and revert them before creating a
 pull request.
 
 Piotrek
 
> On Jun 28, 2017, at 3:49 PM, Greg Hogan  wrote:
> 
> You don't need to use the build profile in IntelliJ, just change
> scala.version and scala.binary.version in the parent pom (recent
> refactorings made this possible without changing every pom).
> 
> What is the benefit of changing the default without dropping older
> versions when contributions are still limited to the functionality of the
> old version?
> 
> On Wed, Jun 28, 2017 at 8:36 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> 
>> Hi,
>> 
>> I propose to switch to Scala 2.11 as the default and to have a Scala 2.10
>> build profile. Now it is the other way around. The reason for that is poor
>> support for build profiles in IntelliJ; I was unable to make it work after
>> I added the Kafka 0.11 dependency (Kafka 0.11 dropped support for Scala
>> 2.10).
>> 
>> As a side note, maybe we should also consider dropping Scala 2.10 support?
>> 
>> Piotrek
 
 
>>> 
>>