Hi

I am trying to load data into a PostGIS table. For this purpose, I have the following function I tried to cough up analogously to https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries.


def process_files(data_directory, file_name_regexp, conn, source_id, logger):
loaded_files = get_loaded_files(conn=conn,source_id=source_id,logger=logger,)
    existing_hashes = {file_hash for file_hash, _ in loaded_files}

    # "next" inhibits recursion, so only the top level is retrieved
    logger.info(f"Looking into '{data_directory}'")
    try:
        cur = conn.cursor()
        cur.execute("set postgis.gdal_enabled_drivers = 'ENABLE_ALL';")
        root, dirs, files = next(os.walk(data_directory))
        for file_name in files:
            if re.match(file_name_regexp, file_name):
                file_path = os.path.join(root, file_name)
                logger.info(f"Processing '{file_path}'")
                file_hash = calculate_file_sha3_512_hash(file_path)
file_creation_time = datetime.fromtimestamp(os.path.getctime(file_path))

# If the hash is alread present, skip this file FIXME check on file names
                if file_hash in existing_hashes:
                    continue

                # Get the raster data
                with open(file_path, 'rb') as f:
                    raster_data = f.read()

                statement = """merge
into
    TOPO_FILES as TARGET
using
    ( values
        (
            -- ST_FromGDALRaster(pg_read_binary_file(%s)),
            ST_FromGDALRaster(%(TILE)s::bytea),
            %(FILE_NAME)s,
            %(FILE_CREATION_PIT)s,
            %(FILE_HASH)s,
            %(SOURCE_ID)s::uuid
        )
    ) as source ( TILE, FILE_NAME, FILE_CREATION_PIT, FILE_HASH, SOURCE_ID )
on
    TARGET.FILE_NAME = SOURCE.FILE_NAME
and TARGET.SOURCE_ID = SOURCE.SOURCE_ID
and TARGET.FILE_HASH != SOURCE.FILE_HASH
and TARGET.FILE_CREATION_PIT < SOURCE.FILE_CREATION_PIT
when matched
    then
update
set
    TILE = SOURCE.TILE,
    FILE_NAME = SOURCE.FILE_NAME,
    FILE_CREATION_PIT = SOURCE.FILE_CREATION_PIT,
    FILE_HASH = SOURCE.FILE_HASH,
    SOURCE_ID = SOURCE.SOURCE_ID
when not matched
    then
insert
    (
        TILE,
        FILE_NAME,
        FILE_CREATION_PIT,
        FILE_HASH,
        SOURCE_ID
    )
    values
    (
        SOURCE.TILE,
        SOURCE.FILE_NAME,
        SOURCE.FILE_CREATION_PIT,
        SOURCE.FILE_HASH,
        SOURCE.SOURCE_ID
    );"""
                logger.debug("statement")
                logger.debug(statement)
                logger.debug("First 100 bytes of raster_data")
                logger.debug(f"{raster_data[:100]}")
                logger.debug(f"file_name:{file_name}")
                logger.debug(f"file_creation_time:{file_creation_time}")
                logger.debug(f"file_hash:{file_hash}")
                logger.debug(f"source_id:{source_id}")
# params = (psycopg2.Binary(raster_data), file_name, file_creation_time, file_hash, source_id)
                params = {'TILE': raster_data
                         ,'FILE_NAME': file_name
                         ,'FILE_CREATION_PIT': file_creation_time
                         ,'FILE_HASH': file_hash
                         ,'SOURCE_ID': source_id}
                # logger.debug(f"params:{params}")
                cur.execute(statement, params)
                conn.commit()
                cur.close()
    except StopIteration:
logger.error(f"Error: '{data_directory}' could not be walked. Directory might be empty or inaccessible.")


However, I get the mentioned error.

2024-11-01 11:06:58 - root - DEBUG - source_id:4f68d890-a08c-4c06-8aa5-741ad36b6abe
Traceback (most recent call last):
File "/home/thiemo/external_projects/svn/33/trunk/code_files/data_storage/load_OpenTopography_data.py", line 737, in <module>
    main()
File "/home/thiemo/external_projects/svn/33/trunk/code_files/data_storage/load_OpenTopography_data.py", line 714, in main
    process_files(
File "/home/thiemo/external_projects/svn/33/trunk/code_files/data_storage/load_OpenTopography_data.py", line 442, in process_files
    cur.execute(statement, params)
TypeError: dict is not a sequence


I would very much appreciate, if someone shed some light on the matter.

Kind regards

Thiemo



Reply via email to