TL;DR - I have a pandas dataframe where I need to change the datatypes when
going into Arrow. I tried using schema, but I'm getting exceptions and am at a
loss how to proceed. All help appreciated!
Longer version: I'm exporting data from SQL Server into a dataframe, one hour
at a time, using pypyodbc. I want to convert it to Parquet, so that I can use
Redshift Spectrum with it (and ideally at some point, Athena).
However, the schema that PyArrow guesses is inaccurate.
Some fields contain NULLs, so pandas converts those int columns to float. (I
was super annoyed at this until I realized I can just declare them FLOAT in the
CREATE EXTERNAL TABLE, in Spectrum).
Some fields are ALWAYS NULL in a particular hour, which means that Pyarrow
guesses it's an INT64, even when it's normally varchar or whatever.
I tried using astype on the original dataframe, but that didn't work - the
resulting Parquet metadata showed two sets of metadata (numpy_type and
pandas_type), and Spectrum still didn't like it.
Then, I decided to use a schema to force it to what I need. However, it's
throwing weird errors (included below). I have no idea how to proceed - any
help greatly appreciated.
Far too much code below.
All thanks in advance - and if you're in the area (DFW, TX, US), I'll even buy
you a beer or something!
============ first, how to get the data into the dataframe
======================
import pandas as pd
import pypyodbc
import pyarrow.parquet as pq
import pyarrow as pa
from s3fs import S3FileSystem
from datetime import timedelta as td, datetime
# Uses ambient AWS credentials; to pass them explicitly instead:
# s3fs.S3FileSystem(key=ACCESS_KEY_ID, secret=SECRET_ACCESS_KEY)
s3 = S3FileSystem()

start_date = '2019-09-01'
end_date = '2019-09-02'
d1 = datetime.strptime(start_date, '%Y-%m-%d')
d2 = datetime.strptime(end_date, '%Y-%m-%d')


def get_delta(d1, d2):
    """Return the timedelta between the two datetimes (d2 - d1)."""
    return d2 - d1


delta = get_delta(d1, d2)
# NOTE(review): the original query embedded a Python-style '#' comment inside
# the T-SQL text; SQL Server does not treat '#' as a comment leader, so the
# batch would fail to parse.  T-SQL line comments use '--' (fixed below).
con_string = ('Driver={SQL Server};'
              'Server=myserver;'
              'Database=mydb;'
              'App=PyPull;'  # this driver wants "App", not "Application Name"
              'Trusted_Connection=yes')
cnxn = pypyodbc.connect(con_string)

# One pass per hour in [start_date, end_date).
for i in range(delta.days * 24):
    datebasic = d1 + td(hours=i)
    datestr = datebasic.strftime("%Y-%m-%d %H:00")
    print(datestr)
    # Destination prefix; '_' is replaced because the bucket naming convention
    # uses '-'.  The stray trailing space in the original literal is dropped —
    # it would have become part of the S3 key.
    s3name = "s3://bucket/folder".replace("_", "-")
    # datestr comes from strftime above (never user input), so interpolating it
    # into the SQL text is safe here; prefer bound parameters for anything else.
    query = f"""
    declare @start_date datetime = '{datestr}'
    declare @end_date datetime = dateadd(hour,1,'{datestr}')
    -- assume those are actual names, if that would cause this to break
    select fieldA,
           B, C, D, E,
           F, G,
           H, I, j, k, l, M, N, O, P, Q, R, S,
           convert(date, E) as subdate, datepart(hour, E) as subhour
    FROM mytable
    where J >= @start_date and J < @end_date
    order by J
    """
    result_port_map = pd.read_sql(query, cnxn)
    # from_pandas with no schema guesses the Arrow types from the hour's data;
    # to force types, pass schema=my_schema (defined below) — but the schema's
    # field names must match result_port_map.columns exactly.
    table = pa.Table.from_pandas(result_port_map)
    pq.write_to_dataset(table, root_path=s3name,
                        partition_cols=['subdate', 'subhour'],
                        filesystem=s3, coerce_timestamps='ms',
                        allow_truncated_timestamps=True)
