PostgreSQL
If you've come this far, we believe that you already know how to create a database in our CLI app. If not, please check Databases section and create one for further training.
The CGC SDK allows you to deploy PostgreSQL databases as Kubernetes resources and connect to them using native Python PostgreSQL clients. This guide covers everything you need to know about deploying and connecting to PostgreSQL databases in your CGC environment.
Preparation​
For PostgreSQL, our wrapper is enhanced with useful commands to interact with the DB. Firstly, we create the connection to the DB with:
- the name of the running psql instance
- the token generated for that instance
If you don't have any running instances of PostgreSQL, create one and then copy the name and app token.
The app token can be found by using:
cgc db list -d
It won't work from Jupyter, unless you provided api keys. Use your personal terminal where you performed the cgc registration.
Understanding PostgreSQL in CGC​
What is PostgreSQL?​
For Non-Technical Users:
PostgreSQL is like a digital filing cabinet:
- It stores your data in organized tables (like spreadsheets)
- You can search, update, and retrieve information quickly
- Multiple applications can share the same data
- It keeps your data safe and consistent
For Technical Users:
PostgreSQL in CGC:
- Runs as a containerized service in Kubernetes
- Supports standard PostgreSQL features (version 17+)
- Uses Kubernetes service discovery for connectivity
- Requires native Python PostgreSQL clients (psycopg2)
- Network access limited to within the Kubernetes namespace
Requirements​
Before working with PostgreSQL, ensure you have the required Python package:
pip install psycopg2-binary
Getting Started​
Import Required Modules​
import psycopg2
from psycopg2 import errors as pg_errors
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
Deploy PostgreSQL Database​
def deploy_postgres(db_name="example-postgres", password="example-pass-123"):
"""Deploy a PostgreSQL database resource"""
response = resource.resource_create(
name=db_name,
image_name="postgres:17",
entity="postgresql",
cpu=2,
memory=4,
environment_data=[
f"POSTGRES_PASSWORD={password}",
"POSTGRES_USER=admin",
"POSTGRES_DB=db"
]
)
if response['code'] == 200:
print(f"✓ PostgreSQL '{db_name}' created successfully")
# Wait for database to be ready
while not resource.resource_ready(db_name, resource.ResourceTypes.db):
time.sleep(5)
return db_name, password
else:
print(f"✗ Failed to create database: {response.get('message')}")
return None, None
Connect to PostgreSQL​
def connect_to_database(db_name, password, database="db"):
"""Connect to PostgreSQL database using native psycopg2"""
# Connection parameters for Kubernetes service
connection_params = {
'host': db_name, # Kubernetes service name
'port': 5432, # Standard PostgreSQL port
'user': 'admin', # Database user
'password': password, # Database password
'database': database, # Database name
'connect_timeout': 10
}
# Connect with retry logic
max_retries = 5
for attempt in range(max_retries):
try:
connection = psycopg2.connect(**connection_params)
# Test the connection
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()[0]
cursor.close()
print(f"✓ Connected to PostgreSQL: {version[:50]}...")
return connection
except Exception as e:
print(f"Connection attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(10)
else:
raise
return None
Working with Databases​
Creating Tables​
def create_tables(connection):
"""Create example tables"""
cursor = connection.cursor()
try:
# Create users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create posts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
title VARCHAR(200) NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
connection.commit()
print("✓ Tables created successfully")
return True
except Exception as e:
connection.rollback()
print(f"✗ Error creating tables: {e}")
return False
finally:
cursor.close()
Inserting Data​
def insert_sample_data(connection):
"""Insert sample data into tables"""
cursor = connection.cursor()
try:
# Insert users
users = [
("alice", "alice@example.com"),
("bob", "bob@example.com"),
("charlie", "charlie@example.com")
]
user_ids = []
for username, email in users:
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
(username, email)
)
user_id = cursor.fetchone()[0]
user_ids.append(user_id)
# Insert posts
posts = [
(user_ids[0], "Hello World", "This is my first post!"),
(user_ids[1], "Database Example", "PostgreSQL is awesome!"),
]
for user_id, title, content in posts:
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s)",
(user_id, title, content)
)
connection.commit()
print(f"✓ Inserted {len(users)} users and {len(posts)} posts")
return True
except pg_errors.UniqueViolation:
connection.rollback()
print("Note: Some data already exists (duplicate entries)")
return True
except Exception as e:
connection.rollback()
print(f"✗ Error inserting data: {e}")
return False
finally:
cursor.close()
Querying Data​
def query_data(connection):
"""Query and display data"""
cursor = connection.cursor()
try:
# Count users
cursor.execute("SELECT COUNT(*) FROM users")
user_count = cursor.fetchone()[0]
print(f"Total users: {user_count}")
# List users with post count
cursor.execute("""
SELECT
u.username,
u.email,
COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.id, u.username, u.email
ORDER BY post_count DESC
""")
for row in cursor.fetchall():
username, email, post_count = row
print(f"User: {username} ({email}) - {post_count} posts")
return True
except Exception as e:
print(f"✗ Error querying data: {e}")
return False
finally:
cursor.close()
Advanced Usage​
Transaction Management​
def demonstrate_transactions(connection):
"""Demonstrate transaction handling"""
cursor = connection.cursor()
try:
# psycopg2 starts transactions automatically
print("Starting transaction...")
# Operation 1: Create a new user
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
("transaction_test", "trans@example.com")
)
new_user_id = cursor.fetchone()[0]
print(f"✓ Created user (ID: {new_user_id})")
# Operation 2: Create posts
for i in range(3):
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s)",
(new_user_id, f"Transaction Post {i+1}", "Testing transactions")
)
print("✓ Created 3 posts")
# Commit transaction
connection.commit()
print("✓ Transaction committed successfully")
# Demonstrate rollback (intentional error)
try:
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s)",
("transaction_test", "another@example.com") # Duplicate username
)
connection.commit()
except pg_errors.UniqueViolation:
connection.rollback()
print("✓ Transaction rolled back (duplicate username)")
return True
except Exception as e:
connection.rollback()
print(f"✗ Transaction failed: {e}")
return False
finally:
cursor.close()
Connection Management with Context Manager​
import contextlib
@contextlib.contextmanager
def database_connection(db_name, password, database="db"):
"""Context manager for database connections"""
connection = None
try:
# Connect to database
connection = psycopg2.connect(
host=db_name,
port=5432,
user='admin',
password=password,
database=database,
connect_timeout=10
)
yield connection
except Exception as e:
if connection:
connection.rollback()
raise e
finally:
if connection:
connection.close()
# Usage
with database_connection("postgres-db", "password") as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"Total users: {count}")
cursor.close()
Complete Example​
Here's a complete example from the SDK examples:
#!/usr/bin/env python3
"""
PostgreSQL Database Connection Example
Requirements:
pip install psycopg2-binary
"""
import time
import psycopg2
from psycopg2 import errors as pg_errors
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def main():
"""Main execution flow"""
print("CGC SDK - PostgreSQL Database Example (Native psycopg2)")
# Step 1: Deploy PostgreSQL
db_name, password = deploy_postgres()
if not db_name:
print("Failed to deploy database. Exiting.")
return
# Network access confirmation
print("\\nNETWORK ACCESS CONFIRMATION")
print("Database access is only available within the Kubernetes namespace network.")
confirmation = input("Are you currently within the namespace network? (y/N): ")
if confirmation.lower().strip() != 'y':
print("✗ Network access not confirmed. Processing cleanup...")
cleanup_database(db_name)
return
# Step 2: Connect to database
connection = connect_to_database(db_name, password)
if not connection:
print("Failed to connect to database. Cleaning up...")
cleanup_database(db_name)
return
try:
# Step 3: Create tables and work with data
create_tables(connection)
insert_sample_data(connection)
query_data(connection)
demonstrate_transactions(connection)
finally:
# Always close the connection
connection.close()
print("✓ Database connection closed")
# Cleanup option
user_input = input(f"Do you want to delete the database '{db_name}'? (y/n): ")
if user_input.lower() == 'y':
cleanup_database(db_name)
if __name__ == "__main__":
main()
Network Requirements​
Important: PostgreSQL databases deployed via CGC SDK are only accessible within the Kubernetes namespace network. They are NOT exposed via ingress for security reasons.
To connect to your database, you must:
- Be running your code within the same Kubernetes namespace
- Use the service name (db_name) as the hostname
- Use port 5432 (standard PostgreSQL port)
Best Practices​
1. Security​
- Never hardcode passwords in your code
- Use environment variables or secure password prompts
- Always use parameterized queries to prevent SQL injection
2. Error Handling​
from psycopg2 import errors as pg_errors
try:
# Database operations
cursor.execute("INSERT INTO users ...")
connection.commit()
except pg_errors.UniqueViolation:
connection.rollback()
print("User already exists")
except pg_errors.ConnectionException:
print("Database connection lost")
except Exception as e:
connection.rollback()
print(f"Unexpected error: {e}")
3. Connection Management​
- Always close cursors and connections
- Use context managers when possible
- Implement retry logic for connection failures
- Reuse connections instead of creating new ones repeatedly
4. Performance​
- Use connection pooling for high-traffic applications
- Add indexes to frequently queried columns
- Use bulk operations for inserting multiple records
- Monitor query performance with EXPLAIN ANALYZE
Troubleshooting​
Connection Issues​
Problem: "could not translate host name to address"
Solutions:
- Ensure you're running within the Kubernetes namespace
- Verify the database resource is deployed and ready
- Check the service name matches your database name
Problem: "Connection refused"
Solutions:
- Verify the database is running:
resource.resource_ready(db_name, resource.ResourceTypes.db)
- Check if the database has finished initializing (may take 30-60 seconds)
- Ensure you're using the correct port (5432)
Problem: "Authentication failed"
Solutions:
- Verify the password is correct
- Ensure the username is 'admin' (default for CGC PostgreSQL)
- Check that the database name exists
Migration from SDK PostgreSQL Client​
If you were previously using cgc.sdk.postgresql
, here are the key changes:
Old way:
import cgc.sdk.postgresql as pg
db_connector = pg.postgresql_client(app_name, password, database)
connection = db_connector.get_postgresql_client()
New way:
import psycopg2
connection = psycopg2.connect(
host=app_name,
port=5432,
user='admin',
password=password,
database=database
)