I am working on a simple POC. I am running into a really strange problem. I
wrote a Java Spark Streaming app that collects tweets using the Spark Twitter
package and stores them to disk in JSON format. I noticed that when I run the
code on my Mac, the files are written to the local file system exactly as I
expect, i.e. in valid JSON format: the key names are double quoted and the
Boolean values are the words true or false in lower case.



When I run on my cluster, the only difference is that I call
data.saveAsTextFiles() with an hdfs: URI instead of file:///. When the files
are written to HDFS the JSON is not valid, e.g. the key names are single
quoted, not double quoted, and the Boolean values are the strings False or
True; notice they start with an upper-case letter. I suspect there will be
other problems. Any idea what I am doing wrong?



I am using spark-1.5.1-bin-hadoop2.6



import twitter4j.Status;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.dstream.DStream;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

    // shared Jackson mapper used to serialize each twitter4j.Status
    private static ObjectMapper mapper = new ObjectMapper();

    static {
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

        // map each status to a JSON string and save each micro-batch as text files
        MapStatusToJson mapStatusToJson = new MapStatusToJson();
        JavaDStream<String> jsonTweets = tweets.map(mapStatusToJson);
        DStream<String> data = jsonTweets.dstream();
        data.saveAsTextFiles(outputUri, null);

class MapStatusToJson implements Function<Status, String> {

    private static final long serialVersionUID = -2882921625604816575L;

    @Override
    public String call(Status status) throws Exception {
        return mapper.writeValueAsString(status);
    }
}



I have been using pyspark to explore the data.



import json
import logging

dataURL = "hdfs:///smallSample"

tweetStrings = sc.textFile(dataURL)  # I looked at the source; it decodes as UTF-8

numJSONExceptions = sc.accumulator(0)  # counts records that fail to parse

def str2JSON(statusStr):
    """If the string is not valid JSON, log a warning and return None."""
    try:
        return json.loads(statusStr)
    except Exception as exc:
        logging.warning("bad JSON")
        logging.warning(exc)
        logging.warning(statusStr)
        numJSONExceptions.add(1)
        return None  # null value

#jTweets = tweetStrings.map(lambda x: json.loads(x)).take(10)
jTweets = tweetStrings.map(str2JSON).take(10)
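As a quick sanity check on whether the stored lines are Python-style literals
rather than JSON, I could run something like this over a small sample (rough
sketch; classify() is just a throwaway helper, not part of the app):

import ast
import json

def classify(line):
    """Label one stored line as 'json', 'python-literal', or 'neither'."""
    try:
        json.loads(line)
        return "json"
    except ValueError:
        pass
    try:
        # literal_eval accepts single-quoted keys and True/False/None
        ast.literal_eval(line)
        return "python-literal"
    except (ValueError, SyntaxError):
        return "neither"

print(tweetStrings.map(classify).countByValue())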



If I call print tweetStrings.take(1) I get back the following string (it is
really long; only part of it is shown here):


{'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None,
'urlentities': [{'end': 126, 'expandedURL': 'http://panth.rs/pZ9Cvv',


If I copy one of the HDFS part files locally I see something similar, so I
think the problem has something to do with DStream.saveAsTextFiles().
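To rule out sc.textFile's UTF-8 decoding I could also look at the locally
copied part file with plain Python (the path below is just an example):

import json

# read the first line of the copied part file directly, bypassing Spark
with open("/tmp/part-00000") as f:
    first = f.readline().strip()

print(first[:120])   # eyeball the quoting style
json.loads(first)    # raises ValueError if the line is not valid JSON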



I do not know if this is the problem or not; however, it looks like the build
might depend on several versions of Jackson (both the old org.codehaus.jackson
and the newer com.fasterxml.jackson artifacts).



Has anyone else run into this problem?



Kind regards



Andy



Here is the relevant part of the Gradle dependency report for the provided configuration:

+--- org.apache.spark:spark-streaming_2.10:1.5.1
|    +--- org.apache.spark:spark-core_2.10:1.5.1
|    |    +--- org.apache.avro:avro-mapred:1.7.7
|    |    |    +--- org.apache.avro:avro-ipc:1.7.7
|    |    |    |    +--- org.apache.avro:avro:1.7.7
|    |    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.9.13
|    |    |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
|    |    |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13

|    |    +--- org.apache.hadoop:hadoop-client:2.2.0
|    |    |    +--- org.apache.hadoop:hadoop-common:2.2.0
|    |    |    |    +--- org.slf4j:slf4j-api:1.7.5 -> 1.7.10
|    |    |    |    +--- org.codehaus.jackson:jackson-core-asl:1.8.8 -> 1.9.13

|    |    |    \--- com.fasterxml.jackson.core:jackson-databind:2.3.1 -> 2.4.4
|    |    |         +--- com.fasterxml.jackson.core:jackson-annotations:2.4.0 -> 2.4.4
|    |    |         \--- com.fasterxml.jackson.core:jackson-core:2.4.4
|    |    +--- com.sun.jersey:jersey-server:1.9 (*)
|    |    +--- com.sun.jersey:jersey-core:1.9
|    |    +--- org.apache.mesos:mesos:0.21.1
|    |    +--- io.netty:netty-all:4.0.29.Final
|    |    +--- com.clearspring.analytics:stream:2.7.0
|    |    +--- io.dropwizard.metrics:metrics-core:3.1.2
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-jvm:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-json:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.2 -> 2.4.4 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- io.dropwizard.metrics:metrics-graphite:3.1.2
|    |    |    +--- io.dropwizard.metrics:metrics-core:3.1.2 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)
|    |    +--- com.fasterxml.jackson.module:jackson-module-scala_2.10:2.4.4
|    |    |    +--- org.scala-lang:scala-library:2.10.4
|    |    |    +--- org.scala-lang:scala-reflect:2.10.4 (*)
|    |    |    +--- com.fasterxml.jackson.core:jackson-core:2.4.4
|    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.4
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)

