Usage examples
This section provides practical examples of using the CGC SDK in Python. All example files are available in the python/
directory and can be run directly.
Setup
Before running the examples, ensure you have the CGC SDK installed and configured:
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install CGC SDK
pip install -e .
# Register with CGC
cgc register
Available Examples
1. Basic Compute Resource Management
File: python/basic_compute.py
This comprehensive example demonstrates the complete lifecycle of compute resource management including creation, monitoring, port configuration, and cleanup.
View the complete basic_compute.py file
#!/usr/bin/env python3
"""
Basic Compute Resource Example
This script demonstrates how to:
1. Create a compute resource
2. Check if it's ready
3. Add ports for external access
4. Clean up resources
Usage:
python basic_compute.py
"""
import time
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def create_web_server():
    """Create the demo nginx web server resource.

    Returns:
        The resource name when the create call answers with code 200,
        otherwise None (non-200 response or any raised exception).
    """
    server_name = "example-web-server"
    print(f"Creating web server '{server_name}'...")

    # Request payload: 1 CPU core, 2GB RAM and the nginx environment.
    spec = {
        "name": server_name,
        "image_name": "nginx:latest",
        "cpu": 1,
        "memory": 2,
        "environment_data": [
            "NGINX_HOST=example.com",
            "NGINX_PORT=80",
        ],
    }

    try:
        result = resource.resource_create(**spec)
        if result['code'] != 200:
            print(f"✗ Failed to create web server: {result.get('message', 'Unknown error')}")
            return None
        print(f"✓ Web server '{server_name}' created successfully")
    except exceptions.SDKException as sdk_error:
        print(f"✗ SDK Error (code {sdk_error.code}): {sdk_error}")
        return None
    except Exception as error:
        print(f"✗ Unexpected error: {error}")
        return None

    return server_name
def wait_for_resource(app_name, max_wait=60):
    """Poll the resource every 5 seconds until it reports ready.

    Args:
        app_name: Name of the resource to poll.
        max_wait: Upper bound, in seconds, on the total time slept.

    Returns:
        True as soon as the readiness check succeeds, False once
        ``max_wait`` seconds have been slept without success.
    """
    poll_seconds = 5
    elapsed = 0

    print(f"Waiting for '{app_name}' to be ready...")
    while elapsed < max_wait:
        if resource.resource_ready(app_name):
            print(f"✓ '{app_name}' is ready!")
            return True

        # Report progress before sleeping so the first message shows 0s.
        print(f" Still starting... ({elapsed}s elapsed)")
        time.sleep(poll_seconds)
        elapsed += poll_seconds

    print(f"✗ '{app_name}' failed to become ready within {max_wait} seconds")
    return False
def configure_ports(app_name):
    """Expose HTTP port 80 with ingress, then print the port listing.

    Any exception raised along the way is reported and swallowed; the
    function always returns None.
    """
    print(f"Configuring ports for '{app_name}'...")
    try:
        # Register the HTTP port; ingress=True enables external access.
        add_result = resource.resource_add_port(
            name=app_name,
            port_name="http",
            new_port=80,
            ingress=True,
        )
        if add_result['code'] != 200:
            print(f"✗ Failed to add port: {add_result.get('message', 'Unknown error')}")
        else:
            print("✓ HTTP port (80) configured")

        # Show what is configured now, even if the add call failed.
        listing = resource.resource_list_ports(app_name)
        if listing['code'] != 200 or not listing.get('details'):
            return

        details = listing['details']
        print(f"Current ports for '{app_name}':")
        for entry in details['ports']['ports']:
            print(f" - {entry.get('name', 'unknown')}: {entry.get('port', 'unknown')}")
        for route in details.get('ingress') or []:
            print(f" - URL: {route.get('url')}")
    except Exception as error:
        print(f"✗ Error configuring ports: {error}")
