AI in data engineering - Part 3

Preface🎬

In part 2, we went over a few AI solutions with an architecture we can use to start building custom AI tools that generate commercial value across the company.

We also highlighted two routes to creating them - those that involve coding and those that are no-code. This article explores the code-based route, scripted in Python.

Types of AI chatbots🤖

  • Database chatbots 📊 - ideal for retrieving data from databases, whether they are relational, non-relational, or other types

  • Document chatbots 📄 - for querying files of varying formats such as PDFs, CSVs, text, .doc, and more

  • Website chatbots 🌐 - tailored to provide answers sourced from websites

The user experience with these chatbots depends on the quality and volume of the data they consume. High-quality data results in a great user experience. Poor-quality data, on the other hand, risks producing unreliable responses that could create more damage than value.

So it's always important to focus attention on making sure the pipelines moving the source data into these chatbots maintain high data quality, through data engineering best practices like data quality tests, data contracts, and peer reviews, among others.

This article will be a deep dive into the database chatbot. Stay tuned for future posts on the others.

Why do we need a database chatbot?🤔💭

Some of the benefits of using an AI database chatbot include:

  • Instant responses 🚀 - Accelerates decision-making by providing faster data insights to stakeholders

  • Accessible to all 🌉 - Enables non-technical users to quickly retrieve the information they’re after from the database without waiting on data gatekeepers

  • Optimized resource utilization 🚦 - The chatbot generates efficient SQL queries on the fly, which stops users from writing heavy ones that make the database sluggish

  • Less training required 🎓 - Simplifies data retrieval, so no money or company resources need to be spent on staff learning SQL from scratch

  • Streamlined audits 📂 - Easier to trace and understand data usage through logged chat interactions

Creating an AI database chatbot with Python 🤓🐍

You can watch a demo of what this AI database chatbot looks like here:

Now I will walk you through how this was made with Python:

Folder Structure📂

Here’s what the folder directory tree looks like:

│   .env
│   .gitignore
│   app.py
│   README.md
│   requirements.txt
│
├───assets
│   │   dark_theme.py
│   │   light_theme.py
│   │   made_by_sdw.py
│   │
│   └───__pycache__
│           ...
│
├───conversation_history
│       ...
│
└───utils
    │   api_functions.py
    │   chat_functions.py
    │   config.py
    │   database_functions.py
    │   function_calling_spec.py
    │   helper_functions.py
    │   system_prompts.py
    │
    └───__pycache__
            ...

Here’s a breakdown of the folders mentioned:

  • root

    • .env - stores environment variables

    • .gitignore - lists the files and directories Git should ignore when committing and pushing changes

    • app.py - initiates and runs the AI chatbot

    • README.md - provides the documentation for this project

    • requirements.txt - contains all the Python modules required to run the chatbot

  • assets

    • dark_theme.py - enables the dark theme for the AI chatbot UI

    • light_theme.py - enables the light theme for the AI chatbot UI

    • made_by_sdw.py - renders a watermark crediting the AI chatbot's author (myself) at the bottom right of the chatbot UI

  • conversation_history

    • stores conversation transcripts in markdown format

  • utils

    • api_functions.py - stores functions for communication exchanges between the user, OpenAI’s API and Postgres

    • chat_functions.py - stores functions for facilitating communication within Streamlit

    • config.py - for instantiating the environment variables for other modules to pull from

    • database_functions.py - retrieves information on the Postgres database objects within the information schema

    • function_calling_spec.py - facilitates OpenAI’s function calling mechanism

    • helper_functions.py - stores functions to assist with other activities e.g. save conversations

    • system_prompts.py - sets the behaviour of the AI assistant by specifying its role and responsibilities

  • __pycache__

    • stores compiled bytecode so Python can load the modules faster on subsequent runs

Technologies💻

  • Streamlit: For an interactive, user-friendly web-based interface.

  • OpenAI: The power behind the chatbot's intelligent responses.

  • Postgres: The database where all the data resides.

Code breakdown🖥🐛

This will be divided into:

  • Frontend🐥

  • Backend⚙️🔙

Frontend🐥

This focuses on the part of the chatbot the end user sees and interacts with i.e. the chatbot UI.

app.py

This is the script that offers the interface for users to interact with the AI chatbot.

To begin, we import the relevant functions to make the chatbot behave as expected:

import streamlit as st
from utils.config import db_credentials, MAX_TOKENS_ALLOWED, MAX_MESSAGES_TO_OPENAI, TOKEN_BUFFER
from utils.system_prompts import get_final_system_prompt
from utils.chat_functions import run_chat_sequence, clear_chat_history, count_tokens, prepare_sidebar_data
from utils.database_functions import database_schema_dict
from utils.function_calling_spec import functions
from utils.helper_functions import save_conversation
from assets.dark_theme import dark
from assets.light_theme import light
from assets.made_by_sdw import made_by_sdw

The chatbot’s UI is divided into a:

  • Sidebar

  • Chat messaging interface

We’ll begin with the sidebar:

if __name__ == "__main__":

    ########### A. SIDEBAR ###########

    # Prepare data for the sidebar dropdowns
    sidebar_data = prepare_sidebar_data(database_schema_dict)
    st.sidebar.markdown("<div class='made_by'>Made by SDW🔋</div>", unsafe_allow_html=True)

The sidebar contains the following:

  • Postgres DB Objects Viewer

  • Save Conversation button

  • Clear Conversation button

  • Toggle Theme button

Postgres DB Objects Viewer - where users can view the Postgres database schemas, tables and columns via dropdown menus


    ### POSTGRES DB OBJECTS VIEWER ###

    st.markdown(made_by_sdw, unsafe_allow_html=True)
    st.sidebar.title("🔍 Postgres DB Objects Viewer")

    # Dropdown for schema selection
    selected_schema = st.sidebar.selectbox("📂 Select a schema", list(sidebar_data.keys()))

    # Dropdown for table selection based on chosen Schema
    selected_table = st.sidebar.selectbox("📜 Select a table", list(sidebar_data[selected_schema].keys()))

    # Display columns of the chosen table with interactivity using checkboxes
    st.sidebar.subheader(f"🔗 Columns in {selected_table}")
    for column in sidebar_data[selected_schema][selected_table]:
        is_checked = st.sidebar.checkbox(f"📌 {column}")

Save Conversation button - to save a transcript of the conversation between the user and the AI chat assistant

    ### SAVE CONVERSATION BUTTON ###

    # Add a button to SAVE the chat/conversation
    if st.sidebar.button("Save Conversation💾"):
        saved_file_path = save_conversation(st.session_state["full_chat_history"])
        st.sidebar.success(f"Conversation saved to: {saved_file_path}")
        st.sidebar.markdown(f"Conversation saved! [Open File]({saved_file_path})")

Clear Conversation button - to delete the conversation history between the user and the AI chat assistant (this chatbot version clears the chat interface only - for regulatory and compliance purposes, all conversations would still be persisted to the cloud, and developers must inform users when this occurs)

    ### CLEAR CONVERSATION BUTTON ###

    # Add a button to CLEAR the chat/conversation
    if st.sidebar.button("Clear Conversation🗑️"):
        save_conversation(st.session_state["full_chat_history"]) 
        clear_chat_history()

Toggle Theme button - for switching the UI’s colour theme from light to dark, and vice versa

    ### TOGGLE THEME BUTTON ###

    # Retrieve the current theme from session state
    current_theme = st.session_state.get("theme", "light")
    st.markdown(f"<body class='{current_theme}'></body>", unsafe_allow_html=True)

    # Initialize the theme in session state
    if "theme" not in st.session_state:
        st.session_state.theme = "light"

    # Add a button to toggle the UI colour theme
    if st.sidebar.button("Toggle Theme🚨"):
        st.session_state.theme = "dark" if st.session_state.theme == "light" else "light"
        st.experimental_rerun()

    # Apply the theme based on session state
    theme_style = dark if st.session_state.theme == "dark" else light
    st.markdown(theme_style, unsafe_allow_html=True)

Now let’s break down the chat interface itself:

    ########### B. CHAT INTERFACE ###########

...

We add the title of the app…

    ### TITLE ###

    # Add title to the Streamlit chatbot app
    st.title("🤖 AI Database Chatbot 🤓")

…and then we add the session state variables (session state is a Streamlit feature that stores variables in a temporary memory store so they persist across the app for the duration of the active session):

    ### SESSION STATE ###

    # Initialize the full chat messages history for UI
    if "full_chat_history" not in st.session_state:
        st.session_state["full_chat_history"] = [{"role": "system", "content": get_final_system_prompt(db_credentials=db_credentials)}]

    # Initialize the API chat messages history for OpenAI requests
    if "api_chat_history" not in st.session_state:
        st.session_state["api_chat_history"] = [{"role": "system", "content": get_final_system_prompt(db_credentials=db_credentials)}]

Then the interface that displays the messaging exchange between the user and chat assistant is facilitated by this:

    ### CHAT FACILITATION ###

    # Start the chat
    if (prompt := st.chat_input("What do you want to know?")) is not None:
        st.session_state.full_chat_history.append({"role": "user", "content": prompt})

        # Limit the number of messages sent to OpenAI by token count
        total_tokens = sum(count_tokens(message["content"]) for message in st.session_state["api_chat_history"])
        while total_tokens + count_tokens(prompt) + TOKEN_BUFFER > MAX_TOKENS_ALLOWED:
            removed_message = st.session_state["api_chat_history"].pop(0)
            total_tokens -= count_tokens(removed_message["content"])

        st.session_state.api_chat_history.append({"role": "user", "content": prompt})

    # Display previous chat messages from full_chat_history (ignore system prompt message)
    for message in st.session_state["full_chat_history"][1:]:
        if message["role"] == "user":
            st.chat_message("user", avatar='🧑‍💻').write(message["content"])
        elif message["role"] == "assistant":
            st.chat_message("assistant", avatar='🤖').write(message["content"])

    if st.session_state["api_chat_history"][-1]["role"] != "assistant":
        with st.spinner("⌛Connecting to AI model..."):
            # Send only the most recent messages to OpenAI from api_chat_history
            recent_messages = st.session_state["api_chat_history"][-MAX_MESSAGES_TO_OPENAI:]
            new_message = run_chat_sequence(recent_messages, functions)  # Get the latest message

            # Add this latest message to both api_chat_history and full_chat_history
            st.session_state["api_chat_history"].append(new_message)
            st.session_state["full_chat_history"].append(new_message)

            # Display the latest message from the assistant
            st.chat_message("assistant", avatar='🤖').write(new_message["content"])

        max_tokens = MAX_TOKENS_ALLOWED
        current_tokens = sum(count_tokens(message["content"]) for message in st.session_state["full_chat_history"])
        progress = min(1.0, max(0.0, current_tokens / max_tokens))
        st.progress(progress)
        st.write(f"Tokens Used: {current_tokens}/{max_tokens}")
        if current_tokens > max_tokens:
            st.warning("Note: Due to character limits, some older messages might not be considered in ongoing conversations with the AI.")

/assets

Each Python script in the assets folder supports the styling of the AI chatbot; each contains CSS that styles the page according to its specification (see the sketch after this list):

  • dark_theme.py - sets the AI chatbot theme to dark mode

  • light_theme.py - sets the AI chatbot theme to light mode

  • made_by_sdw.py - renders the author's initials (mine) as a watermark at the bottom right of the chatbot UI
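
For illustration, here's a minimal sketch of the shape one of these theme scripts could take - a CSS string that Streamlit injects into the page (the selectors and colours below are placeholder assumptions, not the repo's actual styles):

# dark_theme.py - illustrative sketch only; the real file's CSS differs
dark = """
<style>
    /* Darken the main app container */
    [data-testid="stAppViewContainer"] {
        background-color: #1e1e1e;
        color: #fafafa;
    }
    /* Match the sidebar to the dark palette */
    [data-testid="stSidebar"] {
        background-color: #262626;
    }
</style>
"""

As shown earlier, app.py then applies the selected theme with st.markdown(theme_style, unsafe_allow_html=True).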

Backend⚙️🔙

Any operation that makes the chatbot work that the user doesn't see or interact with is part of the backend.

/utils

This folder holds all the Python scripts that deal with the backend operations.

api_functions.py

The api_functions.py file contains functions that convert the user’s inputs into API calls which are sent to OpenAI’s API.

Here are the modules used in this file:

import json
import requests
from utils.config import OPENAI_API_KEY, AI_MODEL
from utils.database_functions import ask_postgres_database, postgres_connection
from tenacity import retry, wait_random_exponential, stop_after_attempt

  • json - for manipulating JSON data

  • requests - making API calls to endpoints

  • utils.config - importing environment variables

  • utils.database_functions - importing custom Postgres database functions

  • tenacity - retrying API calls upon failures

This file contains two functions:

  • send_api_request_to_openai_api

  • execute_function_call

The send_api_request_to_openai_api function sends the user’s prompt as a POST request to OpenAI's Chat Completions endpoint and returns the response in a structured format using function calling:

@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
def send_api_request_to_openai_api(messages, functions=None, function_call=None, model=AI_MODEL, openai_api_key=OPENAI_API_KEY):
    """ Send the API request to the OpenAI API via Chat Completions endpoint  """
    try:
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {openai_api_key}"}
        json_data = {"model": model, "messages": messages}
        if functions: 
            json_data.update({"functions": functions})
        if function_call: 
            json_data.update({"function_call": function_call})
        response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=json_data)
        response.raise_for_status()

        return response

    except requests.RequestException as e:
        raise ConnectionError(f"Failed to connect to OpenAI API due to: {e}")

The execute_function_call runs the function call included in the response to the previous POST request sent to the OpenAI API.

In this case, the function is ask_postgres_database (described to the model in the function_calling_spec.py file and defined in database_functions.py), whose argument is the user's input converted into a SQL query string.

The program extracts the SQL query, runs it against the Postgres database, and returns the results:

def execute_function_call(message):
    """ Run the function call provided by OpenAI's API response """
    if message["function_call"]["name"] == "ask_postgres_database":
        query = json.loads(message["function_call"]["arguments"])["query"]
        print(f"SQL query: {query} \\n")
        results = ask_postgres_database(postgres_connection, query)
        print(f"Results A: {results} \\n")
    else:
        results = f"Error: function {message['function_call']['name']} does not exist"
    return results

chat_functions.py

The chat_functions.py file hosts the functions that manage the chat sequence and user interactions.

Here are the modules for this file:

import tiktoken
import streamlit as st
from utils.config import AI_MODEL
from utils.api_functions import send_api_request_to_openai_api, execute_function_call

  • tiktoken - for counting total tokens used in conversations

  • streamlit - for using Streamlit’s session state feature

  • utils.config - importing GPT model from config.py file

  • utils.api_functions - importing functions for interacting with the OpenAI API

This file contains four functions:

  • run_chat_sequence

  • clear_chat_history

  • count_tokens

  • prepare_sidebar_data

The run_chat_sequence handles the primary chat flow - once the user’s message has been received, the message is appended to the chat history and sent to the OpenAI API. If the API’s response contains a function call, the program executes the function and returns the response in a human-readable format.

In this case, the function in the API response is expected to be the ask_postgres_database, which is specified in the function_calling_spec.py file:

def run_chat_sequence(messages, functions):
    if "live_chat_history" not in st.session_state:
        st.session_state["live_chat_history"] = [{"role": "assistant", "content": "Hello! I'm Andy, how can I assist you?"}]
        # st.session_state["live_chat_history"] = []

    internal_chat_history = st.session_state["live_chat_history"].copy()

    chat_response = send_api_request_to_openai_api(messages, functions)
    assistant_message = chat_response.json()["choices"][0]["message"]

    if assistant_message["role"] == "assistant":
        internal_chat_history.append(assistant_message)

    if assistant_message.get("function_call"):
        results = execute_function_call(assistant_message)
        internal_chat_history.append({"role": "function", "name": assistant_message["function_call"]["name"], "content": results})
        internal_chat_history.append({"role": "user", "content": "You are a data analyst - provide personalized/customized explanations on what the results provided means and link them to the the context of the user query using clear, concise words in a user-friendly way. Or answer the question provided by the user in a helpful manner - either way, make sure your responses are human-like and relate to the initial user input. Your answers must not exceed 200 characters"})
        chat_response = send_api_request_to_openai_api(internal_chat_history, functions)
        assistant_message = chat_response.json()["choices"][0]["message"]
        if assistant_message["role"] == "assistant":
            st.session_state["live_chat_history"].append(assistant_message)

    return st.session_state["live_chat_history"][-1]

The clear_chat_history is called to clear the conversation history between the user and chat assistant once the user clicks on the Clear Conversation button in the chatbot UI:

def clear_chat_history():
    """ Clear the chat history stored in the Streamlit session state """
    del st.session_state["live_chat_history"]
    del st.session_state["full_chat_history"]
    del st.session_state["api_chat_history"]

The count_tokens calculates the number of tokens used in each conversation:

def count_tokens(text):
    """ Count the total tokens used in a text string """
    if not isinstance(text, str):  
        return 0 
    encoding = tiktoken.encoding_for_model(AI_MODEL)
    total_tokens_in_text_string = len(encoding.encode(text))

    return total_tokens_in_text_string
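
As a quick sanity check, count_tokens can be called directly (exact counts depend on the model's encoding, so the numbers in the comments are illustrative):

# Illustrative usage - actual token counts vary by model encoding
print(count_tokens("How many orders were placed last week?"))   # e.g. 8
print(count_tokens(None))                                       # 0 - non-strings return zero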

The prepare_sidebar_data organises the Postgres database objects so they are easy to visualize in the sidebar. Admittedly, this could be placed in a better location than chat_functions.py, but that can be part of a refactoring exercise in a future post:

def prepare_sidebar_data(database_schema_dict):
    """ Add a sidebar for visualizing the database schema objects  """
    sidebar_data = {}
    for table in database_schema_dict:
        schema_name = table["schema_name"]
        table_name = table["table_name"]
        columns = table["column_names"]

        if schema_name not in sidebar_data:
            sidebar_data[schema_name] = {}

        sidebar_data[schema_name][table_name] = columns
    return sidebar_data
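
For example, given the list-of-dicts shape that get_database_info produces, the output is nested by schema and then by table (the tables below are made up for illustration):

# Hypothetical input in the shape returned by get_database_info
example_schema = [
    {"schema_name": "prod", "table_name": "orders", "column_names": ["order_id", "customer_id", "amount"]},
    {"schema_name": "prod", "table_name": "customers", "column_names": ["customer_id", "name"]},
]

print(prepare_sidebar_data(example_schema))
# {'prod': {'orders': ['order_id', 'customer_id', 'amount'], 'customers': ['customer_id', 'name']}}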

config.py

The config.py file stores all the environment variables useful for the chatbot’s operations.

There are only two modules in this file:

import os 
from dotenv import load_dotenv

  • os - accessing environment variables

  • dotenv - loading environment variables

In this file, we have the following objects:

  • db_credentials - a dictionary of the Postgres database credentials

  • OPENAI_API_KEY - the API key from OpenAI to interact with the GPT model

  • AI_MODEL - the GPT model used

  • MAX_TOKENS_ALLOWED - the maximum number of tokens permitted within a conversation exchange with the API

  • MAX_MESSAGES_TO_OPENAI - the maximum number of messages to exchange with OpenAI API

  • TOKEN_BUFFER - a fixed buffer that provides a safety cushion to avoid exceeding token limits

# Set up OpenAI variables 

OPENAI_API_KEY           =   os.getenv("OPENAI_API_KEY")
AI_MODEL                 =   'gpt-3.5-turbo-16k'
MAX_TOKENS_ALLOWED       =   3000
MAX_MESSAGES_TO_OPENAI   =   5
TOKEN_BUFFER             =   100
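
The database credentials follow the same pattern. Here's a minimal sketch of how db_credentials could be assembled from the .env file (the environment variable names are assumptions - match them to your own .env; the dictionary keys mirror psycopg2.connect's arguments):

# Load environment variables from the .env file
load_dotenv()

# Set up Postgres variables (assumed .env variable names)
db_credentials = {
    "dbname":   os.getenv("DB_NAME"),
    "user":     os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host":     os.getenv("DB_HOST"),
    "port":     os.getenv("DB_PORT"),
}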

database_functions.py

The database_functions.py manages the functions that extract information on the objects in the connected Postgres database.

Here are the imported dependencies for this file:

import psycopg2
from utils.config import db_credentials

  • psycopg2 - connects with Postgres database

  • utils.config - importing database credentials from config.py

The script begins with a validation check that confirms if the connection to the Postgres database is successful:

# Establish connection with PostgreSQL
try:
    postgres_connection = psycopg2.connect(**db_credentials)
    postgres_connection.set_session(autocommit=True)
except Exception as e:
    raise ConnectionError(f"Unable to connect to the database due to: {e}")

# Create a database cursor to execute PostgreSQL commands
cursor = postgres_connection.cursor()

# Validate the PostgreSQL connection status
if postgres_connection.closed == 0:
    print(f"Connected successfully to {db_credentials['dbname']} database\\nConnection Details: {postgres_connection.dsn}")
else:
    raise ConnectionError("Unable to connect to the database")

Once the validation check is successful, the user has access to the following functions:

  • get_schema_names

  • get_table_names

  • get_column_names

  • get_database_info

  • ask_postgres_database

The get_schema_names fetches all the schema names from the database provided:

def get_schema_names(database_connection):
    """ Returns a list of schema names """
    cursor = database_connection.cursor()
    cursor.execute("SELECT schema_name FROM information_schema.schemata;")
    schema_names = [row[0] for row in cursor.fetchall()]
    cursor.close()
    return schema_names

The get_table_names fetches all the table names from the database and schema provided:

def get_table_names(connection, schema_name):
    """ Returns a list of table names """
    cursor = connection.cursor()
    cursor.execute(f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema_name}';")
    table_names = [table[0] for table in cursor.fetchall()]
    cursor.close()
    return table_names

The get_column_names fetches all the column names from the database, schema and table provided:

def get_column_names(connection, table_name, schema_name):
    """ Returns a list of column names """
    cursor = connection.cursor()
    cursor.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}' AND table_schema = '{schema_name}';")
    column_names = [col[0] for col in cursor.fetchall()]
    cursor.close()
    return column_names

The get_database_info returns a detailed list of schemas, their tables and respective column names:

def get_database_info(connection, schema_names):
    """ Fetches information about the schemas, tables and columns in the database """
    table_dicts = []
    for schema in schema_names:
        for table_name in get_table_names(connection, schema):
            column_names = get_column_names(connection, table_name, schema)
            table_dicts.append({"table_name": table_name, "column_names": column_names, "schema_name": schema})
    return table_dicts

(There is also a step that assembles the database schema details into a single string):

schemas = ['prod', 'dev']
database_schema_dict = get_database_info(postgres_connection, schemas)
database_schema_string = "\n".join(
    [
        f"Schema: {table['schema_name']}\nTable: {table['table_name']}\nColumns: {', '.join(table['column_names'])}"
        for table in database_schema_dict
    ]
)

The ask_postgres_database executes the SQL query provided on the database and returns the results:

def ask_postgres_database(connection, query):
    """ Execute the SQL query provided by OpenAI and return the results """
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        results = str(cursor.fetchall())
        cursor.close()
    except Exception as e:
        results = f"Query failed with error: {e}"
    return results
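
As a quick illustration, the function can be exercised directly with a handwritten query (the prod.orders table here is hypothetical - substitute a table that exists in your database):

# Hypothetical usage example
sample_results = ask_postgres_database(postgres_connection, "SELECT COUNT(*) FROM prod.orders;")
print(sample_results)   # e.g. "[(1250,)]", or "Query failed with error: ..." on failure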

function_calling_spec.py

The function_calling_spec.py holds the function specification for leveraging OpenAI’s function calling mechanism. A full explanation of function calling is beyond the scope of this article, but I should be able to touch on it in a future post.

This file only imports the following:

from utils.database_functions import database_schema_string

  • utils.database_functions - imports the database schema in string format

The function description is specified in the functions object, which is fed into the API calls sent to OpenAI:

functions = [
    {
        "name": "ask_postgres_database",
        "description": "Use this function to answer user questions about the database. Output should be a fully formed SQL query.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": f""" The SQL query that extracts the information that answers the user's question from the Postgres database. Write the SQL in the following schema structure:
                            {database_schema_string}. Write the query in SQL format only, not in JSON. Do not include any line breaks or characters that cannot be executed in Postgres.  
                            """,
                }
            },
            "required": ["query"],
        },
    }
]

helper_functions.py

The helper_functions.py file stores other functions that support different areas within the chatbot.

The only imports made here are:

import os
import datetime

  • os - for accessing local directories

  • datetime - generates timestamps

Because this is a basic AI chatbot, there is only one function in this file:

  • save_conversation

The save_conversation function writes the conversation history between the user and chat assistant to a markdown file in the specified directory - each file is named with the timestamp at which it was saved:

def save_conversation(conversation_history, directory="conversation_history"):
    """
    Save a given conversation history to a markdown file with timestamps.
    """
    # Create the directory if it doesn't exist
    if not os.path.exists(directory):
        os.makedirs(directory)

    # Get the current date and time for the filename
    current_datetime = datetime.datetime.now().strftime('%Y_%m_%d_%H%M%S')
    file_path = os.path.join(directory, f"{current_datetime}.md")

    with open(file_path, 'w', encoding='utf-8') as file:
        for message in conversation_history:
            if message["role"] in ["user", "assistant"]:
                message_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                role_icon = '🧑‍💻' if message["role"] == "user" else '🤖'
                file.write(f"{message_timestamp} **{role_icon} {message['role'].title()}:** {message['content']}\\n\\n")

    return file_path

system_prompts.py

The system_prompts.py file defines the system prompt fed to the AI chat assistant, setting the behaviour it is expected to adopt during conversations with the user. It can also be run as a standalone Streamlit app to visualize that prompt.

This can be executed using the following in the terminal:

streamlit run system_prompts.py

Here are the modules it requires:

import psycopg2
import streamlit as st
from config import db_credentials

  • psycopg2 - connects to Postgres database

  • streamlit - accessing Streamlit web app

  • config - importing database credentials from config.py

This script uses the GENERATE_SQL_PROMPT object, a string that sets up the AI's persona - Andy, in this instance. Andy is an assistant that creates and executes SQL queries from the user’s input messages under a set of predefined instructions which it must not violate.
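
The full prompt isn't reproduced here, but a minimal sketch of its shape might look like the following (the wording is illustrative, not the repo's actual prompt):

# Illustrative sketch only - the real GENERATE_SQL_PROMPT is more detailed
GENERATE_SQL_PROMPT = """
You are Andy, an AI assistant that answers questions about a Postgres database.
Given a user question, respond with a single valid Postgres SQL query that answers it.
Rules you must not violate:
- Only reference tables and columns that exist in the schema provided.
- Never run statements that modify data (INSERT, UPDATE, DELETE, DROP).
- If a question cannot be answered from the database, say so rather than guessing.
"""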

Here are the functions in this script:

  • get_table_context

  • get_all_tables_from_db

  • get_all_table_contexts

  • get_data_dictionary

  • get_final_system_prompt

The get_table_context fetches all the columns and their data types for a given table and schema and returns them as a formatted string.

The @st.cache_data decorator caches the output of get_table_context so that repeated calls with the same parameters don’t need to recompute from scratch.

show_spinner is set to False to avoid showing a spinner to users while the function runs:

@st.cache_data(show_spinner=False)
def get_table_context(schema: str, table: str, db_credentials: dict):
    conn = psycopg2.connect(**db_credentials)
    cursor = conn.cursor()
    cursor.execute(f"""
    SELECT column_name, data_type FROM information_schema.columns
    WHERE table_schema = '{schema}' AND table_name = '{table}'
    """)
    columns = cursor.fetchall()

    columns_str = "\n".join([f"- **{col[0]}**: {col[1]}" for col in columns])
    context = f"""
    Table: <tableName> {schema}.{table} </tableName>
    Columns for {schema}.{table}:
    <columns>\n\n{columns_str}\n\n</columns>
    """
    cursor.close()
    conn.close()
    return context

The get_all_tables_from_db retrieves all the tables from the database that are not included in the Postgres system schemas named pg_catalog and information_schema:

def get_all_tables_from_db(db_credentials: dict):
    conn = psycopg2.connect(**db_credentials)
    cursor = conn.cursor()
    cursor.execute("""
    SELECT table_schema, table_name FROM information_schema.tables
    WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    """)
    tables = cursor.fetchall()
    cursor.close()
    conn.close()
    return tables

The get_all_table_contexts retrieves all tables via get_all_tables_from_db, builds a context string for each one with get_table_context, and returns them joined into a single formatted string:

def get_all_table_contexts(db_credentials: dict):
    tables = get_all_tables_from_db(db_credentials)
    table_contexts = [get_table_context(schema, table, db_credentials) for schema, table in tables]
    return '\n'.join(table_contexts)

The get_data_dictionary creates a dictionary where the keys are the combinations of schemas and tables and the values are the columns and their data types.

The function calls get_all_tables_from_db to retrieve all the tables and their corresponding schemas. A new connection is established for each table to fetch the column names and their data types:

def get_data_dictionary(db_credentials: dict):
    tables = get_all_tables_from_db(db_credentials)
    data_dict = {}
    for schema, table in tables:
        conn = psycopg2.connect(**db_credentials)
        cursor = conn.cursor()
        cursor.execute(f"""
        SELECT column_name, data_type FROM information_schema.columns
        WHERE table_schema = '{schema}' AND table_name = '{table}'
        """)
        columns = cursor.fetchall()
        data_dict[f"{schema}.{table}"] = {col[0]: col[1] for col in columns}
        cursor.close()
        conn.close()
    return data_dict
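
The resulting dictionary keys each entry by schema.table and maps column names to data types, along these lines (illustrative):

# Illustrative output shape - the tables and types below are made up
# {
#     "prod.orders":    {"order_id": "integer", "amount": "numeric"},
#     "prod.customers": {"customer_id": "integer", "name": "text"},
# }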

The get_final_system_prompt simply returns the GENERATE_SQL_PROMPT string:

def get_final_system_prompt(db_credentials: dict):
    return GENERATE_SQL_PROMPT

All of this is instantiated and executed in the following code block:

if __name__ == "__main__":

    st.header("System prompt for AI Database Chatbot")

    # Build the data dictionary string
    data_dict = get_data_dictionary(db_credentials=db_credentials)
    data_dict_str = "\n".join(
        [f"{table}:\n" + "\n".join(
            [f"    {column}: {dtype}" for column, dtype in columns.items()]) for table, columns in data_dict.items()])

    SYSTEM_PROMPT = get_final_system_prompt(db_credentials=db_credentials)
    st.markdown(SYSTEM_PROMPT)

Resources🌐

You can find the GitHub repo for the code used here:

Conclusion🏁

With Python, we can create an AI database chatbot that democratizes access to databases - non-technical users can now perform their own data quality checks and retrieve the information they want at any time, without companies needing to spend time and money on SQL training workshops. Self-service analytics tools are becoming easier to build because all users have to do is ask the chatbot - no technical skills required!

In part 4 of this series, we’ll explore how to create an AI document chatbot that answers questions based on the information within the documents provided - this time using no code.

Feel free to reach out via my handles and I’ll be happy to point you in the right direction where I can: LinkedIn | Email | Twitter | TikTok