Turns out the data was in Python format. The ETL pipeline was overwriting the
original data.
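
For the archives: the "invalid JSON" was really Python repr() output
(single-quoted keys, True/False/None) that the ETL job had written over the
original files. If anyone needs to recover records in that shape, here is a
minimal sketch, assuming each line holds a single Python dict literal like the
sample further down in this thread:

```python
import ast
import json

# A line as it appears in the overwritten files: Python repr(), not JSON.
line = "{'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None}"

record = ast.literal_eval(line)  # parses Python literal syntax only
fixed = json.dumps(record)       # valid JSON: double quotes, lowercase booleans, null
```

ast.literal_eval only evaluates literals (dicts, lists, strings, numbers,
booleans, None), so it avoids the code-execution risk of eval().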

Andy
From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Thursday, November 19, 2015 at 6:58 PM
To:  "user @spark" <user@spark.apache.org>
Subject:  spark streaming problem  saveAsTextFiles() does not write valid
JSON to HDFS

> I am working on a simple POC. I am running into a really strange problem. I
> wrote a Java streaming app that collects tweets using the Spark Twitter
> package and stores them to disk in JSON format. I noticed that when I run the
> code on my Mac, the files are written to the local file system as I expect,
> i.e. in valid JSON format: the key names are double quoted, and boolean
> values are the words true or false in lower case.
> 
> 
> 
> When I run on my cluster, the only difference is that I call
> data.saveAsTextFiles() using an hdfs: URI instead of file:///. When the files
> are written to HDFS the JSON is not valid, e.g. key names are single quoted,
> not double quoted, and boolean values are the strings False or True; notice
> they start with upper case. I suspect there will be other problems. Any idea
> what I am doing wrong?
> 
> 
> 
> I am using spark-1.5.1-bin-hadoop2.6
> 
> 
> 
> import twitter4j.Status;
> 
> import com.fasterxml.jackson.annotation.JsonAutoDetect;
> import com.fasterxml.jackson.annotation.PropertyAccessor;
> import com.fasterxml.jackson.databind.DeserializationFeature;
> import com.fasterxml.jackson.databind.ObjectMapper;
> 
>     private static ObjectMapper mapper = new ObjectMapper();
>     static {
>         mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
>         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
>     }
> 
>     JavaDStream<String> jsonTweets = tweets.map(mapStatusToJson);
>     DStream<String> data = jsonTweets.dstream();
>     data.saveAsTextFiles(outputUri, null);
> 
> 
> 
> class MapStatusToJson implements Function<Status, String> {
> 
>     private static final long serialVersionUID = -2882921625604816575L;
> 
>     @Override
>     public String call(Status status) throws Exception {
>         return mapper.writeValueAsString(status);
>     }
> }
> I have been using pyspark to explore the data.
> 
> 
> 
> import json
> import logging
> 
> dataURL = "hdfs:///smallSample"
> tweetStrings = sc.textFile(dataURL)  # I looked at the source; it decodes as UTF-8
> 
> def str2JSON(statusStr):
>     """If the string is not valid JSON, log a warning and return None."""
>     try:
>         return json.loads(statusStr)
>     except Exception as exc:
>         logging.warning("bad JSON")
>         logging.warning(exc)
>         logging.warning(statusStr)
>         numJSONExceptions.add(1)  # Spark accumulator, defined elsewhere
>         return None
> 
> #jTweets = tweetStrings.map(lambda x: json.loads(x)).take(10)
> jTweets = tweetStrings.map(str2JSON).take(10)
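
Since str2JSON returns None for bad lines, the Nones need to be filtered out
before further processing. A minimal plain-Python sketch of the same logic (the
list comprehension would map to an rdd.filter() in pyspark); the sample lines
here are made up for illustration:

```python
import json
import logging

def str2JSON(statusStr):
    """Return the parsed object, or None if the line is not valid JSON."""
    try:
        return json.loads(statusStr)
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        logging.warning("bad JSON: %s", exc)
        return None

# The second line is Python repr() output, not JSON, so it is dropped.
lines = ['{"favorited": false}', "{'favorited': False}"]
tweets = [t for t in map(str2JSON, lines) if t is not None]
# In pyspark: tweetStrings.map(str2JSON).filter(lambda t: t is not None)
```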
> 
> 
> 
> If I call print tweetStrings.take(1), I get back the following string (it is
> really long; only part of it is provided):
> 
> {'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None,
> 'urlentities': [{'end': 126, 'expandedURL': 'http://panth.rs/pZ9Cvv',
> 
> 
> If I copy one of the HDFS part files locally, I see something similar, so I
> think the problem has something to do with DStream.saveAsTextFiles().
> 
> I do not know if this is the problem or not; however, it looks like the
> system might depend on several versions of Jackson (org.codehaus.jackson and
> com.fasterxml.jackson).
> 
> 
> 
> Has anyone else run into this problem?
> 
> 
> 
> Kind regards
> 
> 
> 
> Andy
> 
> 
> 
> provided
> 
> +--- org.apache.spark:spark-streaming_2.10:1.5.1
> |    +--- org.apache.spark:spark-core_2.10:1.5.1
> |    |    +--- org.apache.avro:avro-mapred:1.7.7
> |    |    |    +--- org.apache.avro:avro-ipc:1.7.7
> |    |    |    |    +--- org.apache.avro:avro:1.7.7
> |    |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.9.13
> |    |    |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> |    |    |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
> 
> |    |    +--- org.apache.hadoop:hadoop-client:2.2.0
> |    |    |    +--- org.apache.hadoop:hadoop-common:2.2.0
> |    |    |    |    +--- org.slf4j:slf4j-api:1.7.5 -> 1.7.10
> |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.8.8 -> 1.9.13
> 
> |    |    |    \--- com.fasterxml.jackson.core:jackson-databind:2.3.1 -> 2.4.4
> |    |    |         +--- com.fasterxml.jackson.core:jackson-annotations:2.4.0 -> 2.4.4
> |    |    |         \--- com.fasterxml.jackson.core:jackson-core:2.4.4
> |    |    +--- com.sun.jersey:jersey-server:1.9 (*)
> |    |    +--- com.sun.jersey:jersey-core:1.9
> |    |    +--- org.apache.mesos:mesos:0.21.1
> |    |    +--- io.netty:netty-all:4.0.29.Final
> |    |    +--- com.clearspring.analytics:stream:2.7.0
> |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- io.dropwizard.metrics:metrics-jvm:3.1.2
> |    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- io.dropwizard.metrics:metrics-json:3.1.2
> |    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
> |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.2 -> 2.4.4 (*)
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- io.dropwizard.metrics:metrics-graphite:3.1.2
> |    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
> |    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
> |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)
> |    |    +--- com.fasterxml.jackson.module:jackson-module-scala_2.10:2.4.4
> |    |    |    +--- org.scala-lang:scala-library:2.10.4
> |    |    |    +--- org.scala-lang:scala-reflect:2.10.4 (*)
> |    |    |    +--- com.fasterxml.jackson.core:jackson-core:2.4.4
> |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.4
> |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)