def list_resources():
    """Print every pod that carries an 'app-name' label, with its status.

    Pods without that label are skipped. Errors are printed, not raised.
    """
    print("\nListing all compute resources:")
    try:
        listing = resource.compute_list()
        if listing['code'] != 200:
            print(f" Failed to list resources: {listing.get('message', 'Unknown error')}")
            return

        pod_entries = listing['details']['pods_list']
        if not pod_entries:
            print(" No resources found")
            return

        for entry in pod_entries:
            labels = entry.get('labels', {})
            if 'app-name' not in labels:
                continue

            name = labels['app-name']
            status = entry.get('status', 'Unknown')
            restarts = entry.get('restart_count', 0)
            # A check mark for running pods, a hollow circle otherwise.
            marker = "○" if status != "Running" else "✓"
            print(f" {marker} {name}: {status} (Restarts: {restarts})")
    except Exception as error:
        print(f" Error listing resources: {error}")
def cleanup_resource(app_name):
    """Delete the named resource.

    Returns:
        True when the delete call answers with code 200; False on any
        other code or when an exception is raised.
    """
    print(f"\nCleaning up '{app_name}'...")
    try:
        result = resource.resource_delete(app_name)
        deleted = result['code'] == 200
        if deleted:
            print(f"✓ '{app_name}' deleted successfully")
        else:
            print(f"✗ Failed to delete: {result.get('message', 'Unknown error')}")
        return deleted
    except Exception as error:
        print(f"✗ Error during cleanup: {error}")
        return False
def main():
    """Run the example: create, wait, configure ports, list, then clean up."""
    rule = "=" * 50

    print(rule)
    print("CGC SDK - Basic Compute Resource Example")
    print(rule)

    # Step 1: create the web server; nothing to clean up if this fails.
    app_name = create_web_server()
    if not app_name:
        print("\nFailed to create web server. Exiting.")
        return

    # Step 2: wait for readiness; remove the resource if it never starts.
    if not wait_for_resource(app_name):
        print("\nResource failed to start. Cleaning up...")
        cleanup_resource(app_name)
        return

    # Steps 3 and 4: expose the HTTP port and show what is running.
    configure_ports(app_name)
    list_resources()

    # Step 5: let the user decide whether the resource stays.
    print("\n" + rule)
    answer = input("Do you want to delete the web server? (y/n): ").lower()
    if answer != 'y':
        print(f"✓ Keeping '{app_name}' running")
        print(" Remember to delete it when you're done to avoid charges")
    else:
        cleanup_resource(app_name)

    print("\n" + rule)
    print("Example completed!")
    print(rule)


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Key Features:
- Complete resource lifecycle management
- Environment variable configuration
- Port configuration with external access
- Resource monitoring and health checks
- Interactive cleanup with user confirmation
- Comprehensive error handling and status reporting
2. Job Workflow Management
File: python/job_workflow.py
Complete job lifecycle management with monitoring, error handling, and pipeline orchestration.
View the complete job_workflow.py file
#!/usr/bin/env python3
"""
Job Workflow Example
This script demonstrates how to:
1. Create and run jobs
2. Monitor job execution
3. Run sequential job pipelines
4. Handle job failures
5. Clean up succeeded jobs
Usage:
python job_workflow.py
"""
import time
from datetime import datetime
import cgc.sdk.job as job
import cgc.sdk.exceptions as exceptions
def create_simple_job():
    """Create a busybox job that echoes a greeting.

    The job name carries a second-resolution timestamp so repeated runs
    do not collide.

    Returns:
        The job name on a code-200 response, None on any other code or
        when the SDK raises SDKException. Other exceptions propagate.
    """
    stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    job_name = f"hello-job-{stamp}"
    print(f"Creating simple job '{job_name}'...")

    try:
        result = job.job_create(
            name=job_name,
            image_name="busybox:latest",
            cpu=1,
            memory=1,
            startup_command="echo 'Hello from CGC Job!'",
            ttl_seconds_after_finished=300,  # removed 5 minutes after finishing
        )
        if result['code'] != 200:
            print(f"✗ Failed to create job: {result.get('message', 'Unknown error')}")
            return None

        print(f"✓ Job '{job_name}' created successfully")
        return job_name
    except exceptions.SDKException as sdk_error:
        print(f"✗ SDK Error (code {sdk_error.code}): {sdk_error}")
        return None
