No, I literally meant filter on _corrupt_record, which has a magic
meaning in the DataFrame API: it marks lines that didn't match the
schema.
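
A minimal PySpark sketch of that (my illustration, assuming the default
reader behavior, where lines that fail to parse are kept with their raw
text in the _corrupt_record column):

    # _corrupt_record only appears in the inferred schema if some lines
    # actually failed to parse, so guard on its presence first.
    if "_corrupt_record" in clickDF.columns:
        clickDF = clickDF.filter(clickDF["_corrupt_record"].isNull())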

On Wed, Jul 27, 2016 at 12:19 PM, vr spark <vrspark...@gmail.com> wrote:
> Hi,
> I tried that and am still getting an exception. Any other suggestions?
>
> clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())
>
> It fails in some cases and errors out with the message below:
>
> AnalysisException: u'No such struct field clientIP in cookies, nscClientIP1,
> nscClientIP2, uAgent;'
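>
> That error suggests the inferred struct for "request" in that batch has
> no clientIP field at all. A hedged sketch of one workaround (my own
> illustration; has_field is a hypothetical helper, not from the thread):
> check the inferred schema before referencing the field:
>
>     from pyspark.sql.types import StructType
>
>     def has_field(df, parent, child):
>         # True if df's inferred schema contains parent.child.
>         for f in df.schema.fields:
>             if f.name == parent and isinstance(f.dataType, StructType):
>                 return child in [sf.name for sf in f.dataType.fields]
>         return False
>
>     if has_field(cDF, "request", "clientIP"):
>         clickDF = cDF.filter(cDF["request.clientIP"].isNotNull())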
>
>
> On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Have you tried filtering out corrupt records with something along the
>> lines of
>>
>>  df.filter(df("_corrupt_record").isNull)
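>>
>> (That snippet is Scala syntax; in PySpark the equivalent would be
>> roughly df.filter(df["_corrupt_record"].isNull()), assuming the
>> column exists in the inferred schema.)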
>>
>> On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
>> > I am reading data from Kafka using Spark Streaming.
>> >
>> > I am reading JSON and creating a DataFrame. I am using PySpark.
>> >
>> > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
>> > lines = kvs.map(lambda x: x[1])  # keep just the message payload
>> > lines.foreachRDD(mReport)
>> >
>> > def mReport(clickRDD):
>> >     # Infer the schema from this batch's JSON and register it for SQL.
>> >     clickDF = sqlContext.jsonRDD(clickRDD)
>> >     clickDF.registerTempTable("clickstream")
>> >     PagesDF = sqlContext.sql(
>> >         "SELECT request.clientIP AS ip "
>> >         "FROM clickstream "
>> >         "WHERE request.clientIP IS NOT NULL "
>> >         "LIMIT 2000")
>> >
>> > The problem is that not all the JSON records from the stream have the
>> > same schema.
>> >
>> > It works when it reads a JSON record which has the IP.
>> >
>> > Some of the JSON strings do not have the client IP in their schema,
>> > so I am getting an error and my job fails when it encounters such a
>> > record.
>> >
>> > How do I read only those JSON records which have the IP in their schema?
>> >
>> > Please suggest.
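>> >
>> > A sketch of one way to do exactly that (my illustration, not from the
>> > thread; has_client_ip is a hypothetical helper): pre-filter the raw
>> > strings before schema inference, so every record handed to jsonRDD is
>> > known to carry the field:
>> >
>> >     import json
>> >
>> >     def has_client_ip(line):
>> >         # Keep only lines that parse as a JSON object and carry a
>> >         # non-null request.clientIP; drop everything else.
>> >         try:
>> >             rec = json.loads(line)
>> >         except ValueError:
>> >             return False
>> >         if not isinstance(rec, dict):
>> >             return False
>> >         req = rec.get("request")
>> >         return isinstance(req, dict) and req.get("clientIP") is not None
>> >
>> >     good_lines = lines.filter(has_client_ip)
>> >     good_lines.foreachRDD(mReport)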
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
