Re: spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS
Turns out the data was in Python format: the ETL pipeline had overwritten the original data.

Andy

From: Andrew Davidson
Date: Thursday, November 19, 2015 at 6:58 PM
To: "user @spark"
Subject: spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS
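For reference, the symptom described in this thread (single-quoted keys, capitalized False/True, None instead of null) is exactly what Python's str()/repr() of a dict produces, as opposed to json.dumps(). A minimal sketch in plain Python (no Spark required) showing the difference:

```python
import json

tweet = {"favorited": False, "inReplyToStatusId": -1, "inReplyToScreenName": None}

# Python's repr: single-quoted keys, capitalized False, None -- not valid JSON.
as_repr = str(tweet)

# json.dumps: double-quoted keys, lowercase false, null -- valid JSON.
as_json = json.dumps(tweet)

print(as_repr)  # {'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None}
print(as_json)  # {"favorited": false, "inReplyToStatusId": -1, "inReplyToScreenName": null}
```

So records that look like the repr form were written by something that stringified Python objects (e.g. an ETL step calling str() on a dict) rather than serializing them as JSON.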
spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS
I am working on a simple POC and am running into a really strange problem. I wrote a Java streaming app that collects tweets using the Spark Twitter package and stores them to disk in JSON format. When I run the code on my Mac, the files are written to the local file system as I expect, i.e. in valid JSON format: key names are double quoted, and boolean values are the words true or false in lower case.

When I run on my cluster, the only difference is that I call data.saveAsTextFiles() using an hdfs: URI instead of file:///. When the files are written to HDFS, the JSON is not valid. E.g. key names are single quoted, not double quoted, and boolean values are the strings False or True (notice they start with upper case). I suspect there will be other problems. Any idea what I am doing wrong?

I am using spark-1.5.1-bin-hadoop2.6.

    import twitter4j.Status;

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    private static ObjectMapper mapper = new ObjectMapper();
    static {
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    JavaDStream<String> jsonTweets = tweets.map(mapStatusToJson);
    DStream<String> data = jsonTweets.dstream();
    data.saveAsTextFiles(outputUri, null);

    class MapStatusToJson implements Function<Status, String> {
        private static final long serialVersionUID = -2882921625604816575L;

        @Override
        public String call(Status status) throws Exception {
            return mapper.writeValueAsString(status);
        }
    }

I have been using pyspark to explore the data.
    dataURL = "hdfs:///smallSample"
    tweetStrings = sc.textFile(dataURL)  # I looked at the source; it is decoded as UTF-8

    def str2JSON(statusStr):
        """If the string is not valid JSON, return None and log a warning."""
        try:
            return json.loads(statusStr)
        except Exception as exc:
            logging.warning("bad JSON")
            logging.warning(exc)
            logging.warning(statusStr)
            numJSONExceptions.add(1)
            return None  # null value

    #jTweets = tweetStrings.map(lambda x: json.loads(x)).take(10)
    jTweets = tweetStrings.map(str2JSON).take(10)

If I call print tweetStrings.take(1), I get back the following string (it is really long; only part is shown):

    {'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None, 'urlentities': [{'end': 126, 'expandedURL': 'http://panth.rs/pZ9Cvv',

If I copy one of the HDFS part files locally, I see something similar. So I think the problem has something to do with DStream.saveAsTextFiles().

I do not know if this is the problem or not, but it looks like the system might depend on several versions of jackson and fasterxml.jackson.

Has anyone else run into this problem?
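Given the diagnosis in the reply (the records are Python literals, not JSON), one hedged recovery sketch is to fall back to ast.literal_eval when json.loads fails: literal_eval safely parses Python dict/list literals with single quotes, True/False, and None. The parse_record helper below is my own name for illustration, not from the original post:

```python
import ast
import json

def parse_record(line):
    """Parse a line as JSON first; fall back to a Python literal.

    Returns None if the line is neither valid JSON nor a Python literal.
    """
    try:
        return json.loads(line)
    except ValueError:
        pass
    try:
        # Handles Python-repr output: single quotes, True/False/None.
        return ast.literal_eval(line)
    except (ValueError, SyntaxError):
        return None

# A record as it appeared in the HDFS part files (Python repr, not JSON):
line = "{'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None}"
record = parse_record(line)
print(record["favorited"])  # False
```

In the pyspark session above, this could replace str2JSON, e.g. tweetStrings.map(parse_record); but as the reply notes, the real fix is to stop the ETL pipeline from overwriting the JSON with Python repr output in the first place.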
Kind regards

Andy

provided
+--- org.apache.spark:spark-streaming_2.10:1.5.1
|    +--- org.apache.spark:spark-core_2.10:1.5.1
|    |    +--- org.apache.avro:avro-mapred:1.7.7
|    |    |    +--- org.apache.avro:avro-ipc:1.7.7
|    |    |    |    +--- org.apache.avro:avro:1.7.7
|    |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.9.13
|    |    |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
|    |    |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
|    |    +--- org.apache.hadoop:hadoop-client:2.2.0
|    |    |    +--- org.apache.hadoop:hadoop-common:2.2.0
|    |    |    |    +--- org.slf4j:slf4j-api:1.7.5 -> 1.7.10
|    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.8.8 -> 1.9.13
|    |    |    \--- com.fasterxml.jackson.core:jackson-databind:2.3.1 -> 2.4.4
|    |    |         +--- com.fasterxml.jackson.core:jackson-annotations:2.4.0 -> 2.4.4
|    |    |         \--- com.fasterxml.jackson.core:jackson-core:2.4.4
|    |    +--- com.sun.jersey:jersey-server:1.9 (*)
|    |    +--- com.sun.jersey:jersey-core:1.9
|    |    +--- org.apache.mesos:mesos:0.21.1
|    |    +--- io.netty:netty-all:4.0.29.Final
|    |    +--- com.clearspring.analytics:stream:2.7.0
|    |    +--- io.dropwizard.metrics:metrics-core:3.1.2
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-jvm:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-json:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.2 -> 2.4.4 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-graphite:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)
|    |    +--- com.fasterxml.jackson.module:ja