def create_data_processing_job(input_file, output_dir):
    """Create a job that simulates a batch data-processing run.

    Args:
        input_file: Value exported to the job as INPUT_FILE.
        output_dir: Value exported to the job as OUTPUT_DIR.

    Returns:
        The job name on a code-200 response, otherwise None (non-200
        response or any raised exception).
    """
    job_name = "data-process-" + datetime.now().strftime('%Y%m%d-%H%M%S')
    print(f"Creating data processing job '{job_name}'...")

    # Inline Python run by the container: it reads the two environment
    # variables, sleeps 5 seconds to stand in for real work, and reports.
    command = (
        "python -c \""
        "import os, time; "
        "print(f'Processing {os.environ.get(\\\"INPUT_FILE\\\")}...'); "
        "time.sleep(5); "
        "print(f'Output saved to {os.environ.get(\\\"OUTPUT_DIR\\\")}'); "
        "print('Processing complete!')"
        "\""
    )
    environment = [
        f"INPUT_FILE={input_file}",
        f"OUTPUT_DIR={output_dir}",
        "PROCESSING_MODE=batch",
    ]

    try:
        result = job.job_create(
            name=job_name,
            image_name="python:3.9-slim",
            cpu=2,
            memory=4,
            environment_data=environment,
            startup_command=command,
            ttl_seconds_after_finished=600,  # removed 10 minutes after finishing
        )
        if result['code'] != 200:
            print(f"✗ Failed to create job: {result.get('message', 'Unknown error')}")
            return None

        print(f"✓ Data processing job '{job_name}' created")
        return job_name
    except Exception as error:
        print(f"✗ Error creating job: {error}")
        return None
def monitor_job(job_name, check_interval=5, max_wait=60):
    """Poll the job list until the named job finishes or time runs out.

    Args:
        job_name: Name of the job to look for in the job list.
        check_interval: Seconds to sleep between polls.
        max_wait: Total seconds to keep polling before giving up.

    Returns:
        One of these strings:
        'Completed' - the job's phase is 'Succeeded'
        'Failed'    - the job's phase is 'Failed'
        'NotFound'  - the job is absent from the list
        'Unknown'   - the list call answered with a non-200 code
        'Error'     - an exception was raised while polling
        'Timeout'   - ``max_wait`` elapsed first
    """
    print(f"Monitoring job '{job_name}'...")

    started_at = time.time()
    previous_phase = None

    while time.time() - started_at < max_wait:
        try:
            listing = job.job_list()
            if listing['code'] != 200:
                print(f"✗ Failed to get job status: {listing.get('message', 'Unknown error')}")
                return 'Unknown'

            # Pick the first entry whose name matches ours, if any.
            entries = listing['details'].get('job_list', [])
            match = next(
                (entry for entry in entries if entry['name'] == job_name),
                None,
            )
            if match is None:
                print(f" Job '{job_name}' not found (may have been cleaned up)")
                return 'NotFound'

            phase = match.get('status', {}).get('phase', 'Unknown')

            # Stay quiet unless the phase changed since the last poll.
            if phase != previous_phase:
                seconds = int(time.time() - started_at)
                print(f" [{seconds}s] Status: {phase}")
                previous_phase = phase

            if phase == 'Succeeded':
                print(f"✓ Job '{job_name}' completed successfully!")
                return 'Completed'
            if phase == 'Failed':
                print(f"✗ Job '{job_name}' failed")
                return 'Failed'

            time.sleep(check_interval)
        except Exception as error:
            print(f"✗ Error monitoring job: {error}")
            return 'Error'

    print(f"✗ Job monitoring timeout after {max_wait} seconds")
    return 'Timeout'
