Hey Liang and Yan,

Been super busy, and just now getting back to this problem.

I've been thinking a bit more about it, and it still feels like using the
group-by functionality, even in a SQL transform, is incorrect and doesn't
follow the transformer pattern.  It doesn't seem to fit the extractor
pattern either.
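
To be concrete, by "group by in a SQL transform" I mean something along
these lines (illustrative only, applied to the raw_df defined in the
example further down):

from pyspark.ml.feature import SQLTransformer

# The statement runs fine, but it collapses rows instead of just adding a
# column to the incoming data frame, which is what feels wrong to me.
sql_trans = SQLTransformer(statement=
    "SELECT id, env, MAX(total) AS max_total FROM __THIS__ GROUP BY id, env")
sql_trans.transform(raw_df).show()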

Reasoning:

Since my transforms/extractions reduce the number of rows and remove some
of the columns, some information in the data frame is lost.  The problem
is that the output data frame from a transform/extraction on the raw data
cannot be used as the input to the next transform/extractor.  Basically,
it doesn't simply add a column to the data frame; it reduces the data
frame and adds a column.

Ex:
data = [
    ("id1", "purchase", "ios", 1),
    ("id1", "rental", "ios", 1),
    ("id1", "purchase", "android", 2),
    ("id2", "purchase", "android", 1),
    ("id2", "rental", "android", 2),
    ("id2", "rental", "android", 3),
    ("id3", "rental", "android", 1)
]
columns = ["id", "type", "env", "total"]
raw_df = spark.createDataFrame(data, columns)
raw_df.cache()
raw_df.show()
+---+--------+-------+-----+
| id|    type|    env|total|
+---+--------+-------+-----+
|id1|purchase|    ios|    1|
|id1|  rental|    ios|    1|
|id1|purchase|android|    2|
|id2|purchase|android|    1|
|id2|  rental|android|    2|
|id2|  rental|android|    3|
|id3|  rental|android|    1|
+---+--------+-------+-----+

import pyspark.sql.functions as F

purchase_df = raw_df.filter(raw_df.type == "purchase")
max_transform = purchase_df.groupby("id", "env").agg(
    F.max("total").alias("max_total"))
max_transform = max_transform.groupby("id").sum("max_total")
max_transform.show()
+---+--------------+
| id|sum(max_total)|
+---+--------------+
|id1|             3|
|id2|             1|
+---+--------------+

type_transform = raw_df.groupby("id").pivot("type").count()
type_transform.show()
+---+--------+------+
| id|purchase|rental|
+---+--------+------+
|id3|    null|     1|
|id1|       2|     1|
|id2|       1|     2|
+---+--------+------+

env_transform = raw_df.groupby("id").pivot("env").count()
env_transform.show()
+---+-------+----+
| id|android| ios|
+---+-------+----+
|id3|      1|null|
|id1|      1|   2|
|id2|      3|null|
+---+-------+----+

Here, I can't use the max_transform data frame as input for the
type_transform.
Since the output from one transform can't be used by the next, it doesn't
work in a pipeline.

It also feels unnatural that the size of the data frame from each transform
can differ depending on the data. This happens because we filter to
specific events for some of the features: for example, max_transform ends
up with 2 rows, while the others end up with 3.

Am I missing a clean way to do this in a pipeline?
Maybe there is a way to reduce the data up front so that I can use
transforms, for example by grouping all the events together in a list per
user (rough sketch below).  Keep in mind that in my actual use case I have
more than the 4 columns shown here; it's more like 10-50 columns in the
raw df.
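
Something like this is what I have in mind for the list-per-user idea
(untested sketch, just to illustrate, reusing raw_df from above):

import pyspark.sql.functions as F

# Collapse the raw data to one row per user up front; later transforms can
# then add columns without changing the number of rows.
events_per_user = raw_df.groupby("id").agg(
    F.collect_list(F.struct("type", "env", "total")).alias("events"))
events_per_user.show(truncate=False)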

Currently, I'm thinking that I'm just going to create my own custom
pipeline for these data preprocessing steps.
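
Roughly what I have in mind for that custom pipeline (hand-wavy sketch,
names made up; each step produces per-id features and everything gets
joined back on "id" at the end, so the differing row counts don't matter):

class FeaturePipeline(object):
    def __init__(self, steps):
        self.steps = steps  # list of callables: DataFrame -> DataFrame

    def run(self, raw_df):
        result = None
        for step in self.steps:
            feature_df = step(raw_df)  # each step's output is keyed by "id"
            result = (feature_df if result is None
                      else result.join(feature_df, on="id", how="outer"))
        return result

features = FeaturePipeline([
    lambda df: df.groupby("id").pivot("type").count(),
    lambda df: df.groupby("id").pivot("env").count(),
]).run(raw_df)
features.show()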

Thanks again
Adrian


On Mon, Mar 20, 2017 at 10:33 PM, 颜发才(Yan Facai) <facai....@gmail.com>
wrote:

> SQLTransformer is a good solution if all the operations can be expressed
> in SQL.
>
> By the way,
> if you'd like to get your hands dirty,
> writing a Transformer in Scala is not hard,
> and multiple output columns are valid in that case.
>
>
>
>
> On Fri, Mar 17, 2017 at 9:10 PM, Yanbo Liang <yblia...@gmail.com> wrote:
>
>> Hi Adrian,
>>
>> Did you try SQLTransformer? Your preprocessing steps are SQL operations
>> and can be handled by SQLTransformer in MLlib pipeline scope.
>>
>> Thanks
>> Yanbo
>>
>> On Thu, Mar 9, 2017 at 11:02 AM, aATv <adr...@vidora.com> wrote:
>>
>>> I want to start using PySpark MLlib pipelines, but I don't understand
>>> how/where preprocessing fits into the pipeline.
>>>
>>> My preprocessing steps are generally in the following form:
>>>    1) Load log files (from S3) and parse them into a Spark DataFrame
>>> with columns user_id, event_type, timestamp, etc.
>>>    2) Group by a column, then pivot and count another column
>>>       - e.g. df.groupby("user_id").pivot("event_type").count()
>>>       - We can think of the columns that this creates besides user_id as
>>> features, where the number of each event type is a different feature
>>>    3) Join the data from step 1 with other metadata, usually stored in
>>> Cassandra. Then perform a transformation similar to one from step 2),
>>> where
>>> the column that is pivoted and counted is a column that came from the
>>> data
>>> stored in Cassandra.
>>>
>>> After this preprocessing, I would use transformers to create other
>>> features and feed them into a model, let's say Logistic Regression for
>>> example.
>>>
>>> I would like to make at least step 2 a custom transformer and add that
>>> to a pipeline, but it doesn't fit the transformer abstraction. This is
>>> because it takes a single input column and outputs multiple columns.  It
>>> also has a different number of input rows than output rows due to the
>>> group by operation.
>>>
>>> Given that, how do I fit this into an MLlib pipeline? And if it doesn't
>>> fit as part of a pipeline, what is the best way to include it in my code
>>> so that it can easily be reused both for training and testing, as well
>>> as in production?
>>>
>>> I'm using pyspark 2.1 and here is an example of 2)
>>>
>>>
>>>
>>>
>>> Note: My question is in some way related to this question, but I don't
>>> think
>>> it is answered here:
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Why-can-t-a-Transformer-have-multiple-output-columns-td18689.html
>>>
>>> Thanks
>>> Adrian
>>>
>>>
>>>
>>>
>>>
>>
>


-- 
Adrian Stern, Senior Software Engineer at Vidora
www.vidora.com
Follow us on LinkedIn <https://www.linkedin.com/company/vidora>, Twitter
<https://twitter.com/vidoracorp> or Facebook
<https://www.facebook.com/vidoracorp>
