Thanks Mich,
Looking closely at the thread dump it is evident that Amazon EMR has custom
jars to communicate with HMS,
see for example the call:
SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
This is not in Spark GH repository, nor get_partition_locations method is
in HMS open source :D

I already opened a support case with AWS, and meanwhile I am trying 2
things:

1) follow your recommendation to use different HMS client version
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
Related to this, I read the property spark.sql.hive.metastore.jars value
should be:

A classpath in the standard format for the JVM. This classpath must include
all of Hive and its dependencies, including the correct version of Hadoop.
The provided jars should be the same version as
spark.sql.hive.metastore.version. These jars only need to be present on the
driver, but if you are running in yarn cluster mode then you must ensure
they are packaged with your application.

Does anyone has the list of precise maven artifacts to download?
I have to download all the jars and put them into the cluster filesystem,
but for that I need the exact list.

e.g. I plan to use either:
spark.sql.hive.metastore.jars = "org/apache/hive"
and package the dependencies inside my app's jar
or:
spark.sql.hive.metastore.jars = "path"
spark.sql.hive.metastore.jars.path = "/usr/lib/spark/hive/*"

But I am not sure which is correct yet.
Any examples for this might be helpful, thanks!

2) if nothing works yet, downgrade to Spark 3.4 hoping it works
differently, e.g. AWS optimizations are not there.

Thanks!
Matteo