def run_job_pipeline(pipeline_name, steps, monitor=None):
    """Run a sequence of jobs, stopping at the first step that fails.

    Args:
        pipeline_name: Label used in the printed output.
        steps: Iterable-with-length of dicts. Each dict needs
            'description' (str) and 'create_func' (callable returning a
            job name, or a falsy value on failure); an optional 'args'
            list is unpacked into 'create_func'.
        monitor: Optional callable taking a job name and returning a
            status string. Defaults to ``monitor_job``.

    Returns:
        A ``(succeeded_jobs, failed_job)`` tuple. ``succeeded_jobs`` lists
        the names of jobs that finished successfully, in order.
        ``failed_job`` is None when every step succeeded, the job name
        when a job did not succeed, or "Step N" when the job for step N
        could not be created.
    """
    # monitor_job reports success as 'Completed'. The raw 'Succeeded'
    # phase is accepted too, so a custom monitor may return either.
    success_statuses = ('Completed', 'Succeeded')

    print(f"\nStarting pipeline: {pipeline_name}")
    print("=" * 40)

    succeeded_jobs = []
    failed_job = None

    for i, step in enumerate(steps, 1):
        print(f"\nStep {i}/{len(steps)}: {step['description']}")

        # Create the job
        job_name = step['create_func'](*step.get('args', []))
        if not job_name:
            print(f"✗ Failed to create job for step {i}")
            failed_job = f"Step {i}"
            break

        # Monitor the job. The default monitor is looked up only here, so
        # the function stays usable with an injected monitor.
        status = (monitor or monitor_job)(job_name)
        if status in success_statuses:
            succeeded_jobs.append(job_name)
        else:
            print(f"✗ Pipeline failed at step {i}")
            failed_job = job_name
            break

    # Pipeline summary
    print("\n" + "=" * 40)
    print(f"Pipeline: {pipeline_name}")
    print(f"Succeeded: {len(succeeded_jobs)}/{len(steps)} steps")
    if failed_job:
        print(f"Failed at: {failed_job}")
    else:
        print("✓ All steps succeeded successfully!")

    return succeeded_jobs, failed_job
def list_all_jobs():
    """Print all jobs grouped by phase, at most five names per group.

    Errors are printed rather than raised; the function returns None.
    """
    preview_limit = 5

    print("\nCurrent jobs in the system:")
    print("-" * 40)
    try:
        listing = job.job_list()
        if listing['code'] != 200:
            print(f"Failed to list jobs: {listing.get('message', 'Unknown error')}")
            return

        entries = listing['details'].get('job_list', [])
        if not entries:
            print(" No jobs found")
            return

        # Bucket job names by phase, keeping first-seen phase order.
        names_by_phase = {}
        for entry in entries:
            phase = entry.get('status', {}).get('phase', 'Unknown')
            names_by_phase.setdefault(phase, []).append(entry['name'])

        for phase, names in names_by_phase.items():
            print(f"\n{phase}: ({len(names)} jobs)")
            for name in names[:preview_limit]:
                print(f" - {name}")
            hidden = len(names) - preview_limit
            if hidden > 0:
                print(f" ... and {hidden} more")
    except Exception as error:
        print(f"Error listing jobs: {error}")
def cleanup_succeeded_jobs():
    """Delete every job whose phase is 'Succeeded'.

    Each deletion is attempted independently: a failure (raised exception
    or non-200 response) is reported and the loop moves on to the next
    job. Errors from the initial listing are printed, not raised.
    """
    print("\nCleaning up succeeded jobs...")
    try:
        response = job.job_list()
        if response['code'] != 200:
            print(f"Failed to list jobs: {response.get('message', 'Unknown error')}")
            return

        deleted_count = 0
        for j in response['details'].get('job_list', []):
            if j.get('status', {}).get('phase') != 'Succeeded':
                continue

            try:
                delete_response = job.job_delete(j['name'])
                if delete_response['code'] == 200:
                    deleted_count += 1
                    print(f" Deleted: {j['name']}")
                else:
                    # Report rejected deletions instead of dropping them
                    # silently, so the summary below is not misleading.
                    print(
                        f" Failed to delete {j['name']}: "
                        f"{delete_response.get('message', 'Unknown error')}"
                    )
            except Exception as e:
                print(f" Failed to delete {j['name']}: {e}")

        if deleted_count > 0:
            print(f"✓ Cleaned up {deleted_count} succeeded jobs")
        else:
            print(" No succeeded jobs to clean up")
    except Exception as e:
        print(f"Error during cleanup: {e}")
