Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?
Increase the replication factor and/or use the HDFS cache: https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
Also try to reduce the size of the jar, e.g. the Flink libraries do not need to be included.

> On 30.08.2019 at 01:09, Elkhan Dadashov wrote:
>
> Dear Flink developers,
>
> I am having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400 MB, and I need to start 800+ containers.
>
> The default HDFS replication is 3.
>
> The YARN queue is empty, and 800 containers are allocated almost immediately by the YARN RM.
>
> It takes a very long time until all 800 nodes (node managers) have downloaded the uberjar from HDFS to the local machines.
>
> Q1:
> a) Do all those 800 nodes download in batches of 3 at a time? (batch size = HDFS replication factor)
> b) Or can Flink TMs replicate from each other? Or do already-started TMs replicate to not-yet-started nodes?
> Most probably the answer is (a), but I want to confirm.
>
> Q2:
> What is the recommended way of handling a 400 MB+ uberjar with 800+ containers? Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, around 15 minutes after the job was kicked off we face this exception:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time zones.
>     at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>     at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
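A quick way to act on the first suggestion is to bump the replication factor of the staged uberjar itself (rather than cluster-wide) before the containers start pulling it. A minimal sketch using the plain Hadoop FileSystem API; the staging path and the target factor of 20 are placeholders, not values from this thread:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RaiseJarReplication {
    public static void main(String[] args) throws Exception {
        // Placeholder path to the uberjar that was staged in HDFS for the job.
        Path jar = new Path("hdfs:///user/flink/staging/my-job-uber.jar");

        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(jar.toUri(), conf)) {
            // With 800 node managers pulling the same 400 MB file, a replication
            // factor well above the default of 3 spreads the read load across
            // many more datanodes.
            boolean scheduled = fs.setReplication(jar, (short) 20);
            System.out.println("Replication change scheduled: " + scheduled);
        }
    }
}

The same effect is available from the shell via hdfs dfs -setrep, and the centralized cache linked above goes further by pinning the jar's blocks in datanode memory. For the jar size itself, marking the flink-* dependencies as provided in the build keeps the Flink libraries out of the fat jar.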
Re: [DISCUSS] Contributing Chinese website and docs to Apache Flink
Also keep in mind the other direction: a new or modified version of the Chinese documentation needs to be reflected in the English one as well.

> On 29.01.2019 at 06:35, SteNicholas wrote:
>
> Hi Jark,
>
> Thank you for starting this discussion. I am very willing to participate in Flink document translation.
>
> Best,
> Nicholas
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
Thank you, very nice. I fully agree with that.

> On 11.10.2018 at 19:31, Zhang, Xuefu wrote:
>
> Hi Jörn,
>
> Thanks for your feedback. Yes, I think Hive on Flink makes sense and in fact it is one of the two approaches that I named at the beginning of the thread. As also pointed out there, this isn't mutually exclusive from the work we proposed inside Flink, and they target different user groups and use cases. Further, what we proposed to do in Flink should be a good showcase that demonstrates Flink's capabilities in batch processing and convinces the Hive community of the worth of a new engine. As you might know, the idea encountered some doubt and resistance. Nevertheless, we do have a solid plan for Hive on Flink, which we will execute once Flink SQL is in good shape.
>
> I also agree with you that Flink SQL shouldn't be closely coupled with Hive. While we mentioned Hive in many of the proposed items, most of them are coupled only in concepts and functionality rather than code or libraries. We are taking advantage of the connector framework in Flink. The only thing that might be an exception is support for Hive built-in UDFs, which we may not make work out of the box in order to avoid the coupling. We could, for example, require users to bring in the Hive library and register the functions themselves. This is subject to further discussion.
>
> #11 is about Flink runtime enhancements that are meant to make task failures more tolerable (so that the job doesn't have to start from the beginning in case of task failures) and to make task scheduling more resource-efficient. Flink's current design in those two aspects leans more towards stream processing, which may not be good enough for batch processing. We will provide a more detailed design when we get to them.
>
> Please let me know if you have further thoughts or feedback.
>
> Thanks,
> Xuefu
>
> --
> Sender: Jörn Franke
> Sent at: 2018 Oct 11 (Thu) 13:54
> Recipient: Xuefu
> Cc: vino yang; Fabian Hueske; dev; user
> Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Would it maybe make sense to provide Flink as an engine on Hive ("flink-on-Hive")? E.g. to address 4, 5, 6, 8, 9, 10. This could be more loosely coupled than integrating Hive in all possible Flink core modules and thus introducing a very tight dependency on Hive in the core. 1, 2, 3 could be achieved via a connector based on the Flink Table API.
> Just as a proposal: start this endeavour as independent projects (Hive engine, connector) to avoid too tight a coupling with Flink. Maybe in a more distant future, if the Hive integration is heavily demanded, one could then integrate it more tightly if needed.
>
> What is meant by 11?
>
> On 11.10.2018 at 05:01, Zhang, Xuefu wrote:
>
> Hi Fabian/Vino,
>
> Thank you very much for your encouragement and inquiry. Sorry that I didn't see Fabian's email until I read Vino's response just now. (Somehow Fabian's went to the spam folder.)
>
> My proposal contains long-term and short-term goals. Nevertheless, the effort will focus on the following areas, including Fabian's list:
>
> 1. Hive metastore connectivity - This covers both read/write access, which means Flink can make full use of Hive's metastore as its catalog (at least for batch, but it can be extended for streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.) created by Hive can be understood by Flink, and the reverse direction is true as well.
> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed by Flink and vice versa.
> 4. Support for Hive UDFs - For all of Hive's native UDFs, Flink either provides its own implementation or makes Hive's implementation work in Flink. Further, for user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users to import them into Flink without any code change required.
> 5. Data types - Flink SQL should support all data types that are available in Hive.
> 6. SQL language - Flink SQL should support the SQL standard (such as SQL:2003) with extensions to support Hive's syntax and language features, around DDL, DML, and SELECT queries.
> 7. SQL CLI - This is currently being developed in Flink, but more effort is needed.
> 8. Server - Provide a server that is compatible with Hive's HiveServer2 in its thrift APIs, such that HiveServer2 users can reuse their existing clients (such as beeline) but connect to Flink's thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other applications to use to connect to its thrift server.
> 10. Support for other user customizations in Hive, such as Hive SerDes, storage handlers, etc.
> 11. Better task failure tolerance and task scheduling at Flink runtime.
>
> As you can see, achieving all of this requires significant effort across all layers of Flink.
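For readers who want a feel for what items 1 and 2 look like from the Table API side: later Flink releases ship a HiveCatalog in the flink-connector-hive module, and wiring it up is roughly the sketch below. The catalog name, Hive conf directory, and the queried table are made-up placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveMetastoreSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Point Flink at an existing Hive metastore via its conf directory
        // (placeholder path).
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Tables created by Hive are now visible to Flink SQL (items 1 and 2);
        // "orders" is a hypothetical Hive table.
        tEnv.executeSql("SELECT COUNT(*) FROM orders").print();
    }
}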
Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
Would it maybe make sense to provide Flink as an engine on Hive ("flink-on-Hive")? E.g. to address 4, 5, 6, 8, 9, 10. This could be more loosely coupled than integrating Hive in all possible Flink core modules and thus introducing a very tight dependency on Hive in the core. 1, 2, 3 could be achieved via a connector based on the Flink Table API.
Just as a proposal: start this endeavour as independent projects (Hive engine, connector) to avoid too tight a coupling with Flink. Maybe in a more distant future, if the Hive integration is heavily demanded, one could then integrate it more tightly if needed.

What is meant by 11?

> On 11.10.2018 at 05:01, Zhang, Xuefu wrote:
>
> Hi Fabian/Vino,
>
> Thank you very much for your encouragement and inquiry. Sorry that I didn't see Fabian's email until I read Vino's response just now. (Somehow Fabian's went to the spam folder.)
>
> My proposal contains long-term and short-term goals. Nevertheless, the effort will focus on the following areas, including Fabian's list:
>
> 1. Hive metastore connectivity - This covers both read/write access, which means Flink can make full use of Hive's metastore as its catalog (at least for batch, but it can be extended for streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.) created by Hive can be understood by Flink, and the reverse direction is true as well.
> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed by Flink and vice versa.
> 4. Support for Hive UDFs - For all of Hive's native UDFs, Flink either provides its own implementation or makes Hive's implementation work in Flink. Further, for user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users to import them into Flink without any code change required.
> 5. Data types - Flink SQL should support all data types that are available in Hive.
> 6. SQL language - Flink SQL should support the SQL standard (such as SQL:2003) with extensions to support Hive's syntax and language features, around DDL, DML, and SELECT queries.
> 7. SQL CLI - This is currently being developed in Flink, but more effort is needed.
> 8. Server - Provide a server that is compatible with Hive's HiveServer2 in its thrift APIs, such that HiveServer2 users can reuse their existing clients (such as beeline) but connect to Flink's thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other applications to use to connect to its thrift server.
> 10. Support for other user customizations in Hive, such as Hive SerDes, storage handlers, etc.
> 11. Better task failure tolerance and task scheduling at Flink runtime.
>
> As you can see, achieving all of this requires significant effort across all layers of Flink. However, a short-term goal could include only the core areas (such as 1, 2, 4, 5, 6, 7) or start at a smaller scope (such as #3, #6).
>
> Please share your further thoughts. If we generally agree that this is the right direction, I could come up with a formal proposal quickly and then we can follow up with broader discussions.
>
> Thanks,
> Xuefu
>
> --
> Sender: vino yang
> Sent at: 2018 Oct 11 (Thu) 09:45
> Recipient: Fabian Hueske
> Cc: dev; Xuefu; user
> Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi Xuefu,
>
> I appreciate this proposal; like Fabian, I think it would be better if you could give more details of the plan.
>
> Thanks, vino.
>
> Fabian Hueske wrote on Wed, Oct 10, 2018 at 5:27 PM:
>
> Hi Xuefu,
>
> Welcome to the Flink community and thanks for starting this discussion! Better Hive integration would be really great!
> Can you go into details of what you are proposing? I can think of a couple of ways to improve Flink in that regard:
>
> * Support for Hive UDFs
> * Support for the Hive metadata catalog
> * Support for HiveQL syntax
> * ???
>
> Best, Fabian
>
> On Tue, Oct 9, 2018 at 19:22, Zhang, Xuefu wrote:
>
> Hi all,
>
> Along with the community's effort, inside Alibaba we have explored Flink's potential as an execution engine not just for stream processing but also for batch processing. We are encouraged by our findings and have initiated an effort to make Flink's SQL capabilities full-fledged. When comparing what's available in Flink to the offerings from competing data processing engines, we identified a major gap in Flink: good integration with the Hive ecosystem. This is crucial to the success of Flink SQL and batch due to the well-established data ecosystem around Hive. Therefore, we have done some initial work along this direction, but there is still a lot of effort needed.
>
> We have two strategies in mind. The first one is to make Flink SQL full-fledged and well-integrated with the Hive ecosystem. This is a similar approach to what Spark SQL adopted. The second strategy is to make Hive itself
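As a footnote to item 4: whatever bridge is built for Hive UDFs ultimately has to surface them through Flink's own function-registration API. A minimal sketch of that target API, with a hand-written scalar function standing in for delegated Hive logic; the function name and the inline data are made up for illustration:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfRegistrationSketch {

    /** Hypothetical stand-in for logic that a Hive UDF bridge would delegate to. */
    public static class ToUpper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register the function so it is callable from Flink SQL by name.
        tEnv.createTemporarySystemFunction("to_upper", ToUpper.class);

        // VALUES works without any external tables, keeping the sketch self-contained.
        tEnv.executeSql("SELECT to_upper(name) FROM (VALUES ('flink'), ('hive')) AS t(name)")
            .print();
    }
}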
Re: Reading a single input file in parallel?
AFAIK Flink has a similar notion of splittability as Hadoop. Furthermore, for custom FileInputFormats you can set the attribute unsplittable = true if your file format cannot be split.

> On 18. Feb 2018, at 13:28, Niels Basjes wrote:
>
> Hi,
>
> In Hadoop MapReduce there is the notion of "splittable" in the FileInputFormat. This has the effect that a single input file can be fed into multiple separate instances of the mapper that read the data. A lot has been documented (i.e. text is splittable per line, gzipped text is not splittable) and designed into the various file formats (like Avro and Parquet) to allow splittability.
>
> The goal is that reading and parsing files can be done by multiple CPUs/systems in parallel.
>
> How is this handled in Flink?
> Can Flink read a single file in parallel?
> How does Flink administrate/handle the possibilities regarding the various file formats?
>
> The reason I ask is because I want to see if I can port this (now Hadoop-specific) hobby project of mine to work with Flink: https://github.com/nielsbasjes/splittablegzip
>
> Thanks.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
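The attribute mentioned above is a protected flag on Flink's FileInputFormat, so a custom format can opt out of splitting in its constructor. A minimal sketch (the class name is made up, and it simply reuses TextInputFormat for the actual record reading):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

/**
 * Sketch of a custom input format for files that cannot be read from an
 * arbitrary offset (e.g. conventionally gzipped text).
 */
public class WholeFileTextInputFormat extends TextInputFormat {

    public WholeFileTextInputFormat(Path filePath) {
        super(filePath);
        // Protected flag inherited from FileInputFormat: with it set, each file
        // becomes exactly one input split instead of being cut at block boundaries,
        // so a single parallel subtask reads the whole file.
        this.unsplittable = true;
    }
}

This is roughly the Flink counterpart of the splittability control that Hadoop's FileInputFormat exposes, which is what the question refers to.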
Re: Submitting jobs via maven coordinates?
I have seen no official script, but what you describe should be easily done with a build tool such as Gradle. The advantage would be that a build tool is already tested and you do not have to maintain scripts for downloading etc.

> On 14. Nov 2017, at 07:46, Ron Crocker wrote:
>
> Internally we use Artifactory to store our jars. I have a team that would like to submit their Flink jobs via their Maven coordinates, resulting in pulling the specified jar from the Artifactory repository.
>
> Has anyone else seen a need for this? Has it already been accomplished?
>
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
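Even without a build tool the plumbing is small. A rough Java sketch that fetches a jar from a Maven-layout repository URL and hands it to the Flink CLI; the coordinates, the repository URL, and the presence of a flink binary on the PATH are all assumptions for illustration:

import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SubmitByCoordinates {
    public static void main(String[] args) throws Exception {
        // Hypothetical coordinates and repository URL; adjust to your Artifactory layout.
        String group = "com.example", artifact = "my-flink-job", version = "1.0.0";
        String repo = "https://artifactory.example.com/libs-release";
        String url = String.format("%s/%s/%s/%s/%s-%s.jar",
                repo, group.replace('.', '/'), artifact, version, artifact, version);

        // Download the job jar to a temporary file.
        Path localJar = Files.createTempFile(artifact + "-" + version, ".jar");
        try (InputStream in = URI.create(url).toURL().openStream()) {
            Files.copy(in, localJar, StandardCopyOption.REPLACE_EXISTING);
        }

        // Hand the downloaded jar to the regular Flink CLI.
        new ProcessBuilder("flink", "run", localJar.toString())
                .inheritIO().start().waitFor();
    }
}

A Gradle task wrapping the same two steps (dependency resolution plus an Exec task) adds checksums, caching, and repository authentication for free, which is the advantage mentioned above.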
Re: Switch to Scala 2.11 as a default build profile
EMR has a Flink package. Just go to the advanced options and tick the checkbox for Flink. No need to build it yourself.

> On 29. Jun 2017, at 05:56, Bowen Li wrote:
>
> +1.
>
> The AWS EMR ecosystem is using Scala 2.11 and breaks with Scala 2.10. We had to build several Flink components (e.g. flink-kinesis-connector) ourselves in order to run on EMR. Defaulting to Scala 2.11 will greatly reduce the adoption cost for Flink on EMR.
>
>> On Wed, Jun 28, 2017 at 9:34 AM, Till Rohrmann wrote:
>>
>> I'm +1 for changing the profile and for starting a discussion to drop Scala 2.10.
>>
>> Scala 2.10 is already quite old and the current stable version is 2.12. I would be surprised to see many people still using Scala 2.10.
>>
>> Cheers,
>> Till
>>
>>> On Wed, Jun 28, 2017 at 4:53 PM, Ted Yu wrote:
>>>
>>> Here is the KIP that drops support for Scala 2.10 in Kafka 0.11:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-119%3A+Drop+Support+for+Scala+2.10+in+Kafka+0.11
>>>
>>> FYI
>>>
>>>> On Wed, Jun 28, 2017 at 7:23 AM, Piotr Nowojski wrote:
>>>>
>>>> Yes, I know, and I'm proposing to change this in the parent pom by default to scala-2.11. Changing the parent pom every time anyone wants to touch/build the Kafka 0.11 connector in IntelliJ is not a great idea. This would require a developer to constantly stash those changes or commit and revert them before creating a pull request.
>>>>
>>>> Piotrek
>>>>
>>>>> On Jun 28, 2017, at 3:49 PM, Greg Hogan wrote:
>>>>>
>>>>> You don't need to use the build profile in IntelliJ, just change scala.version and scala.binary.version in the parent pom (recent refactorings made this possible without changing every pom).
>>>>>
>>>>> What is the benefit of changing the default without dropping older versions when contributions are still limited to the functionality of the old version?
>>>>>
>>>>>> On Wed, Jun 28, 2017 at 8:36 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I propose to switch to Scala 2.11 as the default and to have a Scala 2.10 build profile. Now it is the other way around. The reason for that is poor support for build profiles in IntelliJ; I was unable to make it work after I added the Kafka 0.11 dependency (Kafka 0.11 dropped support for Scala 2.10).
>>>>>>
>>>>>> As a side note, maybe we should also consider dropping Scala 2.10 support?
>>>>>>
>>>>>> Piotrek