Hi Michael,

It doesn't look like 'fieldA' is a column name in the object you're
passing in. Does result_port_map['fieldA'] work? That's the KeyError
you're hitting.
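For example, printing the DataFrame's columns shows what pandas actually parsed; any name missing from df.columns raises exactly this KeyError. A minimal sketch with made-up data, not your actual query:

```python
import pandas as pd

# hypothetical stand-in for the SQL result -- note 'fieldA' is absent
df = pd.DataFrame({'field_a': [1, 2], 'B': [3, 4]})
print(list(df.columns))  # see what pandas actually calls the columns

try:
    df['fieldA']  # the same per-column lookup pyarrow does for each schema field
except KeyError as e:
    print('missing column:', e)
```

pa.Table.from_pandas(df, schema=...) does that same df[name] lookup for every field in the schema, so any mismatch between the schema's names and df.columns surfaces as this KeyError.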

- Wes

On Wed, Sep 18, 2019 at 5:52 PM Bourgon, Michael
<[email protected]> wrote:
>
> TL;DR – I have a pandas dataframe where I need to change the datatypes when
> going into Arrow. I tried using a schema, but I'm getting exceptions and am at
> a loss how to proceed. All help appreciated!
>
>
>
>
>
> Longer version: I'm exporting data from SQL Server into a dataframe, one hour
> at a time, using pypyodbc. I want to convert it to Parquet so that I can
> use Redshift Spectrum with it (and ideally, at some point, Athena).
>
>
>
> However, the schema that PyArrow guesses is inaccurate.
>
> Some fields have NULLs, so PyArrow converts ints to float. (I was super
> annoyed at this until I realized I can just declare "float" in the CREATE
> EXTERNAL TABLE in Spectrum.)
>
> Some fields are ALWAYS NULL in a particular hour, which means that PyArrow
> guesses the type wrong (e.g. INT64), even when the column is normally varchar.
>
> I tried using astype() on the original dataframe, but that didn't work - the
> resulting file still carried two sets of type metadata (numpy_type and
> pandas_type), and Spectrum still didn't like it.
>
> Then, I decided to use a schema to force it to what I need.  However, it’s 
> throwing weird errors (included below).  I have no idea how to proceed – any 
> help greatly appreciated.
>
>
>
> Far too much code below.
>
>
>
> All thanks in advance – and if you’re in the area (DFW, TX, US), I’ll even 
> buy you a beer or something!
>
>
>
> ============ first, how to get the data into the dataframe ======================
>
>
>
>
>
> import pandas as pd
> import pypyodbc
> import pyarrow as pa
> import pyarrow.parquet as pq
> from s3fs import S3FileSystem
> from datetime import timedelta as td, datetime
>
> s3 = S3FileSystem()  # or s3fs.S3FileSystem(key=ACCESS_KEY_ID, secret=SECRET_ACCESS_KEY)
>
> start_date = '2019-09-01'
> end_date = '2019-09-02'
> d1 = datetime.strptime(start_date, '%Y-%m-%d')
> d2 = datetime.strptime(end_date, '%Y-%m-%d')
>
> def get_delta(d1, d2):
>     delta = d2 - d1
>     return delta
>
> delta = get_delta(d1, d2)
>
> con_string = ('Driver={SQL Server};'
>               'Server=myserver;'
>               'Database=mydb;'
>               'App=PyPull;'  # It's not "application name"!
>               'Trusted_Connection=yes')
> cnxn = pypyodbc.connect(con_string)
>
> for i in range(delta.days * 24):
>     datebasic = d1 + td(hours=i)
>     datestr = datebasic.strftime("%Y-%m-%d %H:00")
>     print(datestr)
>     s3name = "s3://bucket/folder".replace("_", "-")
>     query = f"""
>     declare @start_date datetime = '{datestr}'
>     declare @end_date   datetime = dateadd(hour, 1, '{datestr}')
>     -- (field names anonymized, in case the real ones would matter here)
>     select fieldA, B, C, D, E, F, G,
>            H, I, J, K, L, M, N, O, P, Q, R, S,
>            convert(date, E) as subdate, datepart(hour, E) as subhour
>     FROM mytable
>     where J >= @start_date and J < @end_date
>     order by J
>     """
>
>     # these lines have to be indented to stay inside the loop
>     result_port_map = pd.read_sql(query, cnxn)
>
>     # and here is where I'm trying to convert and add the schema - see code below.
>     # result_port_map['fieldK'] = result_port_map['fieldK'].astype(str)
>     # (something I was trying that didn't work, but K did exist at the time)
>
>     table = pa.Table.from_pandas(result_port_map)  # works, but guesses the field types wrong
>
>     pq.write_to_dataset(table, root_path=s3name,
>                         partition_cols=['subdate', 'subhour'],
>                         filesystem=s3, coerce_timestamps='ms',
>                         allow_truncated_timestamps=True)
>
>
>
>
>
>
>
> =================== trying to get the schema to match ==============================
>
>
>
> The original SQL Server syntax:
>
> CREATE TABLE [dbo].MYTABLE
> (
>     fieldA [bigint] NOT NULL,
>     B [bigint] NOT NULL,
>     C [bigint] NOT NULL,
>     D [bigint] NULL,
>     E [datetime] NULL,
>     F [int] NULL,
>     G [int] NULL,
>     H [int] NULL,
>     I [int] NULL,
>     J [int] NULL,
>     K [varchar](10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     L [varchar](10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     M [varchar](10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     N [varchar](10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     O [varchar](10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     P [varchar](20) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     Q [varchar](10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     R [varchar](12) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     S [smallint] NULL
> )
>
>
>
> fields = [
>     pa.field('fieldA', pa.int64()),
>     pa.field('B', pa.int64()),
>     pa.field('C', pa.int64()),
>     pa.field('D', pa.int64()),
>     pa.field('E', pa.timestamp('ms')),
>     pa.field('F', pa.float64()),
>     pa.field('G', pa.float64()),
>     pa.field('H', pa.float64()),
>     pa.field('I', pa.float64()),
>     pa.field('J', pa.float64()),
>     pa.field('K', pa.string()),
>     pa.field('L', pa.string()),
>     pa.field('M', pa.string()),
>     pa.field('N', pa.string()),
>     pa.field('O', pa.string()),
>     pa.field('P', pa.string()),
>     pa.field('Q', pa.string()),
>     pa.field('R', pa.string()),
>     pa.field('S', pa.int16()),
>     pa.field('subdate', pa.string()),
>     pa.field('subhour', pa.int8())
> ]
>
> my_schema = pa.schema(fields)
>
> tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema,
>                                          preserve_index=False)
>
>
>
>
>
>
>
>
>
> ======================= The errors I’m getting =================
>
>
>
>
>
> >>> tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema, preserve_index=False)
>
> Traceback (most recent call last):
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py", line 2890, in get_loc
>     return self._engine.get_loc(key)
>   File "pandas\_libs\index.pyx", line 107, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\index.pyx", line 131, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in pandas._libs.hashtable.PyObjectHashTable.get_item
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in pandas._libs.hashtable.PyObjectHashTable.get_item
> KeyError: 'fieldA'
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "pyarrow\table.pxi", line 1174, in pyarrow.lib.Table.from_pandas
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py", line 460, in dataframe_to_arrays
>     columns)
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py", line 346, in _get_columns_to_convert
>     col = df[name]
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py", line 2975, in __getitem__
>     indexer = self.columns.get_loc(key)
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py", line 2892, in get_loc
>     return self._engine.get_loc(self._maybe_cast_indexer(key))
>   File "pandas\_libs\index.pyx", line 107, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\index.pyx", line 131, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in pandas._libs.hashtable.PyObjectHashTable.get_item
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in pandas._libs.hashtable.PyObjectHashTable.get_item
> KeyError: 'fieldA'
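On the datatype side, separate from the KeyError: once the names line up, you can often avoid the float/INT64 guessing by forcing dtypes in pandas before handing the frame to Arrow. A sketch, assuming a reasonably recent pandas; the column names here are the anonymized ones from the table above, and 'Int64' is pandas' nullable integer extension dtype:

```python
import pandas as pd

# stand-in for result_port_map: D is an int column with NULLs,
# K is a varchar column that happens to be all-NULL for this hour
df = pd.DataFrame({'D': [7, None], 'K': [None, None]})

# NULLs normally force ints to float64; nullable 'Int64' keeps them integral
df['D'] = df['D'].astype('Int64')

# plain .astype(str) would turn NULLs into the literal string 'None';
# the nullable 'string' dtype keeps them as missing values instead
df['K'] = df['K'].astype('string')

print(df.dtypes)
```

Whether pa.Table.from_pandas maps these extension dtypes straight to int64/utf8 depends on your pyarrow version, so treat this as a direction to test rather than a guaranteed fix; the other route is to build the table with inferred types first and then call Table.cast(my_schema).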