def main():
    """Run the three job demos in order, then offer to clean up."""
    wide_rule = "=" * 50
    section_rule = "-" * 30

    print(wide_rule)
    print("CGC SDK - Job Workflow Example")
    print(wide_rule)

    # Example 1: a single trivial job. Abort everything if it cannot be
    # created, since the later examples depend on the same API.
    print("\n1. Running a simple job")
    print(section_rule)
    simple_job = create_simple_job()
    if not simple_job:
        print("✗ Simple job failed. Stopping further tests.")
        return
    monitor_job(simple_job)

    # Example 2: a job configured through environment variables.
    print("\n2. Running a data processing job")
    print(section_rule)
    data_job = create_data_processing_job(
        input_file="/data/input.csv",
        output_dir="/data/output/",
    )
    if data_job:
        monitor_job(data_job)

    # Example 3: three jobs chained into a pipeline.
    print("\n3. Running a job pipeline")
    print(section_rule)
    pipeline_steps = [
        {
            'description': 'Extract data',
            'create_func': create_simple_job,
        },
        {
            'description': 'Process data',
            'create_func': create_data_processing_job,
            'args': ["/tmp/extracted.csv", "/tmp/processed/"],
        },
        {
            'description': 'Generate report',
            'create_func': create_simple_job,
        },
    ]
    run_job_pipeline("ETL Pipeline", pipeline_steps)

    # Show all jobs
    list_all_jobs()

    # Cleanup option
    print("\n" + wide_rule)
    answer = input("Do you want to clean up succeeded jobs? (y/n): ").lower()
    if answer == 'y':
        cleanup_succeeded_jobs()

    print("\n" + wide_rule)
    print("Example completed!")
    print(wide_rule)


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Key Features:
- Simple job creation and monitoring
- Data processing job pipelines
- Job status tracking and wait functions
- Sequential job execution
- Automatic cleanup with TTL
- Comprehensive error handling
3. Port Management
File: python/port_management.py
Port configuration for a multi-service application: adding, updating, listing, and deleting ports, with ingress for external access.
View the complete port_management.py file
#!/usr/bin/env python3
"""
Port Management Example
This script demonstrates how to:
1. Create resources with multiple services
2. Add, update, and delete ports
3. Configure ingress for external access
4. List and manage port configurations
Usage:
python port_management.py
"""
import time
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def create_multi_service_app(app_name="multi-port-app"):
    """Create an application that serves HTTP on ports 8080, 8081 and 8082.

    Args:
        app_name: Name of the resource to create.

    Returns:
        The resource name on a code-200 response, None on any other code
        or when the SDK raises SDKException. Other exceptions propagate.
    """
    print(f"Creating multi-service application '{app_name}'...")

    # The script is passed to ``python -c`` on a single line, so it may only
    # contain simple statements separated by ';'. Compound statements such
    # as ``for``/``while`` cannot follow a ';' (SyntaxError), so the loops
    # are written as comprehensions and the process is kept alive with a
    # wait on an Event that is never set.
    server_script = (
        "import http.server, socketserver, threading; "
        "servers = [socketserver.TCPServer(('', port), "
        "http.server.SimpleHTTPRequestHandler) "
        "for port in (8080, 8081, 8082)]; "
        "[threading.Thread(target=s.serve_forever, daemon=True).start() "
        "for s in servers]; "
        "[print(f'Server started on port {s.server_address[1]}') "
        "for s in servers]; "
        "threading.Event().wait()"
    )

    try:
        # Create a resource that could serve multiple services
        # Using a Python image that can run multiple services
        response = resource.resource_create(
            name=app_name,
            image_name="python:3.9",
            cpu=2,
            memory=4,
            startup_command=f'python -c "{server_script}"',
            environment_data=[
                "SERVICE_NAME=multi-port-demo",
                "ENV=development",
            ],
        )
        if response['code'] == 200:
            print(f"✓ Application '{app_name}' created successfully")
            return app_name

        print(f"✗ Failed to create application: {response.get('message', 'Unknown error')}")
        return None
    except exceptions.SDKException as e:
        print(f"✗ SDK Error (code {e.code}): {e}")
        return None
