Hi Michael - it doesn't look like 'fieldA' is a column name in the object you're passing in. Does result_port_map['fieldA'] work? That's the error that's happening.
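For what it's worth, a quick way to check is to print the exact column names the query came back with - stray whitespace or different casing in the SELECT list is a common cause of this KeyError. A minimal sketch (the toy DataFrame here stands in for the real result_port_map):

```python
import pandas as pd

# Toy stand-in for result_port_map = pd.read_sql(query, cnxn);
# note the trailing space in the first column name.
result_port_map = pd.DataFrame({"fieldA ": [1, 2], "B": [3, 4]})

# Print the exact column names to spot mismatches against the schema.
print(list(result_port_map.columns))  # ['fieldA ', 'B']

# Strip stray whitespace so the names line up with the pa.field() names.
result_port_map.columns = [c.strip() for c in result_port_map.columns]
print('fieldA' in result_port_map.columns)  # True
```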
- Wes

On Wed, Sep 18, 2019 at 5:52 PM Bourgon, Michael <[email protected]> wrote:
>
> TL;DR – I have a pandas dataframe where I need to change the datatypes when
> going into Arrow. I tried using a schema, but I'm getting exceptions and am
> at a loss how to proceed. All help appreciated!
>
> Longer version: I'm exporting data from SQL Server into a dataframe, one
> hour at a time, using pypyodbc. I want to convert it to Parquet, so that I
> can use Redshift Spectrum with it (and ideally at some point, Athena).
>
> However, the schema that PyArrow guesses is inaccurate.
>
> Some fields have NULLs, so it converts ints to floats. (I was super annoyed
> at this until I realized I can just tell it "float" in the CREATE EXTERNAL
> TABLE in Spectrum.)
>
> Some fields are ALWAYS NULL in a particular hour, which means that PyArrow
> guesses INT64 even when the column is normally varchar or whatever.
>
> I tried using ASTYPE on the original dataframe, but that didn't work - the
> metadata showed as having two sets of metadata (numpy_type and pandas_type),
> and Spectrum still didn't like it.
>
> Then I decided to use a schema to force it to what I need. However, it's
> throwing weird errors (included below). I have no idea how to proceed - any
> help greatly appreciated.
>
> Far too much code below.
>
> All thanks in advance - and if you're in the area (DFW, TX, US), I'll even
> buy you a beer or something!
>
> ============ first, how to get the data into the dataframe ======================
>
> import pandas as pd
> import pypyodbc
> import pyarrow.parquet as pq
> import pyarrow as pa
> from s3fs import S3FileSystem
> from datetime import timedelta as td, datetime
>
> s3 = S3FileSystem()  # or s3fs.S3FileSystem(key=ACCESS_KEY_ID, secret=SECRET_ACCESS_KEY)
>
> start_date = '2019-09-01'
> end_date = '2019-09-02'
> d1 = datetime.strptime(start_date, '%Y-%m-%d')
> d2 = datetime.strptime(end_date, '%Y-%m-%d')
>
> def get_delta(d1, d2):
>     delta = d2 - d1
>     return delta
>
> delta = get_delta(d1, d2)
>
> con_string = ('Driver={SQL Server};'
>               'Server=myserver;'
>               'Database=mydb;'
>               'App=PyPull;'  # It's not "application name"!
>               'Trusted_Connection=yes')
> cnxn = pypyodbc.connect(con_string)
>
> for i in range(delta.days * 24):
>     datebasic = d1 + td(hours=i)
>     datestr = datebasic.strftime("%Y-%m-%d %H:00")
>     print(datestr)
>     s3name = ("s3://bucket/folder").replace("_", "-")
>     query = f"""
>     declare @start_date datetime = '{datestr}'
>     declare @end_date datetime = dateadd(hour, 1, '{datestr}')
>     -- assume those are actual names, if that would cause this to break
>     select fieldA,
>            B, C, D, E, F, G,
>            H, I, J, K, L, M, N, O, P, Q, R, S,
>            convert(date, E) as subdate, datepart(hour, E) as subhour
>     FROM mytable
>     where J >= @start_date and J < @end_date
>     order by J
>     """
>     # these lines are indented so they stay in the same loop body
>     result_port_map = pd.read_sql(query, cnxn)
>
>     # and here is where I'm trying to convert and add the schema - see code below.
>
> # result_port_map['fieldK'] = result_port_map['fieldK'].astype(str)
> # (something I was trying that didn't work, but K did exist at the time)
> table = pa.Table.from_pandas(result_port_map)  # this works but guesses the field types wrong
>
> pq.write_to_dataset(table, root_path=s3name,
>                     partition_cols=['subdate', 'subhour'],
>                     filesystem=s3, coerce_timestamps='ms',
>                     allow_truncated_timestamps=True)
>
> =================== trying to get the schema to match ==============================
>
> The original SQL Server syntax:
>
> CREATE TABLE [dbo].MYTABLE
> (
>     fieldA [bigint] NOT NULL,
>     B [bigint] NOT NULL,
>     C [bigint] NOT NULL,
>     D [bigint] NULL,
>     E [datetime] NULL,
>     F [int] NULL,
>     G [int] NULL,
>     H [int] NULL,
>     I [int] NULL,
>     J [int] NULL,
>     K [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     L [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     M [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     N [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     O [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     P [varchar] (20) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     Q [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     R [varchar] (12) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
>     S [smallint] NULL
> )
>
> fields = [
>     pa.field('fieldA', pa.int64()),
>     pa.field('B', pa.int64()),
>     pa.field('C', pa.int64()),
>     pa.field('D', pa.int64()),
>     pa.field('E', pa.timestamp('ms')),
>     pa.field('F', pa.float64()),
>     pa.field('G', pa.float64()),
>     pa.field('H', pa.float64()),
>     pa.field('I', pa.float64()),
>     pa.field('J', pa.float64()),
>     pa.field('K', pa.string()),
>     pa.field('L', pa.string()),
>     pa.field('M', pa.string()),
>     pa.field('N', pa.string()),
>     pa.field('O', pa.string()),
>     pa.field('P', pa.string()),
>     pa.field('Q', pa.string()),
>     pa.field('R', pa.string()),
>     pa.field('S', pa.int16()),
>     pa.field('subdate', pa.string()),
>     pa.field('subhour', pa.int8())
> ]
> my_schema = pa.schema(fields)
> tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema,
>                                          preserve_index=False)
>
> ======================= The errors I'm getting =================
>
> >>> tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema, preserve_index=False)
> Traceback (most recent call last):
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py", line 2890, in get_loc
>     return self._engine.get_loc(key)
>   File "pandas\_libs\index.pyx", line 107, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\index.pyx", line 131, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in pandas._libs.hashtable.PyObjectHashTable.get_item
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in pandas._libs.hashtable.PyObjectHashTable.get_item
> KeyError: 'fieldA'
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "pyarrow\table.pxi", line 1174, in pyarrow.lib.Table.from_pandas
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py", line 460, in dataframe_to_arrays
>     columns)
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py", line 346, in _get_columns_to_convert
>     col = df[name]
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py", line 2975, in __getitem__
>     indexer = self.columns.get_loc(key)
>   File "C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py", line 2892, in get_loc
>     return self._engine.get_loc(self._maybe_cast_indexer(key))
>   File "pandas\_libs\index.pyx", line 107, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\index.pyx", line 131, in pandas._libs.index.IndexEngine.get_loc
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in pandas._libs.hashtable.PyObjectHashTable.get_item
>   File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in pandas._libs.hashtable.PyObjectHashTable.get_item
> KeyError: 'fieldA'