On Thu, Dec 5, 2024 at 1:08 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> You are correct about the potential overhead of coalescing partitions when
> all of them have data.
>
> The primary issue seems to be the excessive number of HMS calls to
> retrieve partition information, even when only a subset of partitions is
> being written.
>
> Spark appears to be fetching metadata for all partitions, even when only a
> subset is relevant. This could be due to limitations in the current
> implementation of 3.5.n or optimizations that haven't been fully
> implemented.
>
> Consider exploring options to optimize partition listing within Spark
> itself. This might involve caching partition information, using heuristics
> to avoid unnecessary calls to HMS, or leveraging more efficient data
> structures to store and retrieve partition metadata.
>
> What is your Hive metastore built on?  If possible, optimize the Hive
> Metastore queries to reduce the time taken to retrieve partition
> information. This might involve indexing partitions, using caching
> mechanisms, or improving query execution plans.
>
> Ensure that there are no network bottlenecks between the Spark application
> and the Hive Metastore. Network latency and bandwidth limitations can
> significantly impact performance.Mich Talebzadeh,
>
> Finally, is there an overriding reason to use 3.5.n?  Is the past I had
> issues with 3.5 so with Hive so I went back using 3.4
>
> HTH
>
> Architect | Data Science | Financial Crime | GDPR & Compliance Specialist
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 5 Dec 2024 at 08:29, Matteo Moci <mox...@gmail.com> wrote:
>
>>
>> On Thu, Dec 5, 2024 at 8:21 AM Matteo Moci <mox...@gmail.com> wrote:
>>
>>> Thanks a lot Mich,
>>> 1) you mean coalescing partitions that are about to write? I don't think
>>> this will take less time, because all partitions have data. It seems the
>>> problem is that it asks HMS all partitions, even if it's only writing 650.
>>> Is it an improvement something that would benefit Spark?
>>>
>> I mean: I find it strange that Spark gets metadata of all partitions even
>> if it is only writing some.
>> But I don't know the underlying logic, so I will have to look at the code
>> first.
>>
>>
>>>
>>> 2) so you are saying that the fallback method is more expensive than the
>>> first one, right?
>>> I will try to use the old HMS passing the configuration at runtime like
>>> they say it's possible in this link
>>> https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
>>>
>>>
>>> On Thu, Dec 5, 2024 at 8:17 AM Matteo Moci <mox...@gmail.com> wrote:
>>>
>>>> Thanks Mich,
>>>> 1)
>>>>
>>>> On Thu, Dec 5, 2024 at 1:16 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Matteo,
>>>>>
>>>>> 1) You have an incompatible Metastore: The Hive Metastore version used
>>>>> by the EMR cluster (2.3.9) doesn't support the get_partition_locations
>>>>> method directly. Spark 3.5 tries to use this method, leading to fallback
>>>>> and increased (Hive Metastore Service) HMS calls.
>>>>>
>>>>> 2) Large Number of Partitions: Your table has a very high number of
>>>>> partitions (250,000). Listing all partitions with
>>>>> listPartitionsWithAuthInfo can be slow, especially with potential network
>>>>> issues causing retries problem: Your  Spark 3.5 application writing to a
>>>>> Hive table with 250000 partitions experiences slowness compared to Spark
>>>>> 2.4. The driver gets stuck waiting for HMS (Hive Metastore Service) calls.
>>>>>
>>>>> Suggestions
>>>>>
>>>>> 1) You can optimize Partition Handling by coalescing Partitions,
>>>>> before writing. Consider using coalesce to reduce the number of partitions
>>>>> written to. This can significantly reduce HMS calls and improve 
>>>>> performance.
>>>>>
>>>>> 2) check this link
>>>>> <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html>
>>>>> for more info .
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>>
>>>>> Architect | Data Science | Financial Crime | GDPR & Compliance
>>>>> Specialist
>>>>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>>>>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>>>>>
>>>>> London, United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Wed, 4 Dec 2024 at 21:47, Matteo Moci <mox...@gmail.com> wrote:
>>>>>
>>>>>> Hello Community,
>>>>>> The Spark 3.5 application I am working on shows slowness, right at
>>>>>> the time of writing to a Hive table.
>>>>>>
>>>>>> I'd like to ask you some hints on how to mitigate this behaviour, if
>>>>>> possible.
>>>>>>
>>>>>> The same application using Spark 2.4 ran "fine" within reasonable
>>>>>> times, with minimal cluster idle cpu times, while on Spark 3, the driver 
>>>>>> is
>>>>>> stuck for a long time waiting for HMS.
>>>>>> Some more context:
>>>>>>
>>>>>>    - the app runs on an Amazon EMR cluster 7.2.0 [1], that uses its
>>>>>>    own "distribution" of Spark 3.5.1, Hive 3.1.3, and with some 
>>>>>> optimizations
>>>>>>    I don't know the details of. From what I understood, this is why line
>>>>>>    numbers of stack traces in cluster logs don't always match with the 
>>>>>> open
>>>>>>    source code of Spark.
>>>>>>    - Hive MetaStore is using Hive 2 - not sure exactly which version
>>>>>>    - the app reads from a hive table and writes the output data to
>>>>>>    other 3 hive tables. Let's focus on 1 of them, because that's where 
>>>>>> the app
>>>>>>    is stuck
>>>>>>    - I think we can leave the table it reads from outside of the
>>>>>>    equation, because the driver thread dumps show it is stuck at this 
>>>>>> point,
>>>>>>    when inserting in the output table:
>>>>>>
>>>>>> java.base@17.0.13/sun.nio.ch.Net.poll(Native Method)
>>>>>> java.base@17.0.13
>>>>>> /sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186)
>>>>>> java.base@17.0.13
>>>>>> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290)
>>>>>> java.base@17.0.13
>>>>>> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
>>>>>> java.base@17.0.13
>>>>>> /sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
>>>>>> java.base@17.0.13
>>>>>> /sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
>>>>>> java.base@17.0.13
>>>>>> /java.net.Socket$SocketInputStream.read(Socket.java:966)
>>>>>> java.base@17.0.13
>>>>>> /java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
>>>>>> java.base@17.0.13
>>>>>> /java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
>>>>>> java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
>>>>>> => holding Monitor(java.io.BufferedInputStream@1632069162)
>>>>>>
>>>>>> app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>>>>>>
>>>>>> app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>>>>>>
>>>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
>>>>>>
>>>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
>>>>>>
>>>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
>>>>>>
>>>>>> app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212)
>>>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> java.base@17.0.13
>>>>>> /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>>>> java.base@17.0.13
>>>>>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177)
>>>>>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown
>>>>>> Source)
>>>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> java.base@17.0.13
>>>>>> /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>>>> java.base@17.0.13
>>>>>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>>>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350)
>>>>>> => holding
>>>>>> Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800
>>>>>> )
>>>>>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754)
>>>>>>
>>>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798)
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845)
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233)
>>>>>> => holding
>>>>>> Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517
>>>>>> )
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790)
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307)
>>>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown
>>>>>> Source)
>>>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
>>>>>> => holding 
>>>>>> Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683
>>>>>> )
>>>>>>
>>>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341)
>>>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown
>>>>>> Source)
>>>>>> app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
>>>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
>>>>>> => holding
>>>>>> Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126
>>>>>> )
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535)
>>>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
>>>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
>>>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown
>>>>>> Source)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
>>>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org
>>>>>> $apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>>>>>
>>>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
>>>>>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
>>>>>> => holding 
>>>>>> Monitor(org.apache.spark.sql.execution.QueryExecution@310792592
>>>>>> )
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
>>>>>>
>>>>>> app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
>>>>>>
>>>>>> app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874)
>>>>>>
>>>>>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495)
>>>>>>
>>>>>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456)
>>>>>> x.Application.run(Application.java:N) <---------------------- this is
>>>>>> the application code
>>>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> java.base@17.0.13
>>>>>> /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>>>> java.base@17.0.13
>>>>>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>>>>
>>>>>> app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
>>>>>>
>>>>>>    - at line x.Application.main(Application.java:N) there is:
>>>>>>
>>>>>> dataset.write().mode(SaveMode.Overwrite).insertInto("table_name"); //
>>>>>> dataset is a Dataset<Row>
>>>>>>
>>>>>>    - on cluster logs we see:
>>>>>>
>>>>>> 24/12/04 10:16:22 INFO Hive: Metastore does not support
>>>>>> listPartitionLocations operation; falling back to using
>>>>>> listPartitionsWithAuthInfo operation from now on
>>>>>> org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException:
>>>>>> Metastore doesn't support listPartitionLocation: Invalid method name:
>>>>>> 'get_partition_locations'
>>>>>> at
>>>>>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579)
>>>>>> ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3]
>>>>>>
>>>>>>    - on cluster logs we see around 5 times this log:
>>>>>>
>>>>>> 24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost
>>>>>> connection. Attempting to reconnect (3 of 5) after 15s.
>>>>>> listPartitionsWithAuthInfo
>>>>>> org.apache.thrift.transport.TTransportException:
>>>>>> java.net.SocketTimeoutException: Read timed out
>>>>>>
>>>>>>    - and on hivemetastore server logs we see (I am told that the -1
>>>>>>    there means "get all the partitions"):
>>>>>>
>>>>>> 2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG
>>>>>> x.server.invocation-log:168 - #get_partitions_ps_with_auth('x',
>>>>>> 'table_name', '[, , , ]', -1, 'client-x', []): entered
>>>>>>
>>>>>>    - the table the app tries to write to has 1 year worth of data
>>>>>>    - Spark 2 is capable to write around 650 partitions daily, around
>>>>>>    1Tb worth of data, so there's a total of around 250000 partitions
>>>>>>    - the application uses spark.sql.sources.partitionOverwriteMode:
>>>>>>    dynamic so it's expected the 650 partitions are completely
>>>>>>    overwritten, leaving the others untouched
>>>>>>
>>>>>> I have some options but wanted to check if you have any pointers
>>>>>> forward, specifically how to speed this up :D
>>>>>>
>>>>>> Thanks a lot and sorry for the wall of text!
>>>>>>
>>>>>> Matteo
>>>>>>
>>>>>> [1]
>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Matteo Moci
>>>> https://twitter.com/matteomoci <http://mox.fm>
>>>>
>>>>
>>>
>>> --
>>> Matteo Moci
>>> https://twitter.com/matteomoci <http://mox.fm>
>>>
>>>
>>
>> --
>> Matteo Moci
>> https://twitter.com/matteomoci <http://mox.fm>
>>
>>

-- 
Matteo Moci
https://twitter.com/matteomoci <http://mox.fm>

Reply via email to