def wait_for_app(app_name, max_wait=60):
    """Poll the application every 5 seconds until it reports ready.

    Args:
        app_name: Name of the resource to poll.
        max_wait: Upper bound, in seconds, on the total time slept.
            Defaults to 60, the value that was previously hard-coded.

    Returns:
        True as soon as the readiness check succeeds, False once
        ``max_wait`` seconds have been slept without success.
    """
    print(f"Waiting for '{app_name}' to be ready...")
    waited = 0
    while waited < max_wait:
        if resource.resource_ready(app_name):
            print(f"✓ Application '{app_name}' is ready!")
            return True
        time.sleep(5)
        waited += 5
        print(f" Still starting... ({waited}s elapsed)")

    print(f"✗ Application failed to become ready within {max_wait} seconds")
    return False
def add_application_ports(app_name):
    """Register the web, api and admin ports on the application.

    Each port is added independently; a failure is reported and the
    remaining ports are still attempted.

    Returns:
        True when at least one port was added, otherwise False.
    """
    print(f"\nAdding ports to '{app_name}'...")

    # web and api are exposed through ingress; admin stays internal.
    wanted_ports = [
        {'name': 'web', 'port': 8080, 'ingress': True,
         'description': 'Main web interface'},
        {'name': 'api', 'port': 8081, 'ingress': True,
         'description': 'REST API endpoint'},
        {'name': 'admin', 'port': 8082, 'ingress': False,
         'description': 'Admin panel (internal)'},
    ]

    added = 0
    for spec in wanted_ports:
        try:
            result = resource.resource_add_port(
                name=app_name,
                port_name=spec['name'],
                new_port=spec['port'],
                ingress=spec['ingress'],
            )
            if result['code'] != 200:
                print(f" ✗ Failed to add {spec['name']} port: {result.get('message', 'Unknown error')}")
                continue

            exposure = "external" if spec['ingress'] else "internal"
            print(f" ✓ Added {spec['name']} port ({spec['port']}) - {exposure}")
            print(f" Description: {spec['description']}")
            added += 1
        except Exception as error:
            print(f" ✗ Error adding {spec['name']} port: {error}")

    print(f"\nSuccessfully added {added}/{len(wanted_ports)} ports")
    return added > 0
def list_ports(app_name):
    """Print a table of the application's ports and return them.

    A port is shown as "External" when its name appears among the
    ``port_name`` values of the response's ingress entries, and as
    "Internal" otherwise.

    Returns:
        The list of port entries from the response, or an empty list when
        there are no ports, the call answers with a non-200 code, or an
        exception is raised.
    """
    print(f"\nCurrent ports for '{app_name}':")
    print("-" * 50)
    try:
        response = resource.resource_list_ports(app_name)
        if response['code'] != 200:
            print(f" Failed to list ports: {response.get('message', 'Unknown error')}")
            return []

        details = response.get('details') or {}
        ports = (details.get('ports') or {}).get('ports')
        if not ports:
            print(" No ports configured")
            return []

        # 'ingress' may be missing or None when every port is internal;
        # treat that as "no external ports" instead of failing the listing.
        external_names = {
            entry.get("port_name") for entry in details.get('ingress') or []
        }

        print(f" {'Port Name':<15} {'Port':<10} {'Ingress':<10}")
        print(" " + "-" * 35)
        for port in ports:
            name = port.get('name', 'unknown')
            number = port.get('port', 'unknown')
            ingress = "External" if port.get('name') in external_names else "Internal"
            print(f" {name:<15} {number:<10} {ingress:<10}")
        return ports
    except Exception as e:
        print(f" Error listing ports: {e}")
        return []
