Feature request: split dataset based on condition

2019-02-01 Thread Moein Hosseini
I've seen that many applications need to split a dataset into multiple datasets based
on some condition. As there is no method to do this in one place, developers
call the filter method multiple times. I think it would be useful to have a method
that splits a dataset based on a condition in a single iteration, something like
Scala's partition method (of course Scala's partition only splits a list into
two lists, but something more general could be even more useful).
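
A rough sketch of what I mean (the split helper below is hypothetical, and this
placeholder still runs two filters; a built-in version could produce both halves
in a single pass):

import org.apache.spark.sql.{Column, DataFrame, SparkSession}

val spark = SparkSession.builder().appName("split-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val people = Seq(("Ann", 34), ("Bob", 12), ("Cid", 56)).toDF("name", "age")

// Today: two separate filter calls, each scanning the dataset.
val adults = people.filter($"age" >= 18)
val minors = people.filter($"age" < 18)

// Hypothetical helper illustrating the request: one call returning both halves.
def split(df: DataFrame, cond: Column): (DataFrame, DataFrame) =
  (df.filter(cond), df.filter(!cond))

val (a, m) = split(people, $"age" >= 18)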
If you think it would be helpful, I can create a Jira issue and work on a PR.

Best Regards
Moein

-- 

Moein Hosseini
Data Engineer
mobile: +98 912 468 1859
site: www.moein.xyz
email: moein...@gmail.com


Re: [DISCUSS] Upgrade built-in Hive to 2.3.4

2019-02-01 Thread Koert Kuipers
introducing hive serdes in sql core sounds a bit like a step back to me.
how can you build spark without hive support if there are imports for
org.apache.hadoop.hive.serde2 in sql core? are these imports very limited in
scope (and do they avoid pulling all of hive in)?

On Fri, Feb 1, 2019 at 3:03 PM Felix Cheung wrote:

> What’s the update and next step on this?
>
> We have real users getting blocked by this issue.
>


Re: [DISCUSS] Upgrade built-in Hive to 2.3.4

2019-02-01 Thread Felix Cheung
What’s the update and next step on this?

We have real users getting blocked by this issue.



From: Xiao Li 
Sent: Wednesday, January 16, 2019 9:37 AM
To: Ryan Blue
Cc: Marcelo Vanzin; Hyukjin Kwon; Sean Owen; Felix Cheung; Yuming Wang; dev
Subject: Re: [DISCUSS] Upgrade built-in Hive to 2.3.4

Thanks for your feedback!

I'm working with Yuming to reduce the risks to stability and quality. Will keep
you posted when the proposal is ready.

Cheers,

Xiao

Ryan Blue <rb...@netflix.com> wrote on Wed, Jan 16, 2019 at 9:27 AM:
+1 for what Marcelo and Hyukjin said.

In particular, I agree that we can't expect Hive to release a version that is 
now more than 3 years old just to solve a problem for Spark. Maybe that would 
have been a reasonable ask instead of publishing a fork years ago, but I think 
this is now Spark's problem.

On Tue, Jan 15, 2019 at 9:02 PM Marcelo Vanzin <van...@cloudera.com> wrote:
+1 to that. HIVE-16391 by itself means we're giving up things like
Hadoop 3, and we're also putting the burden on the Hive folks to fix a
problem that we created.

The current PR is basically a Spark-side fix for that bug. It does
mean also upgrading Hive (which gives us Hadoop 3, yay!), but I think
it's really the right path to take here.

On Tue, Jan 15, 2019 at 6:32 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
> Resolving HIVE-16391 means Hive releasing a 1.2.x that contains the fixes from
> our Hive fork (correct me if I am mistaken).
>
> To be honest, and as a personal opinion, that basically asks Hive to take care
> of Spark's dependency.
> Hive looks to be moving ahead with 3.1.x, and no one would use a newer 1.2.x
> release. In practice, Spark doesn't make 1.6.x releases anymore either, for instance.
>
> Frankly, my impression is that this is our own mistake to fix. Since the Spark
> community is big enough, I think we should try to fix it ourselves first.
> I am not saying upgrading is the only way to get through this, but I think we
> should at least try first and see what comes next.
>
> Upgrading on our side does sound riskier, but I think it's worth checking and
> trying to see whether it's possible.
> I think upgrading the dependency is a more standard approach than keeping the
> fork or asking the Hive side to release another 1.2.x.
>
> If we fail to upgrade it for critical or unavoidable reasons, then yes, we could
> find an alternative, but that basically means
> we're going to stay on 1.2.x for a long time (say .. until Spark 4.0.0?).
>
> I know this has become somewhat sensitive, but to be literally honest with
> myself, I think we should give it a try.
>


--
Marcelo


--
Ryan Blue
Software Engineer
Netflix


Re: Structured streaming from Kafka by timestamp

2019-02-01 Thread Tomas Bartalos
Hello,

sorry for my late reply.
You're right, what I'm doing is a one-time query, not structured streaming.
It's probably best to describe my use case:
I'd like to expose live data residing in Kafka (via JDBC/ODBC) with the power
of Spark's distributed SQL engine. As the JDBC server I use the Spark Thrift
Server.
Since timestamp pushdown is not possible :-(, this is a very cumbersome task.

Let's say I want to inspect the last 5 minutes of a Kafka topic. First I have
to find, for each partition, the starting offset that corresponds to
now() - 5 minutes. Then I can register a Kafka table:

CREATE TABLE ticket_kafka_x USING kafka OPTIONS (
  kafka.bootstrap.servers 'server1,server2,...',
  subscribe 'my_topic',
  startingOffsets '{"my_topic" : {"0" : 48532124, "1" : 49029703, "2" : 49456213, "3" : 48400521}}'
);
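
The offset lookup itself can be sketched with the plain kafka-clients consumer
and its offsetsForTimes call; the broker list and topic below are placeholders,
so take this only as a rough illustration:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "server1:9092,server2:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val topic = "my_topic"
val fiveMinutesAgo = System.currentTimeMillis() - 5 * 60 * 1000L

// Ask Kafka, per partition, for the earliest offset whose timestamp >= fiveMinutesAgo.
val query = consumer.partitionsFor(topic).asScala
  .map(p => new TopicPartition(topic, p.partition()) -> java.lang.Long.valueOf(fiveMinutesAgo))
  .toMap.asJava
val found = consumer.offsetsForTimes(query).asScala.collect {
  case (tp, oat) if oat != null => s""""${tp.partition}" : ${oat.offset}"""
}

// JSON in the shape expected by the startingOffsets option above.
val startingOffsets = s"""{"$topic" : {${found.mkString(", ")}}}"""
println(startingOffsets)
consumer.close()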

Then I can issue queries against this table (the data in Kafka is stored in
Avro format, but I've created a custom GenericUDF to deserialize it).

select event.id as id, explode(event.picks) as picks from (
  select from_avro(value) as event from ticket_kafka_x
  where timestamp > from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
) limit 100;


What's even more irritating: after a few minutes I have to re-create this table
to reflect the latest 5-minute interval, otherwise query performance suffers
because there is more and more data to filter.

A colleague of mine was able to run direct queries with timestamp pushdown in
the latest Hive.
How difficult would it be to implement this feature in Spark? Could you point
me to the code where I should have a look?

Thank you,


On Fri, Jan 25, 2019 at 0:32 Shixiong(Ryan) Zhu wrote:

> Hey Tomas,
>
> From your description, you just ran a batch query rather than a Structured
> Streaming query. The Kafka data source doesn't support filter push down
> right now. But that's definitely doable. One workaround here is setting
> proper  "startingOffsets" and "endingOffsets" options when loading from
> Kafka.
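>
> A minimal sketch of that workaround (assuming spark is an active SparkSession
> and the spark-sql-kafka-0-10 package is on the classpath; the broker, topic and
> offsets below are placeholders):
>
> val df = spark.read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "server1:9092,server2:9092")
>   .option("subscribe", "my_topic")
>   // per-partition offsets as JSON; in the JSON, -2 means earliest and -1 means latest
>   .option("startingOffsets", """{"my_topic":{"0":48532124,"1":49029703}}""")
>   .option("endingOffsets", "latest")
>   .load()
> df.selectExpr("CAST(key AS STRING)", "timestamp").show()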
>
> Best Regards,
> Ryan
>
>
> On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi 
> wrote:
>
>> Hi Tomas,
>>
>> As a general note, I don't fully understand your use case. You've mentioned
>> Structured Streaming, but your query is more like a one-time SQL statement.
>> Kafka doesn't support predicate pushdown in the way it's integrated with Spark.
>> What can be done from the Spark side is to look up the offset for a specific
>> lowest timestamp and start reading from there.
>>
>> BR,
>> G
>>
>>
>> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to read from Kafka via Spark Structured Streaming, and I want to
>>> read data within a specific time range:
>>>
>>> select count(*) from kafka_table where timestamp > cast('2019-01-23
>>> 1:00' as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP)
>>> ;
>>>
>>>
>>> The problem is that the timestamp predicate is not pushed down to Kafka, so
>>> Spark tries to read the whole topic from the beginning.
>>>
>>>
>>> explain query:
>>>
>>> +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 > 15351480))
>>>       && (timestamp#57 < 15352344))
>>>    +- Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>>>          start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>>>          [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>>>          PushedFilters: [], ReadSchema: struct
>>>
>>> Obviously the query takes forever to complete. Is there a solution to
>>> this?
>>>
>>> I'm using Kafka and kafka-clients version 1.1.1
>>>
>>>
>>> BR,
>>>
>>> Tomas
>>>
>>