Re: spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS

2015-11-19 Thread Andy Davidson
Turns out the data is in Python format. The ETL pipeline was overwriting the original
data.
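
For anyone hitting the same symptom: single-quoted keys and capitalized False/True/None are the tell-tale signs of Python's str()/repr() output rather than JSON. A minimal sketch of the difference (the sample record is made up for illustration):

```python
import json

record = {"favorited": False, "inReplyToStatusId": -1, "screenName": None}

# Writing the dict with str()/repr() produces Python literal syntax,
# which is not valid JSON: single quotes, False and None capitalized.
python_style = str(record)

# json.dumps() produces valid JSON: double quotes, false/null in lower case.
json_style = json.dumps(record)

print(python_style)
print(json_style)
```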

Andy
From:  Andrew Davidson 
Date:  Thursday, November 19, 2015 at 6:58 PM
To:  "user @spark" 
Subject:  spark streaming problem  saveAsTextFiles() does not write valid
JSON to HDFS


spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS

2015-11-19 Thread Andy Davidson
I am working on a simple POC. I am running into a really strange problem. I
wrote a Java streaming app that collects tweets using the Spark Twitter
package and stores them to disk in JSON format. I noticed that when I run the
code on my Mac, the files are written to the local file system as I expect,
i.e. in valid JSON format: the key names are double quoted, and Boolean values
are the words true or false in lower case.



When I run on my cluster, the only difference is that I call
data.saveAsTextFiles() using an hdfs: URI instead of file:///. When
the files are written to HDFS the JSON is not valid, e.g. key names are
single quoted, not double quoted, and Boolean values are the strings False or
True; notice they start with an upper case letter. I suspect there will be
other problems. Any idea what I am doing wrong?



I am using spark-1.5.1-bin-hadoop2.6



import twitter4j.Status;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

    private static ObjectMapper mapper = new ObjectMapper();
    static {
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    JavaDStream<String> jsonTweets = tweets.map(mapStatusToJson);
    DStream<String> data = jsonTweets.dstream();
    data.saveAsTextFiles(outputUri, null);

class MapStatusToJson implements Function<Status, String> {
    private static final long serialVersionUID = -2882921625604816575L;

    @Override
    public String call(Status status) throws Exception {
        return mapper.writeValueAsString(status);
    }
}



I have been using pyspark to explore the data.



import json
import logging

dataURL = "hdfs:///smallSample"
tweetStrings = sc.textFile(dataURL)  # I looked at the source; it decodes as UTF-8

def str2JSON(statusStr):
    """If the string is not valid JSON, return None and log a warning."""
    try:
        ret = json.loads(statusStr)
        return ret
    except Exception as exc:
        logging.warning("bad JSON")
        logging.warning(exc)
        logging.warning(statusStr)
        numJSONExceptions.add(1)
        return None  # null value

#jTweets = tweetStrings.map(lambda x: json.loads(x)).take(10)
jTweets = tweetStrings.map(str2JSON).take(10)
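
Since str2JSON returns None for unparseable lines, a filter step (my addition, not in the original mail) keeps the bad records out of downstream stages. A plain list stands in for the tweetStrings RDD so the sketch runs without a Spark context:

```python
import json
import logging

def str2JSON(statusStr):
    """Return the parsed dict, or None if the string is not valid JSON."""
    try:
        return json.loads(statusStr)
    except ValueError as exc:
        logging.warning("bad JSON: %s", exc)
        return None

# Stand-in for tweetStrings (an RDD in the original mail): the second
# line uses single quotes, so json.loads rejects it and it is dropped.
lines = ['{"a": 1}', "{'a': 1}", '{"b": true}']
parsed = [d for d in (str2JSON(s) for s in lines) if d is not None]
# In Spark the equivalent would be:
#   jTweets = tweetStrings.map(str2JSON).filter(lambda d: d is not None)
```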



If I call print tweetStrings.take(1), I get back the following string (it is
really long; only part of it is shown):


{'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None,
'urlentities': [{'end': 126, 'expandedURL': 'http://panth.rs/pZ9Cvv',
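
If the files really contain Python literal syntax like the line above (as the reply at the top of the thread confirms), ast.literal_eval can recover the records where json.loads fails. A sketch with a shortened sample line in the same style:

```python
import ast
import json

# Shortened sample in the same Python-literal style as the output above.
line = "{'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None}"

try:
    record = json.loads(line)        # fails: single quotes, False, None
except ValueError:
    record = ast.literal_eval(line)  # safely parses Python literals

assert record["favorited"] is False
```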


If I copy one of the HDFS part files locally, I see something similar.
So I think the problem has something to do with DStream.saveAsTextFiles().



I do not know if this is the problem or not; however, it looks like the
system might depend on several versions of jackson and fasterxml.jackson.



Has anyone else run into this problem?



Kind regards



Andy



provided
+--- org.apache.spark:spark-streaming_2.10:1.5.1
|    +--- org.apache.spark:spark-core_2.10:1.5.1
|    |    +--- org.apache.avro:avro-mapred:1.7.7
|    |    |    +--- org.apache.avro:avro-ipc:1.7.7
|    |    |    |    +--- org.apache.avro:avro:1.7.7
|    |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.9.13
|    |    |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
|    |    |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13

|    |    +--- org.apache.hadoop:hadoop-client:2.2.0
|    |    |    +--- org.apache.hadoop:hadoop-common:2.2.0
|    |    |    |    +--- org.slf4j:slf4j-api:1.7.5 -> 1.7.10
|    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.8.8 -> 1.9.13

|    |    |    \--- com.fasterxml.jackson.core:jackson-databind:2.3.1 -> 2.4.4
|    |    |         +--- com.fasterxml.jackson.core:jackson-annotations:2.4.0 -> 2.4.4
|    |    |         \--- com.fasterxml.jackson.core:jackson-core:2.4.4
|    |    +--- com.sun.jersey:jersey-server:1.9 (*)
|    |    +--- com.sun.jersey:jersey-core:1.9
|    |    +--- org.apache.mesos:mesos:0.21.1
|    |    +--- io.netty:netty-all:4.0.29.Final
|    |    +--- com.clearspring.analytics:stream:2.7.0
|    |    +--- io.dropwizard.metrics:metrics-core:3.1.2
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-jvm:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-json:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.2 -> 2.4.4 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-graphite:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)
|    |    +--- com.fasterxml.jackson.module:ja