def update_port_configuration(app_name):
    """Apply two demo updates: move 'api' to 8090, expose 'admin'.

    The updates are attempted independently; a failure in the first is
    reported and does not prevent the second.
    """
    print(f"\nUpdating port configurations for '{app_name}'...")

    # Both updates request ingress=True. The admin update keeps port 8082
    # and only switches the port to external access.
    planned_updates = (
        {
            'announce': " Updating 'api' port from 8081 to 8090...",
            'port_name': 'api',
            'new_port': 8090,
            'success': " ✓ Successfully updated API port to 8090",
        },
        {
            'announce': " Enabling external access for 'admin' port...",
            'port_name': 'admin',
            'new_port': 8082,
            'success': " ✓ Admin port now accessible externally",
        },
    )

    for change in planned_updates:
        try:
            print(change['announce'])
            result = resource.resource_update_port(
                name=app_name,
                port_name=change['port_name'],
                new_port=change['new_port'],
                ingress=True,
            )
            if result['code'] == 200:
                print(change['success'])
            else:
                print(f" ✗ Failed to update port: {result.get('message', 'Unknown error')}")
        except Exception as error:
            print(f" ✗ Error updating port: {error}")
def delete_port(app_name, port_name):
    """Remove one named port from the application.

    Returns:
        True when the delete call answers with code 200; False on any
        other code or when an exception is raised.
    """
    print(f"\nDeleting port '{port_name}' from '{app_name}'...")
    try:
        result = resource.resource_delete_port(name=app_name, port_name=port_name)
        removed = result['code'] == 200
        if removed:
            print(f" ✓ Successfully deleted port '{port_name}'")
        else:
            print(f" ✗ Failed to delete port: {result.get('message', 'Unknown error')}")
        return removed
    except Exception as error:
        print(f" ✗ Error deleting port: {error}")
        return False
def demonstrate_port_scenarios(app_name):
    """Walk through three add-port scenarios on the application.

    Scenarios: an internal metrics port, an external WebSocket port, and
    an attempt to reuse port number 8080 under a second name.
    """
    rule = "=" * 50
    print("\n" + rule)
    print("Port Management Scenarios")
    print(rule)

    def attempt_add(title, port_name, port_number, external, success_text):
        # Shared flow for the scenarios where code 200 means success.
        print(title)
        try:
            result = resource.resource_add_port(
                name=app_name,
                port_name=port_name,
                new_port=port_number,
                ingress=external,
            )
            if result['code'] == 200:
                print(success_text)
            else:
                print(f" ✗ Failed: {result.get('message', 'Unknown error')}")
        except Exception as error:
            print(f" ✗ Error: {error}")

    # Scenario 1: monitoring port, internal only.
    attempt_add(
        "\nScenario 1: Adding monitoring port",
        'metrics',
        9090,
        False,
        " ✓ Added internal monitoring port (9090)",
    )

    # Scenario 2: WebSocket port, reachable from outside.
    attempt_add(
        "\nScenario 2: Adding WebSocket port",
        'websocket',
        8899,
        True,
        " ✓ Added WebSocket port (8899) with external access",
    )

    # Scenario 3: 8080 is already used by 'web'. Here a rejection (either
    # a non-200 code or an exception) is the expected, good outcome.
    print("\nScenario 3: Testing duplicate port prevention")
    try:
        result = resource.resource_add_port(
            name=app_name,
            port_name='duplicate-web',
            new_port=8080,
            ingress=True,
        )
        if result['code'] != 200:
            print(" ✓ System correctly prevented duplicate port")
        else:
            print(" ⚠ Warning: System allowed duplicate port number")
    except Exception as error:
        print(f" ✓ System prevented duplicate: {error}")
