TL;DR - I have a pandas dataframe where I need to change the datatypes when 
going into Arrow. I tried using schema, but I'm getting exceptions and am at a 
loss how to proceed. All help appreciated!


Longer version: I'm exporting data from SQL Server into a dataframe, one hour 
at a time, using pypyodbc.  I want to convert it to Parquet, so that I can use 
Redshift Spectrum with it (and ideally at some point, Athena).

However, the schema that PyArrow guesses is inaccurate.
Some fields have NULL, so it converts ints to Float.  (I was super annoyed at 
this until I realized I can just tell it "float" in the CREATE EXTERNAL TABLE, 
in Spectrum).
Some fields are ALWAYS NULL in a particular hour, which means that Pyarrow 
guesses it's an INT64, even when it's normally varchar or whatever.
I tried using astype() on the original dataframe, but that didn't work - the 
metadata then showed two type entries per column (numpy_type and pandas_type), 
and Spectrum still didn't like it.
Then, I decided to use a schema to force it to what I need.  However, it's 
throwing weird errors (included below).  I have no idea how to proceed - any 
help greatly appreciated.

Far too much code below.

All thanks in advance - and if you're in the area (DFW, TX, US), I'll even buy 
you a beer or something!

============ first, how to get the data into the dataframe 
======================


import pandas as pd
import pypyodbc
import pyarrow.parquet as pq
import pyarrow as pa
from s3fs import S3FileSystem
from datetime import timedelta as td, datetime

# S3 filesystem handle passed to pq.write_to_dataset below.
# Alternative with explicit credentials:
#   s3fs.S3FileSystem(key=ACCESS_KEY_ID, secret=SECRET_ACCESS_KEY)
s3 = S3FileSystem()

# Bounds of the export window as ISO date strings, plus their parsed
# datetime counterparts (d1 = start, d2 = end).
start_date, end_date = '2019-09-01', '2019-09-02'
d1, d2 = (datetime.strptime(day, '%Y-%m-%d') for day in (start_date, end_date))

def get_delta(d1, d2):
    """Return the difference ``d2 - d1``.

    With two ``datetime`` arguments the result is a ``timedelta``.
    """
    return d2 - d1

# Length of the export window; the hourly loop below iterates
# delta.days * 24 times.
delta = get_delta(d1, d2)

# ODBC connection string, assembled from its individual settings.
con_string = ''.join((
    'Driver={SQL Server};',
    'Server=myserver;',
    'Database=mydb;',
    'App=PyPull;',  # the keyword is "App" - it's not "application name"!
    'Trusted_Connection=yes',
))
cnxn = pypyodbc.connect(con_string)


# Export the window one hour at a time: query that hour's rows from SQL
# Server, convert them to an Arrow table and append them to the partitioned
# Parquet dataset on S3.
for i in range(delta.days * 24):
    datebasic = d1 + td(hours=i)
    datestr = datebasic.strftime("%Y-%m-%d %H:00")
    print(datestr)
    s3name = ("s3://bucket/folder ").replace("_","-")
    # The annotation inside the query uses "--" because "#" does not start a
    # comment in T-SQL.
    query = f"""
    declare @start_date datetime = '{datestr}'
    declare @end_date   datetime  = dateadd(hour,1,'{datestr}')
    --assume those are actual names , if that would cause this to break.
    select fieldA,
              B,              C,              D,              E,              
F,              G,
              H,              I, j, k, l, M,N,O,P,Q,R,S
    , convert(date,E) as subdate, datepart(hour,E) as subhour
    FROM mytable
    where J >= @start_date and J < @end_date
    order by J
    """
    # these lines have to be indented to make sure it's part of the same block
    result_port_map = pd.read_sql(query, cnxn)

    # and here is where I'm trying to convert and add the schema - see code
    # below.

    # Something I was trying that didn't work, but K did exist at the time:
    #    result_port_map['fieldK'] = result_port_map['fieldK'].astype(str)

    # This works but guesses the field types wrong.
    table = pa.Table.from_pandas(result_port_map)

    # subdate/subhour come from the query and become the partition directories.
    pq.write_to_dataset(table, root_path=s3name,
                        partition_cols=['subdate','subhour'],
                        filesystem=s3, coerce_timestamps='ms',
                        allow_truncated_timestamps=True)



=================== trying to get the schema to match 
==============================

The original SQL Server syntax:
CREATE TABLE [dbo].MYTABLE
(
fieldA [bigint] NOT NULL,
B [bigint] NOT NULL,
C [bigint] NOT NULL,
D [bigint] NULL,
E [datetime] NULL,
F [int] NULL,
G [int] NULL,
H [int] NULL,
I [int] NULL,
J [int] NULL,
K [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
L [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
M [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
N [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
O [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
P [varchar] (20) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
Q [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
R [varchar] (12) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
S [smallint] NULL)

# Explicit Arrow schema for the exported frame: the columns of dbo.MYTABLE in
# declaration order, followed by the two derived partition columns.
# F through J are [int] NULL in SQL Server and are declared float64 here.
#
# NOTE(review): the traceback shows from_pandas doing df[name] for each schema
# field and failing with KeyError: 'fieldA', i.e. the frame has no column with
# exactly that name.  pypyodbc may be lowercasing column names (its
# module-level `lowercase` setting) - compare result_port_map.columns with the
# names below to confirm.
_column_types = [
    ('fieldA', pa.int64()),
    ('B', pa.int64()),
    ('C', pa.int64()),
    ('D', pa.int64()),
    ('E', pa.timestamp('ms')),
    ('F', pa.float64()),
    ('G', pa.float64()),
    ('H', pa.float64()),
    ('I', pa.float64()),
    ('J', pa.float64()),
    ('K', pa.string()),
    ('L', pa.string()),
    ('M', pa.string()),
    ('N', pa.string()),
    ('O', pa.string()),
    ('P', pa.string()),
    ('Q', pa.string()),
    ('R', pa.string()),
    ('S', pa.int16()),
    ('subdate', pa.string()),
    ('subhour', pa.int8()),
]
fields = [pa.field(col_name, col_type) for col_name, col_type in _column_types]
my_schema = pa.schema(fields)

# Convert using the forced schema instead of letting Arrow infer the types.
tableForcedSchema = pa.Table.from_pandas(
    result_port_map,
    schema=my_schema,
    preserve_index=False,
)




======================= The errors I'm getting =================


>>> tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema, 
>>> preserve_index=False)
Traceback (most recent call last):
  File 
"C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py",
 line 2890, in get_loc
    return self._engine.get_loc(key)
  File "pandas\_libs\index.pyx", line 107, in 
pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\index.pyx", line 131, in 
pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in 
pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in 
pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'fieldA'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow\table.pxi", line 1174, in pyarrow.lib.Table.from_pandas
  File 
"C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py",
 line 460, in dataframe_to_arrays
    columns)
  File "C:\Users\ myuser 
\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py",
 line 346, in _get_columns_to_convert
    col = df[name]
  File "C:\Users\ myuser 
\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py",
 line 2975, in __getitem__
    indexer = self.columns.get_loc(key)
  File "C:\Users\ myuser 
\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py",
 line 2892, in get_loc
    return self._engine.get_loc(self._maybe_cast_indexer(key))
  File "pandas\_libs\index.pyx", line 107, in 
pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\index.pyx", line 131, in 
pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in 
pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in 
pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'fieldA'

Reply via email to