redirectToLog: newbie question

2019-10-10 Thread Jae Yoon Yoo
Hello,

My application is launched by SparkLauncher, and I am trying to collect all logs
(from the launcher, the application, and Spark core) into one log file.
I can see all of them on the console (using the Eclipse IDE) but cannot get them
written to the log file. With the simplified code below, the only output I see
in the log file is from the launcher.

For testing purposes I also tried redirectOutput(java.io.File); with that I can
see the application's output in the log file as well, but still not Spark
core's. By Spark core's output (if I am not using the name properly, please
correct me) I mean lines like the one below, and exception messages when they
occur.

INFO: 19/10/10 15:50:08 INFO Utils: Successfully started service 'sparkDriver' 
on port 62573.

Can you shed some light for me? Thanks in advance,
/ Jae

Reference: 
https://stackoverflow.com/questions/45781842/save-spark-launcher-output-to-file
Spark version: 2.4.3
Code examples: the launcher and the application (TestLog):

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.apache.spark.launcher.SparkLauncher;

public class LauncherTest {

  static final Logger log = Logger.getLogger(LauncherTest.class);

  public static void main(String[] args) throws Exception {

    String log4jConfPath = "log4j.properties";
    PropertyConfigurator.configure(log4jConfPath);

    String appName = LauncherTest.class.getName();

    log.info("appName=" + appName); // this I see in both the log file and the console

    SparkLauncher launcher = new SparkLauncher()
        .setAppResource(myJar)      // path to the application jar (defined elsewhere)
        .setMainClass(myClass)      // fully qualified name of TestLog (defined elsewhere)
        .setSparkHome(sparkHome)    // defined elsewhere
        .setMaster("local[*]")
        .setAppName(appName);

    launcher.redirectToLog(LauncherTest.class.getName());

    SparkAppHandle handle = launcher.startApplication();
    State state = handle.getState();

    while (state == null || !state.isFinal()) {
      Thread.sleep(1000);
      state = handle.getState();
      if (state != null) {
        log.info("state: " + state.toString());
      }
    }
  }
}

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class TestLog {

  public static void main(String[] args) throws Exception {

    System.out.println("TestLog: start"); // this I see in the console but not in the log file

    SparkSession spark = SparkSession.builder().appName("Test Log").getOrCreate();
    Dataset<String> logData = spark.read().textFile(filePath).cache(); // filePath defined elsewhere

    System.out.println("TestLog: end, count=" + logData.count());

    spark.stop();
  }
}
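As far as I can tell, this behaviour would be explained by the fact that redirectToLog(name) forwards the child process's stdout/stderr to a java.util.logging Logger with the given name, not to log4j, so a log4j FileAppender configured in log4j.properties never sees those lines even though the default j.u.l console handler prints them. A minimal JDK-only sketch of one possible workaround, attaching a j.u.l FileHandler to the logger name passed to redirectToLog (JulFileDemo and launcher-child.log are made-up names for illustration):

```java
import java.io.IOException;
import java.util.logging.FileHandler;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

public class JulFileDemo {
  public static void main(String[] args) throws IOException {
    // The logger name must match the string passed to redirectToLog(...),
    // here LauncherTest.class.getName() from the launcher example above.
    Logger jul = Logger.getLogger("LauncherTest");
    FileHandler handler = new FileHandler("launcher-child.log", true);
    handler.setFormatter(new SimpleFormatter());
    jul.addHandler(handler);

    // Stands in for a redirected line of child output, e.g.
    // "19/10/10 15:50:08 INFO Utils: Successfully started service ...".
    jul.info("TestLog: start");
    handler.close();
  }
}
```

With the handler installed before startApplication() is called, the child's output should end up in launcher-child.log alongside whatever log4j writes for the launcher itself.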





Re: org.apache.spark.util.SparkUncaughtExceptionHandler

2019-10-10 Thread Mich Talebzadeh
Hi Nimmi,

Can you send us the Spark parameters including the overhead, assuming you are
running on YARN?

Example

[4] - 864GB

--num-executors 32

--executor-memory 21G

--executor-cores 4
--conf spark.yarn.executor.memoryOverhead=3000

The parameter spark.yarn.executor.memoryOverhead is explained as below:

  spark.yarn.executor.memoryOverhead = max(executorMemory * 0.10, 384)

This is the amount of off-heap memory (in megabytes) to be allocated per
executor. It accounts for things like VM overheads, interned strings, and
other native overheads, and it tends to grow with the executor size
(typically 6-10%).
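The formula above can be checked with a few lines of Java against the 21G example (OverheadCalc is a made-up name for illustration):

```java
public class OverheadCalc {
  public static void main(String[] args) {
    // Default: spark.yarn.executor.memoryOverhead = max(executorMemory * 0.10, 384) MB
    long executorMemoryMb = 21L * 1024;                        // --executor-memory 21G
    long overheadMb = Math.max((long) (executorMemoryMb * 0.10), 384L);
    System.out.println(overheadMb);                            // 2150 MB by default
  }
}
```

So the explicit --conf spark.yarn.executor.memoryOverhead=3000 above raises the overhead roughly 850 MB over what YARN would allocate by default.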

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com







org.apache.spark.util.SparkUncaughtExceptionHandler

2019-10-10 Thread Nimmi Cv

I get the following error on the executors while running my Spark job. I am
reading data from a database; the data contains UTF-8 strings.

Iterator t.next().getString(row.fieldIndex("short_name"));

ERROR org.apache.spark.util.SparkUncaughtExceptionHandler - Uncaught
exception in thread Thread[Executor task launch worker for task 359,5,main]
java.lang.OutOfMemoryError: Java heap space at
org.apache.spark.unsafe.types.UTF8String.fromAddress(UTF8String.java:135)
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.getUTF8String(UnsafeRow.java:419)
at
org.apache.spark.sql.execution.columnar.STRING$.getField(ColumnType.scala:452)
at
org.apache.spark.sql.execution.columnar.STRING$.getField(ColumnType.scala:424)
at
org.apache.spark.sql.execution.columnar.compression.RunLengthEncoding$Encoder.gatherCompressibilityStats(compressionSchemes.scala:194)
at
org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$$anonfun$gatherCompressibilityStats$1.apply(CompressibleColumnBuilder.scala:74)
at
org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$$anonfun$gatherCompressibilityStats$1.apply(CompressibleColumnBuilder.scala:74)
at scala.collection.immutable.List.foreach(List.scala:392) at
org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.gatherCompressibilityStats(CompressibleColumnBuilder.scala:74)

I am processing 100 GB of data with 10 executors of 14G each. I started with
12G executors and get the same error even with 14G and 3G of overhead memory.
Thanks,
Nimmi


spark streaming exception

2019-10-10 Thread Amit Sharma
Hi, we have a Spark streaming job to which we send requests through our UI
using Kafka. It processes them and returns the response. We are getting the
error below, and the streaming job is not processing any requests.

Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 1570689515000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).

Please help me find the root cause of this issue.