def cleanup_application(app_name):
    """Delete the application resource.

    Returns:
        True when the delete call answers with code 200; False on any
        other code or when an exception is raised.
    """
    print(f"\nCleaning up application '{app_name}'...")
    try:
        result = resource.resource_delete(app_name)
        deleted = result['code'] == 200
        if deleted:
            print(f"✓ Application '{app_name}' deleted successfully")
        else:
            print(f"✗ Failed to delete application: {result.get('message', 'Unknown error')}")
        return deleted
    except Exception as error:
        print(f"✗ Error during cleanup: {error}")
        return False
def main():
    """Run the port management walkthrough from creation to cleanup."""
    rule = "=" * 60

    print(rule)
    print("CGC SDK - Port Management Example")
    print(rule)

    # Step 1: create the application; nothing to clean up if this fails.
    app_name = create_multi_service_app()
    if not app_name:
        print("\nFailed to create application. Exiting.")
        return

    # Step 2: wait for readiness; remove the application if it never starts.
    if not wait_for_app(app_name):
        print("\nApplication failed to start. Cleaning up...")
        cleanup_application(app_name)
        return

    # Step 3: add the initial ports; give up if none could be added.
    if not add_application_ports(app_name):
        print("\nFailed to add ports. Cleaning up...")
        cleanup_application(app_name)
        return

    # Steps 4-6: show the ports, change two of them, show them again.
    list_ports(app_name)
    update_port_configuration(app_name)
    print("\nPorts after updates:")
    list_ports(app_name)

    # Steps 7-8: run the extra scenarios and report the final state.
    demonstrate_port_scenarios(app_name)
    print("\nFinal port configuration:")
    final_ports = list_ports(app_name)
    print(f"\nTotal ports configured: {len(final_ports)}")

    # Step 9: optional port deletion demo.
    print("\n" + rule)
    answer = input("Do you want to see port deletion? (y/n): ").lower()
    if answer == 'y':
        delete_port(app_name, 'metrics')
        print("\nPorts after deletion:")
        list_ports(app_name)

    # Step 10: let the user decide whether the application stays.
    print("\n" + rule)
    answer = input(f"Do you want to delete the application '{app_name}'? (y/n): ").lower()
    if answer != 'y':
        print(f"✓ Keeping application '{app_name}' running")
        print(" You can access the configured ports externally")
        print(" Remember to delete it when you're done!")
    else:
        cleanup_application(app_name)

    print("\n" + rule)
    print("Example completed!")
    print(rule)


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Key Features:
- Internal and external (ingress) port mapping
- Adding, updating, and deleting ports on a running resource
- Listing ports together with their ingress status
- Duplicate port number handling
- Interactive cleanup with user confirmation
4. Database Connection Management
File: python/database_connection.py
Database resource setup and connection handling with persistent storage.
View the complete database_connection.py file
#!/usr/bin/env python3
"""
PostgreSQL Database Connection Example
This script demonstrates how to:
1. Deploy a PostgreSQL database
2. Connect to the database using native psycopg2
3. Create tables and insert data
4. Query and update data
5. Handle database errors
Usage:
python database_connection.py
Requirements:
pip install psycopg2-binary
"""
import time
import psycopg2
from psycopg2 import errors as pg_errors
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def deploy_postgres(db_name="example-postgres", password="example-pass-123"):
"""Deploy a PostgreSQL database resource"""
print(f"Deploying PostgreSQL database '{db_name}'...")
try:
response = resource.resource_create(
name=db_name,
image_name="postgres:17",
entity="postgresql",
cpu=2,
memory=4,
environment_data=[
f"POSTGRES_PASSWORD={password}",
"POSTGRES_USER=admin",
"POSTGRES_DB=db"
]
)
if response['code'] == 200:
print(f"✓ PostgreSQL '{db_name}' created successfully")
# Wait for database to be ready
print("Waiting for database to be ready...")
max_wait = 60
waited = 0
while waited < max_wait:
if resource.resource_ready(db_name, resource.ResourceTypes.db):
print("