No, I literally meant filter on _corrupt_record, which has a special meaning in the DataFrame API: it identifies lines that didn't match the schema.
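In pyspark that would look something like the sketch below (df is just a placeholder name; it assumes the DataFrame came from Spark's JSON reader, which is what fills _corrupt_record for lines it couldn't parse):

    # keep only rows that parsed cleanly; corrupt lines carry the raw text
    # in _corrupt_record and null in every other column
    cleanDF = df.filter(df['_corrupt_record'].isNull())

If no line in a given batch was malformed, the reader may not add the column at all, so checking "'_corrupt_record' in df.columns" before filtering avoids an AnalysisException.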
On Wed, Jul 27, 2016 at 12:19 PM, vr spark <vrspark...@gmail.com> wrote:
> Hi,
> I tried that and am still getting an exception. Any other suggestion?
>
> clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())
>
> It fails for some cases and errors out with the message below:
>
> AnalysisException: u'No such struct field clientIP in cookies, nscClientIP1,
> nscClientIP2, uAgent;'
>
>
> On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Have you tried filtering out corrupt records with something along the
>> lines of
>>
>> df.filter(df("_corrupt_record").isNull)
>>
>> On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
>> > I am reading data from Kafka using Spark Streaming.
>> >
>> > I am reading JSON and creating a DataFrame.
>> > I am using pyspark.
>> >
>> > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
>> >
>> > lines = kvs.map(lambda x: x[1])
>> >
>> > lines.foreachRDD(mReport)
>> >
>> > def mReport(clickRDD):
>> >     clickDF = sqlContext.jsonRDD(clickRDD)
>> >     clickDF.registerTempTable("clickstream")
>> >     PagesDF = sqlContext.sql(
>> >         "SELECT request.clientIP as ip "
>> >         "FROM clickstream "
>> >         "WHERE request.clientIP is not null "
>> >         "limit 2000 ")
>> >
>> > The problem is that not all of the JSONs from the stream have the same
>> > format.
>> >
>> > It works when it reads a JSON which has the ip.
>> >
>> > Some of the JSON strings do not have the client ip in their schema,
>> > so I am getting an error and my job fails when it encounters such a JSON.
>> >
>> > How do I read only those JSONs which have the ip in their schema?
>> >
>> > Please suggest.
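Putting the thread's suggestion into the quoted callback, a rough pyspark sketch (names copied from the code above, not tested; the columns check is my own addition, since the reader only adds _corrupt_record when it actually saw unparseable lines):

    def mReport(clickRDD):
        clickDF = sqlContext.jsonRDD(clickRDD)

        # drop lines that failed to parse as JSON before querying
        if '_corrupt_record' in clickDF.columns:
            clickDF = clickDF.filter(clickDF['_corrupt_record'].isNull())

        clickDF.registerTempTable("clickstream")

        PagesDF = sqlContext.sql(
            "SELECT request.clientIP as ip "
            "FROM clickstream "
            "WHERE request.clientIP is not null "
            "limit 2000 ")

This only drops lines that were not valid JSON; whether it covers the missing clientIP case depends on whether those records were malformed or just shaped differently.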