Turns out the data is in Python format (dict repr, not JSON). The ETL pipeline was overwriting the original data.
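For the archives, a minimal sketch of what the symptom looks like: writing str(dict) to disk produces Python repr (single quotes, capitalized False/None), which json.loads then rejects. The status dict below is just illustrative data, not the actual tweet schema.

```python
import json

status = {"favorited": False, "inReplyToScreenName": None, "id": 42}

# str(dict) produces Python repr: single quotes, False/None capitalized.
as_repr = str(status)
print(as_repr)   # {'favorited': False, 'inReplyToScreenName': None, 'id': 42}

# json.dumps produces valid JSON: double quotes, false/null in lower case.
as_json = json.dumps(status)
print(as_json)   # {"favorited": false, "inReplyToScreenName": null, "id": 42}

# json.loads rejects the repr form, which is exactly the failure seen
# when exploring the part files with pyspark.
try:
    json.loads(as_repr)
except ValueError as exc:
    print("bad JSON:", exc)
```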
Andy

From: Andrew Davidson <a...@santacruzintegration.com>
Date: Thursday, November 19, 2015 at 6:58 PM
To: "user @spark" <user@spark.apache.org>
Subject: spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS

> I am working on a simple POC. I am running into a really strange problem. I
> wrote a Java streaming app that collects tweets using the Spark Twitter
> package and stores them to disk in JSON format. I noticed that when I run the
> code on my Mac, the files are written to the local file system as I expect,
> i.e. in valid JSON format: the key names are double quoted, and Boolean values
> are the words true or false in lower case.
>
> When I run in my cluster, the only difference is I call data.saveAsTextFiles()
> using an hdfs: URI instead of using file:/// . When the files are written to
> HDFS the JSON is not valid. E.g. key names are single quoted, not double
> quoted. Boolean values are the strings False or True; notice they start with
> upper case. I suspect there will be other problems. Any idea what I am doing
> wrong?
>
> I am using spark-1.5.1-bin-hadoop2.6
>
> import twitter4j.Status;
>
> import com.fasterxml.jackson.annotation.JsonAutoDetect;
> import com.fasterxml.jackson.annotation.PropertyAccessor;
> import com.fasterxml.jackson.databind.DeserializationFeature;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> private static ObjectMapper mapper = new ObjectMapper();
> static {
>     mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
>     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
> }
>
> JavaDStream<String> jsonTweets = tweets.map(mapStatusToJson);
> DStream<String> data = jsonTweets.dstream();
> data.saveAsTextFiles(outputUri, null);
>
> class MapStatusToJson implements Function<Status, String> {
>     private static final long serialVersionUID = -2882921625604816575L;
>
>     @Override
>     public String call(Status status) throws Exception {
>         return mapper.writeValueAsString(status);
>     }
> }
>
> I have been using pyspark to explore the data.
> dataURL = "hdfs:///smallSample"
> tweetStrings = sc.textFile(dataURL)  # I looked at the source; it decodes as UTF-8
>
> def str2JSON(statusStr):
>     """If the string is not valid JSON, log a warning and return None"""
>     try:
>         ret = json.loads(statusStr)
>         return ret
>     except Exception as exc:
>         logging.warning("bad JSON")
>         logging.warning(exc)
>         logging.warning(statusStr)
>         numJSONExceptions.add(1)
>         return None  # null value
>
> #jTweets = tweetStrings.map(lambda x: json.loads(x)).take(10)
> jTweets = tweetStrings.map(str2JSON).take(10)
>
> If I call print tweetStrings.take(1), I get back the following string (it is
> really long; only part of it is provided):
>
> {'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None,
> 'urlentities': [{'end': 126, 'expandedURL': 'http://panth.rs/pZ9Cvv',
>
> If I copy one of the HDFS part files locally, I see something similar. So
> I think the problem has something to do with DStream.saveAsTextFiles().
>
> I do not know if this is the problem or not, but it looks like the system
> might depend on several versions of jackson and fasterxml.jackson.
>
> Has anyone else run into this problem?
> Kind regards
>
> Andy
>
> provided
> +--- org.apache.spark:spark-streaming_2.10:1.5.1
> |    +--- org.apache.spark:spark-core_2.10:1.5.1
> |    |    +--- org.apache.avro:avro-mapred:1.7.7
> |    |    |    +--- org.apache.avro:avro-ipc:1.7.7
> |    |    |    |    +--- org.apache.avro:avro:1.7.7
> |    |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.9.13
> |    |    |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> |    |    |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> |    |    +--- org.apache.hadoop:hadoop-client:2.2.0
> |    |    |    +--- org.apache.hadoop:hadoop-common:2.2.0
> |    |    |    |    +--- org.slf4j:slf4j-api:1.7.5 -> 1.7.10
> |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.8.8 -> 1.9.13
>
> |    |    |    \--- com.fasterxml.jackson.core:jackson-databind:2.3.1 -> 2.4.4
> |    |    |         +--- com.fasterxml.jackson.core:jackson-annotations:2.4.0 -> 2.4.4
> |    |    |         \--- com.fasterxml.jackson.core:jackson-core:2.4.4
> |    |    +--- com.sun.jersey:jersey-server:1.9 (*)
> |    |    +--- com.sun.jersey:jersey-core:1.9
> |    |    +--- org.apache.mesos:mesos:0.21.1
> |    |    +--- io.netty:netty-all:4.0.29.Final
> |    |    +--- com.clearspring.analytics:stream:2.7.0
> |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- io.dropwizard.metrics:metrics-jvm:3.1.2
> |    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- io.dropwizard.metrics:metrics-json:3.1.2
> |    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
> |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.2 -> 2.4.4 (*)
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- io.dropwizard.metrics:metrics-graphite:3.1.2
> |    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)
> |    |    +--- com.fasterxml.jackson.module:jackson-module-scala_2.10:2.4.4
> |    |    |    +--- org.scala-lang:scala-library:2.10.4
> |    |    |    +--- org.scala-lang:scala-reflect:2.10.4 (*)
> |    |    |    +--- com.fasterxml.jackson.core:jackson-core:2.4.4
> |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.4
> |    |    |    \--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)
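Follow-up, for anyone who ends up with part files already written in Python-repr form: ast.literal_eval from the standard library can parse such lines back into dicts, after which json.dumps writes real JSON. A sketch under that assumption (str2Dict is a hypothetical name, a drop-in for the str2JSON above):

```python
import ast
import json
import logging

def str2Dict(line):
    """Parse one line of Python-repr output; return None if it is not a Python literal."""
    try:
        return ast.literal_eval(line)
    except (ValueError, SyntaxError) as exc:
        logging.warning("bad record: %s", exc)
        return None

# A line in the same shape as the bad part files.
line = "{'favorited': False, 'inReplyToScreenName': None, 'id': 42}"
record = str2Dict(line)
print(json.dumps(record))  # {"favorited": false, "inReplyToScreenName": null, "id": 42}
```

In a pyspark job this would replace json.loads, e.g. tweetStrings.map(str2Dict); note that ast.literal_eval only handles Python literals, so genuinely corrupt lines still come back as None.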