=================== trying to get the schema to match
==============================
The original SQL Server syntax:
-- Source-of-truth DDL for the table being exported.  Note which columns are
-- nullable: nullable int columns surface in pandas as float64 (NaN), and the
-- varchar columns can be entirely NULL within a given hour.
CREATE TABLE [dbo].MYTABLE
(
fieldA [bigint] NOT NULL,
B [bigint] NOT NULL,
C [bigint] NOT NULL,
D [bigint] NULL,
E [datetime] NULL,
F [int] NULL,
G [int] NULL,
H [int] NULL,
I [int] NULL,
J [int] NULL,
K [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
L [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
M [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
N [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
O [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
P [varchar] (20) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
Q [varchar] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
R [varchar] (12) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
S [smallint] NULL)
# Target Arrow schema mirroring the SQL Server DDL above.  Nullable int
# columns (F..J) are declared float64 because pandas stores nullable ints as
# floats; varchar columns become string; E is a millisecond timestamp.
# NOTE(review): Table.from_pandas looks each schema field name up in the
# DataFrame, case-sensitively.  If these names don't match
# result_port_map.columns exactly (some ODBC wrappers lower-case column
# names — TODO confirm with print(result_port_map.columns)), it raises
# KeyError like the one below.
_column_types = [
    ('fieldA', pa.int64()),
    ('B', pa.int64()),
    ('C', pa.int64()),
    ('D', pa.int64()),
    ('E', pa.timestamp('ms')),
    ('F', pa.float64()),
    ('G', pa.float64()),
    ('H', pa.float64()),
    ('I', pa.float64()),
    ('J', pa.float64()),
    ('K', pa.string()),
    ('L', pa.string()),
    ('M', pa.string()),
    ('N', pa.string()),
    ('O', pa.string()),
    ('P', pa.string()),
    ('Q', pa.string()),
    ('R', pa.string()),
    ('S', pa.int16()),
    ('subdate', pa.string()),
    ('subhour', pa.int8()),
]
fields = [pa.field(name, dtype) for name, dtype in _column_types]
my_schema = pa.schema(fields)
# KeyError: 'fieldA' here means the schema names a column that is not present
# in result_port_map — with schema= given, from_pandas fetches each schema
# field by name from the DataFrame (see _get_columns_to_convert in the
# traceback).  Presumably the driver changed the column-name case; TODO
# confirm by comparing result_port_map.columns against my_schema.names.
tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema,
preserve_index=False)
======================= The errors I'm getting =================
>>> tableForcedSchema = pa.Table.from_pandas(result_port_map, schema=my_schema,
>>> preserve_index=False)
Traceback (most recent call last):
File
"C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py",
line 2890, in get_loc
return self._engine.get_loc(key)
File "pandas\_libs\index.pyx", line 107, in
pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\index.pyx", line 131, in
pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in
pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in
pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'fieldA'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow\table.pxi", line 1174, in pyarrow.lib.Table.from_pandas
File
"C:\Users\myuser\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py",
line 460, in dataframe_to_arrays
columns)
File "C:\Users\ myuser
\AppData\Local\Programs\Python\Python37\lib\site-packages\pyarrow\pandas_compat.py",
line 346, in _get_columns_to_convert
col = df[name]
File "C:\Users\ myuser
\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py",
line 2975, in __getitem__
indexer = self.columns.get_loc(key)
File "C:\Users\ myuser
\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\indexes\base.py",
line 2892, in get_loc
return self._engine.get_loc(self._maybe_cast_indexer(key))
File "pandas\_libs\index.pyx", line 107, in
pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\index.pyx", line 131, in
pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\hashtable_class_helper.pxi", line 1607, in
pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas\_libs\hashtable_class_helper.pxi", line 1614, in
pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'fieldA'