Applied Programming/Databases/Python3 MS SQL

databases.pyEdit

"""This program demonstrates MS SQL database processing.

Input:
    None

Output:
    Sample data.

References:
    https://en.wikiversity.org/wiki/Python_Programming/Databases
***IMPORTANT***
    Download Link for MS SQL database:
    https://www.microsoft.com/en-us/sql-server/sql-server-downloads

    To install pyodbc use following command:
    python pip3 install pyodbc
"""

import sys
import pyodbc

DATABASE = "users.db"


def execute_sql(sql):
    """Executes the given sql statement.

    Args:
        sql (string): A valid SQL statement to execute.

    Returns:
        None.

    """
    try:
        conn = pyodbc.connect(
            'DRIVER={SQL Server};'
            'SERVER=DESKTOP\\MSSQLSERVER01;'
            'PORT=1433;'
            'DATABASE=users.db;'
            'UID=PyUser;'
            'PWD=pass123'
        )
    except:
        print(f"Unable to connect to {DATABASE}")
        raise

    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
    except Exception as exception:
        print(f"Unable to execute {sql}")
        print(exception)
    finally:
        conn.close()


def create_table():
    """Creates the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = "DROP TABLE IF EXISTS Users;"
    execute_sql(sql)

    sql = """
        CREATE TABLE
        Users(
        UserID INT PRIMARY KEY NOT NULL,
        [User] TEXT NOT NULL, Age INT NOT NULL
        );
        """
    execute_sql(sql)


def insert_users():
    """Inserts data into the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = "INSERT INTO Users(UserID, [User], Age) VALUES(1, 'Moe', 28);"
    execute_sql(sql)

    sql = "INSERT INTO Users(UserID, [User], Age) VALUES(2, 'Larry', 55);"
    execute_sql(sql)

    sql = "INSERT INTO Users(UserID, [User], Age) VALUES(3, 'Curly', 19);"
    execute_sql(sql)


def display_table():
    """Displays the Users table.

    Args:
        None.

    Returns:
        None.

    """
    try:
        conn = pyodbc.connect(
            'DRIVER={SQL Server};'
            'SERVER=DESKTOP\\MSSQLSERVER01;'
            'PORT=1433;'
            'DATABASE=users.db;'
            'UID=PyUser;'
            'PWD=pass123'
        )
    except:
        print(f"Unable to connect to {DATABASE}")
        raise

    try:
        cursor = conn.cursor()

        sql = """
            SELECT UserID, [User], Age FROM Users;
            """
        cursor.execute(sql)
        rows = cursor.fetchall()

        for row in rows:
            print(row)
        print()
    except Exception as exception:
        print(f"Error processing {sql}")
        print(exception)
    finally:
        conn.close()


def update_user():
    """Updates the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = """
        UPDATE Users
        SET [User] = 'Shemp'
        WHERE UserID = 3;
        """
    execute_sql(sql)


def delete_user():
    """Deletes a record from the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = """
        DELETE FROM Users
        WHERE UserID = 3;
        """
    execute_sql(sql)


def display_list_of_tables():
    """Deletes a record from the Users table.

    Args:
        None.

    Returns:
        None.

    """
    print("List of the tables in database: ", end='')
    try:
        conn = pyodbc.connect(
            'DRIVER={SQL Server};'
            'SERVER=DESKTOP\\MSSQLSERVER01;'
            'PORT=1433;'
            'DATABASE=users.db;'
            'UID=PyUser;'
            'PWD=pass123'
        )
    except:
        print(f"Unable to connect to {DATABASE}")
        raise
    try:
        cursor = conn.cursor()
        sql = """
            SELECT * FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_TYPE='BASE TABLE';
            """
        cursor.execute(sql)
        rows = cursor.fetchall()

        if rows == []:
            print("Database is Empty, all Tables deleted.")
        else:
            for row in rows:
                print(row)
    except Exception as exception:
        print(f"Error processing {sql}")
        print(exception)
    finally:
        conn.close()


def delete_table():
    """Deletes a record from the Users table.

    Args:
        None.

    Returns:
        None.

    """
    sql = """
        DROP TABLE Users;
        """
    execute_sql(sql)
    print('Users Table was deleted.\n')


def main():
    """Runs the main program logic."""

    try:
        print("Users")
        create_table()
        insert_users()
        display_table()

        print("Users After Update")
        update_user()
        display_table()

        print("Users After Delete")
        delete_user()
        display_table()

        print("Delete Table")
        display_list_of_tables()
        delete_table()
        display_list_of_tables()
    except:
        print("Unexpected error.")
        print("Error:", sys.exc_info()[1])
        print("File: ", sys.exc_info()[2].tb_frame.f_code.co_filename)
        print("Line: ", sys.exc_info()[2].tb_lineno)


main()