Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Brian Wylie
Tathagata,

Thanks, your explanation was great.

The suggestion worked well; the only minor wrinkle was that I needed to bring
the TS field in as a DoubleType() or the time got truncated.
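
In case it's useful to anyone searching the archives later, here's a minimal
sketch of what I ended up with (the appName and the trimmed-down schema are
just placeholders; the real schema has the rest of the Bro DNS fields):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName('dns_stream').getOrCreate()

# SUBSCRIBE: same Kafka stream as before
raw_data = spark.readStream.format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'dns') \
  .option('startingOffsets', 'latest') \
  .load()

# ETL: read 'ts' as a double so the fractional seconds aren't truncated
dns_schema = StructType() \
  .add('ts', DoubleType()) \
  .add('uid', StringType())

# Parse the JSON payload and cast the epoch float straight to a timestamp (no UDF)
parsed_data = raw_data \
  .select(from_json(col('value').cast('string'), dns_schema).alias('data')) \
  .select('data.*') \
  .withColumn('dt', col('ts').cast('timestamp'))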

Thanks again,
-Brian

On Wed, Aug 30, 2017 at 1:34 PM, Tathagata Das 
wrote:

> 1. Generally, adding columns, etc. will not affect performance because
> Spark's optimizer will automatically figure out which columns are not needed
> and eliminate them in the optimization step. So that should never be a concern.
> 2. Again, this is generally not a concern, as the optimizer will take care
> of moving such expressions around.
> 3. However, using a Python UDF is bad for performance. In your case, if the
> problem is that the timestamp is a float, you can cast the float to the
> timestamp type, and it should automatically convert it correctly.
> Something like this: *col("ts").cast("timestamp")*
>
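> A quick, minimal way to sanity-check that cast (batch rather than streaming,
> using the sample value from your mail):
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col
>
> spark = SparkSession.builder.getOrCreate()
> # The epoch-seconds float is inferred as a double and cast straight to a timestamp
> df = spark.createDataFrame([(1379288667.631940,)], ['ts'])
> df.withColumn('dt', col('ts').cast('timestamp')).show(truncate=False)
>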
> On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie 
> wrote:
>
>> Hi All,
>>
>> I'm using structured streaming in Spark 2.2.
>>
>> I'm using PySpark and I have data (from a Kafka publisher) where the
>> timestamp is a float that looks like this:  1379288667.631940
>>
>> So here's my code (which is working fine)
>>
>> # SUBSCRIBE: Setup connection to Kafka Stream
>> raw_data = spark.readStream.format('kafka') \
>>   .option('kafka.bootstrap.servers', 'localhost:9092') \
>>   .option('subscribe', 'dns') \
>>   .option('startingOffsets', 'latest') \
>>   .load()
>>
>> # ETL: Hardcoded Schema for DNS records (do this better later)
>> from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType, FloatType
>> from pyspark.sql.functions import from_json, to_json, col, struct
>>
>> dns_schema = StructType() \
>> .add('ts', FloatType()) \
>> .add('uid', StringType()) \
>> .add('id.orig_h', StringType()) \
>>   
>>
>> # ETL: Convert raw data into parsed and proper typed data
>> from pyspark.sql.functions import col, length, to_timestamp
>>
>> parsed_data = raw_data \
>>   .select(from_json(col("value").cast("string"),
>> dns_schema).alias('data')) \
>>   .select('data.*')
>>
>> # Convert Bro IDS time to an actual TimeStamp type
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import TimestampType
>> import datetime
>> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)),
>> TimestampType())
>> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>>
>> # Then a writestream later...
>>
>> Okay, so all this code works fine (the 'dt' field has exactly what I
>> want)... but I'll be streaming in a lot of data, so here are my questions:
>>
>> - Will creating a new dataframe with withColumn basically kill
>> performance?
>> - Should I move my UDF into the parsed_data.select(...)  part?
>> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
>> without luck)
>>
>> Any suggestions/pointers are greatly appreciated.
>>
>> -Brian Wylie
>>
>>
>>
>
