Getting row counts from Redshift during unload process and counting rows loaded in S3

My python code looks like below where I am unloading data from Redshift to Amazon S3 bucket. I am trying to get row count from Redshift and S3 bucket to ensure that all the data is loaded. Additionally, I would also like to get last uploaded date from S3 bucket so that I know when last unload was performed. Kindly suggest the code with explanation.

Thanks in advance for your time and efforts!

import csv
import redshift_connector
import sys

CSV_FILE="Tables.csv"
CSV_DELIMITER=';'
S3_DEST_PATH="s3://..../"
DB_HOST="MY HOST"
DB_PORT=1234
DB_DB="MYDB"
DB_USER="MY_READ"
DB_PASSWORD="MY_PSWD"
IM_ROLE="arn:aws:iam::/redshift-role/unload data","arn:aws::iam::/write in bucket"

def get_tables(path):
    tables=[]
    with open (path, 'r') as file:
        csv_reader = csv.reader (file,delimiter=CSV_DELIMITER)
        header = next(csv_reader)
        if header != None:
            for row in csv_reader:
                tables.append(row)
    return tables

def unload(conn, tables, s3_path):
    cur = conn.cursor()
    
    for table in tables:
        print(f">{table[0]}.{table[1]}")
        try:
            query= f'''unload('select * from {table[0]}.{table[1]}' to '{s3_path}/{table[1]}/'
            iam_role '{IAM_ROLE}'
            CSV
            PARALLEL FALSE
            CLEANPATH;'''
            print(f"loading in progress")
            cur.execute(query)
            print(f"Done.")
        except Esception as e:
            print("Failed to load")
            print(str(e))
            sys.exit(1)
            
        cur.close()
        

def main():
    try:
        conn = redshift_connector.connect(
        host=DB_HOST,
        port=DB_PORT,
        database= DB_DB,
        user= DB_USER,
        password=DB_PASSWORD
        )
        
        tables = get_tables(CSV_FILE)
        unload(conn,tables,S3_DEST_PATH)
        conn.close()
    except Exception as e:
        print(e)
        sys.exit(1)


Comments

Popular posts from this blog

Today Walkin 14th-Sept

Spring Elasticsearch Operations

Hibernate Search - Elasticsearch with JSON manipulation