Hey Tomas,

From your description, you ran a batch query rather than a Structured Streaming query. The Kafka data source doesn't support filter pushdown right now, but that's definitely doable. One workaround is to set the "startingOffsets" and "endingOffsets" options when loading from Kafka, so that Spark reads only the offset range you need instead of the whole topic.
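To make the workaround concrete, here is a minimal PySpark sketch. It assumes you have already looked up the per-partition offsets that bracket your time range (for example with the Kafka consumer's offsetsForTimes call); the offset values, the broker address, and the helper names offsets_json and read_offset_range are made up for illustration. As far as I know, every partition of the topic needs an entry in the JSON.

```python
import json


def offsets_json(topic, offsets_by_partition):
    """Build the JSON string accepted by startingOffsets / endingOffsets.

    offsets_by_partition maps partition number -> offset.
    Example result: {"my-topic": {"0": 1200, "1": 950}}
    """
    per_partition = {
        str(partition): int(offset)
        for partition, offset in sorted(offsets_by_partition.items())
    }
    return json.dumps({topic: per_partition})


def read_offset_range(spark, bootstrap_servers, topic, start, end):
    """Batch-read only the given offset range (hypothetical helper).

    start and end map partition -> offset; the ending offsets are exclusive.
    """
    return (
        spark.read.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", offsets_json(topic, start))
        .option("endingOffsets", offsets_json(topic, end))
        .load()
    )


if __name__ == "__main__":
    # Illustrative offsets only; look up the real ones for your time range.
    topic = "keeper.Ticket.avro.v1---production"
    print(offsets_json(topic, {0: 1200, 1: 950}))
    # With a SparkSession at hand (broker address is an assumption):
    # df = read_offset_range(spark, "broker:9092", topic,
    #                        {0: 1200, 1: 950}, {0: 1300, 1: 1010})
    # df.count()
```

You can still keep the timestamp filter in the query to trim records at the edges of the range; it is then applied only to the rows inside the offset range.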
Best Regards,
Ryan

On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Tomas,
>
> As a general note don't fully understand your use-case. You've mentioned
> structured streaming but your query is more like a one-time SQL statement.
> Kafka doesn't support predicates how it's integrated with spark. What can
> be done from spark perspective is to look for an offset for a specific
> lowest timestamp and start the reading from there.
>
> BR,
> G
>
>
> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos <tomas.barta...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm trying to read Kafka via spark structured streaming. I'm trying to
>> read data within specific time range:
>>
>> select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00'
>> as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);
>>
>>
>> The problem is that timestamp query is not pushed-down to Kafka, so Spark
>> tries to read the whole topic from beginning.
>>
>>
>> explain query:
>>
>> ....
>>
>> +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
>> 1535148000000000)) && (timestamp#57 < 1535234400000000))
>>
>>
>> Scan
>> KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>> start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>> [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>> *PushedFilters: []*, ReadSchema:
>> struct<key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,times...
>>
>>
>> Obviously the query takes forever to complete. Is there a solution to
>> this ?
>>
>> I'm using kafka and kafka-client version 1.1.1
>>
>>
>> BR,
>>
>> Tomas
>>