Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Yin Huai
Hi Eran,

Can you try 1.6? With the change in
https://github.com/apache/spark/pull/10288, the JSON data source will no
longer throw a runtime exception if there is a record that it cannot parse.
Instead, it will put the entire record into the "_corrupt_record" column.
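
As a minimal sketch of what that looks like from spark-shell on 1.6 (reusing
the path from the original mail; the default corrupt-record column name is
assumed, and the column-presence check is just an illustrative precaution,
since the column only shows up in the inferred schema when at least one
record fails to parse):

// minimal sketch, Spark 1.6 spark-shell, default corrupt-record column name assumed
val df = sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")

// rows that could not be parsed keep their raw text in _corrupt_record
if (df.columns.contains("_corrupt_record")) {
  val corrupt = df.filter(df("_corrupt_record").isNotNull)
  println(s"corrupt records: ${corrupt.count()}")
  corrupt.select("_corrupt_record").show(false)
} else {
  println("all records parsed cleanly")
}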

Thanks,

Yin

On Sun, Dec 20, 2015 at 9:37 AM, Eran Witkon  wrote:

> Thanks for this!
> This was the problem...
>


Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Eran Witkon
Once I removed the CR/LF line breaks from the file, it worked OK.
eran
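
For anyone hitting the same thing, a quick sanity check from spark-shell,
assuming the cleaned file now holds exactly one JSON object per line (which
is what the 1.5 JSON source expects); comparing line count with row count is
just an illustrative way to confirm every line parsed:

// illustrative check: with one JSON object per line, line count and parsed row count should match
val path = "/home/eranw/Workspace/JSON/sample/sample2.json"
val lines = sc.textFile(path)
val df = sqlContext.read.json(path)
println(s"lines = ${lines.count()}, parsed rows = ${df.count()}")
df.printSchema()
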


Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Chris Fregly
hey Eran, I run into this all the time with JSON.

the problem is likely that your JSON is "too pretty" and extends beyond a
single line, which trips up the JSON reader.

my solution is usually to de-pretty the JSON - either manually or through an
ETL step - by stripping all whitespace before pointing my DataFrame/JSON
reader at the file.

this tool is handy for one-off scenarios:  http://jsonviewer.stack.hu

for streaming use cases, you'll want to have a light de-pretty ETL step either
within the Spark Streaming job after ingestion - or upstream using something
like a Flume interceptor, a NiFi processor (I love NiFi), or a Kafka
transformation, assuming those exist by now.

a similar problem exists for XML, btw.  there are lots of wonky workarounds
for this that use mapPartitions and all kinds of craziness.  the best option,
in my opinion, is to just ETL/flatten the data to make the DataFrame reader
happy.
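
for the batch case, here is a rough sketch of such an ETL/flatten step from
spark-shell, assuming each input file holds a single pretty-printed JSON
object; the paths and the whitespace-collapsing regex are illustrative, not a
drop-in recipe:

// rough sketch only (Spark 1.x spark-shell); paths are placeholders
val pretty = sc.wholeTextFiles("/path/to/pretty/*.json")      // (fileName, content) pairs

// crude de-pretty: collapse internal line breaks so each record is one line of JSON
val onePerRecord = pretty.map { case (_, content) =>
  content.replaceAll("\\s*[\\r\\n]+\\s*", " ").trim
}

// read.json(RDD[String]) expects one JSON object per element
val df = sqlContext.read.json(onePerRecord)
df.count()

// optionally persist the flattened copy for downstream jobs
onePerRecord.saveAsTextFile("/path/to/flattened")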



Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Eran Witkon
Thanks for this!
This was the problem...


spark 1.5.2 memory leak? reading JSON

2015-12-19 Thread Eran Witkon
Hi,
I tried the following code in spark-shell on Spark 1.5.2:

val df = sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")
df.count()

15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size = 67108864 bytes, TID = 3
15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
java.lang.RuntimeException: Failed to parse a value for data type StructType() (current token: VALUE_STRING).
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)

Am I doing something wrong?
Eran


Re: spark 1.5.2 memory leak? reading JSON

2015-12-19 Thread Ted Yu
The 'Failed to parse a value' error was the cause of the execution failure.

Can you disclose the structure of your JSON file?

Maybe try the latest 1.6.0 RC to see if the problem goes away.

Thanks
