Przykłady użycia
Ta sekcja zawiera praktyczne przykłady używania CGC SDK w Pythonie. Wszystkie pliki przykładów są dostępne w katalogu python/
i można je uruchamiać bezpośrednio.
Konfiguracja
Przed uruchomieniem przykładów upewnij się, że masz zainstalowane i skonfigurowane CGC SDK:
# Utwórz środowisko wirtualne
python -m venv venv
source venv/bin/activate # Na Windows: venv\Scripts\activate
# Zainstaluj CGC SDK
pip install -e .
# Zarejestruj się w CGC
cgc register
Dostępne przykłady
1. Podstawowe zarządzanie zasobami obliczeniowymi
Plik: python/basic_compute.py
Ten kompleksowy przykład demonstruje pełny cykl życia zarządzania zasobami obliczeniowymi, w tym tworzenie, monitorowanie, konfigurację portów i czyszczenie.
Zobacz kompletny plik basic_compute.py
#!/usr/bin/env python3
"""
Basic Compute Resource Example
This script demonstrates how to:
1. Create a compute resource
2. Check if it's ready
3. Add ports for external access
4. Clean up resources
Usage:
python basic_compute.py
"""
import time
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def create_web_server():
"""Deploy a basic web server"""
app_name = "example-web-server"
print(f"Creating web server '{app_name}'...")
try:
# Create the resource
response = resource.resource_create(
name=app_name,
image_name="nginx:latest",
cpu=1, # 1 CPU core
memory=2, # 2GB RAM
environment_data=[
"NGINX_HOST=example.com",
"NGINX_PORT=80"
]
)
if response['code'] == 200:
print(f"✓ Web server '{app_name}' created successfully")
else:
print(f"✗ Failed to create web server: {response.get('message', 'Unknown error')}")
return None
except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None
except Exception as e:
print(f"✗ Unexpected error: {e}")
return None
return app_name
def wait_for_resource(app_name, max_wait=60):
"""Wait for resource to be ready"""
print(f"Waiting for '{app_name}' to be ready...")
waited = 0
while waited < max_wait:
if resource.resource_ready(app_name):
print(f"✓ '{app_name}' is ready!")
return True
print(f" Still starting... ({waited}s elapsed)")
time.sleep(5)
waited += 5
print(f"✗ '{app_name}' failed to become ready within {max_wait} seconds")
return False
def configure_ports(app_name):
"""Add ports for external access"""
print(f"Configuring ports for '{app_name}'...")
try:
# Add HTTP port
response = resource.resource_add_port(
name=app_name,
port_name="http",
new_port=80,
ingress=True # Enable external access
)
if response['code'] == 200:
print("✓ HTTP port (80) configured")
else:
print(f"✗ Failed to add port: {response.get('message', 'Unknown error')}")
# List all ports
ports = resource.resource_list_ports(app_name)
if ports['code'] == 200 and ports.get('details'):
print(f"Current ports for '{app_name}':")
for port in ports.get('details')['ports']['ports']:
print(f" - {port.get('name', 'unknown')}: {port.get('port', 'unknown')}")
if ports.get('details', {}).get('ingress'):
for port in ports.get('details')['ingress']:
print(f" - URL: {port.get('url')}")
except Exception as e:
print(f"✗ Error configuring ports: {e}")
def list_resources():
"""List all compute resources"""
print("\nListing all compute resources:")
try:
resources = resource.compute_list()
if resources['code'] == 200:
pods = resources['details']['pods_list']
if not pods:
print(" No resources found")
return
for pod in pods:
if 'app-name' in pod.get('labels', {}):
name = pod['labels']['app-name']
status = pod.get('status', 'Unknown')
restarts = pod.get('restart_count', 0)
status_symbol = "✓" if status == "Running" else "○"
print(f" {status_symbol} {name}: {status} (Restarts: {restarts})")
else:
print(f" Failed to list resources: {resources.get('message', 'Unknown error')}")
except Exception as e:
print(f" Error listing resources: {e}")
def cleanup_resource(app_name):
"""Delete a resource"""
print(f"\nCleaning up '{app_name}'...")
try:
response = resource.resource_delete(app_name)
if response['code'] == 200:
print(f"✓ '{app_name}' deleted successfully")
return True
else:
print(f"✗ Failed to delete: {response.get('message', 'Unknown error')}")
return False
except Exception as e:
print(f"✗ Error during cleanup: {e}")
return False
def main():
"""Main execution flow"""
print("=" * 50)
print("CGC SDK - Basic Compute Resource Example")
print("=" * 50)
# Step 1: Create web server
app_name = create_web_server()
if not app_name:
print("\nFailed to create web server. Exiting.")
return
# Step 2: Wait for it to be ready
if not wait_for_resource(app_name):
print("\nResource failed to start. Cleaning up...")
cleanup_resource(app_name)
return
# Step 3: Configure ports
configure_ports(app_name)
# Step 4: List all resources
list_resources()
# Step 5: Ask user if they want to keep or delete the resource
print("\n" + "=" * 50)
user_input = input("Do you want to delete the web server? (y/n): ").lower()
if user_input == 'y':
cleanup_resource(app_name)
else:
print(f"✓ Keeping '{app_name}' running")
print(" Remember to delete it when you're done to avoid charges")
print("\n" + "=" * 50)
print("Example completed!")
print("=" * 50)
if __name__ == "__main__":
main()
Kluczowe funkcje:
- Pełne zarządzanie cyklem życia zasobów
- Konfiguracja zmiennych środowiskowych
- Konfiguracja portów z dostępem zewnętrznym
- Monitorowanie zasobów i kontrole stanu
- Interaktywne czyszczenie z potwierdzeniem użytkownika
- Kompleksowa obsługa błędów i raportowanie stanu
2. Zarządzanie przepływem zadań
Plik: python/job_workflow.py
Pełne zarządzanie cyklem życia zadań z monitorowaniem, obsługą błędów i orkiestracją potoków.
Zobacz kompletny plik job_workflow.py
#!/usr/bin/env python3
"""
Job Workflow Example
This script demonstrates how to:
1. Create and run jobs
2. Monitor job execution
3. Run sequential job pipelines
4. Handle job failures
5. Clean up succeeded jobs
Usage:
python job_workflow.py
"""
import time
from datetime import datetime
import cgc.sdk.job as job
import cgc.sdk.exceptions as exceptions
def create_simple_job():
"""Create a simple job that prints a message"""
job_name = f"hello-job-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
print(f"Creating simple job '{job_name}'...")
try:
response = job.job_create(
name=job_name,
image_name="busybox:latest",
cpu=1,
memory=1,
startup_command="echo 'Hello from CGC Job!'",
ttl_seconds_after_finished=300 # Clean up after 5 minutes
)
if response['code'] == 200:
print(f"✓ Job '{job_name}' created successfully")
return job_name
else:
print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
return None
except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None
def create_data_processing_job(input_file, output_dir):
"""Create a job that simulates data processing"""
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
job_name = f"data-process-{timestamp}"
print(f"Creating data processing job '{job_name}'...")
try:
response = job.job_create(
name=job_name,
image_name="python:3.9-slim",
cpu=2,
memory=4,
environment_data=[
f"INPUT_FILE={input_file}",
f"OUTPUT_DIR={output_dir}",
"PROCESSING_MODE=batch"
],
# Simulate data processing with Python
startup_command=(
"python -c \""
"import os, time; "
"print(f'Processing {os.environ.get(\\\"INPUT_FILE\\\")}...'); "
"time.sleep(5); "
"print(f'Output saved to {os.environ.get(\\\"OUTPUT_DIR\\\")}'); "
"print('Processing complete!')"
"\""
),
ttl_seconds_after_finished=600 # Clean up after 10 minutes
)
if response['code'] == 200:
print(f"✓ Data processing job '{job_name}' created")
return job_name
else:
print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
return None
except Exception as e:
print(f"✗ Error creating job: {e}")
return None
def monitor_job(job_name, check_interval=5, max_wait=60):
"""Monitor a job until it completes or fails"""
print(f"Monitoring job '{job_name}'...")
start_time = time.time()
last_status = None
while time.time() - start_time < max_wait:
try:
response = job.job_list()
if response['code'] != 200:
print(f"✗ Failed to get job status: {response.get('message', 'Unknown error')}")
return 'Unknown'
# Find our job in the list
job_found = False
for j in response['details'].get('job_list', []):
if j['name'] == job_name:
job_found = True
current_status = j.get('status', {}).get('phase', 'Unknown')
# Only print if status changed
if current_status != last_status:
elapsed = int(time.time() - start_time)
print(f" [{elapsed}s] Status: {current_status}")
last_status = current_status
# Check if job finished
if current_status == 'Succeeded':
print(f"✓ Job '{job_name}' completed successfully!")
return 'Completed'
elif current_status == 'Failed':
print(f"✗ Job '{job_name}' failed")
return 'Failed'
break
if not job_found:
print(f" Job '{job_name}' not found (may have been cleaned up)")
return 'NotFound'
time.sleep(check_interval)
except Exception as e:
print(f"✗ Error monitoring job: {e}")
return 'Error'
print(f"✗ Job monitoring timeout after {max_wait} seconds")
return 'Timeout'
def run_job_pipeline(pipeline_name, steps):
"""Run a sequence of jobs as a pipeline"""
print(f"\nStarting pipeline: {pipeline_name}")
print("=" * 40)
succeeded_jobs = []
failed_job = None
for i, step in enumerate(steps, 1):
print(f"\nStep {i}/{len(steps)}: {step['description']}")
# Create the job
job_name = step['create_func'](*step.get('args', []))
if not job_name:
print(f"✗ Failed to create job for step {i}")
failed_job = f"Step {i}"
break
# Monitor the job
status = monitor_job(job_name)
if status == 'Succeeded':
succeeded_jobs.append(job_name)
else:
print(f"✗ Pipeline failed at step {i}")
failed_job = job_name
break
# Pipeline summary
print("\n" + "=" * 40)
print(f"Pipeline: {pipeline_name}")
print(f"Succeeded: {len(succeeded_jobs)}/{len(steps)} steps")
if failed_job:
print(f"Failed at: {failed_job}")
else:
print("✓ All steps succeeded successfully!")
return succeeded_jobs, failed_job
def list_all_jobs():
"""List all jobs with their status"""
print("\nCurrent jobs in the system:")
print("-" * 40)
try:
response = job.job_list()
if response['code'] != 200:
print(f"Failed to list jobs: {response.get('message', 'Unknown error')}")
return
jobs_list = response['details'].get('job_list', [])
if not jobs_list:
print(" No jobs found")
return
# Group jobs by status
by_status = {}
for j in jobs_list:
status = j.get('status', {}).get('phase', 'Unknown')
if status not in by_status:
by_status[status] = []
by_status[status].append(j['name'])
# Display grouped jobs
for status, names in by_status.items():
print(f"\n{status}: ({len(names)} jobs)")
for name in names[:5]: # Show first 5
print(f" - {name}")
if len(names) > 5:
print(f" ... and {len(names) - 5} more")
except Exception as e:
print(f"Error listing jobs: {e}")
def cleanup_succeeded_jobs():
"""Delete all succeeded jobs"""
print("\nCleaning up succeeded jobs...")
try:
response = job.job_list()
if response['code'] != 200:
print(f"Failed to list jobs: {response.get('message', 'Unknown error')}")
return
deleted_count = 0
for j in response['details'].get('job_list', []):
job_phase = j.get('status', {}).get('phase')
if job_phase == 'Succeeded':
try:
delete_response = job.job_delete(j['name'])
if delete_response['code'] == 200:
deleted_count += 1
print(f" Deleted: {j['name']}")
except Exception as e:
print(f" Failed to delete {j['name']}: {e}")
if deleted_count > 0:
print(f"✓ Cleaned up {deleted_count} succeeded jobs")
else:
print(" No succeeded jobs to clean up")
except Exception as e:
print(f"Error during cleanup: {e}")
def main():
"""Main execution flow"""
print("=" * 50)
print("CGC SDK - Job Workflow Example")
print("=" * 50)
# Example 1: Simple job
print("\n1. Running a simple job")
print("-" * 30)
simple_job = create_simple_job()
if simple_job:
monitor_job(simple_job)
else:
print("✗ Simple job failed. Stopping further tests.")
return
# Example 2: Data processing job
print("\n2. Running a data processing job")
print("-" * 30)
data_job = create_data_processing_job(
input_file="/data/input.csv",
output_dir="/data/output/"
)
if data_job:
monitor_job(data_job)
# Example 3: Pipeline of jobs
print("\n3. Running a job pipeline")
print("-" * 30)
# Define pipeline steps
pipeline_steps = [
{
'description': 'Extract data',
'create_func': lambda: create_simple_job()
},
{
'description': 'Process data',
'create_func': lambda: create_data_processing_job("/tmp/extracted.csv", "/tmp/processed/")
},
{
'description': 'Generate report',
'create_func': lambda: create_simple_job()
}
]
succeeded, failed = run_job_pipeline("ETL Pipeline", pipeline_steps)
# Show all jobs
list_all_jobs()
# Cleanup option
print("\n" + "=" * 50)
user_input = input("Do you want to clean up succeeded jobs? (y/n): ").lower()
if user_input == 'y':
cleanup_succeeded_jobs()
print("\n" + "=" * 50)
print("Example completed!")
print("=" * 50)
if __name__ == "__main__":
main()
Kluczowe funkcje:
- Proste tworzenie i monitorowanie zadań
- Potoki przetwarzania danych
- Śledzenie statusu zadań i funkcje oczekiwania
- Sekwencyjne wykonywanie zadań
- Automatyczne czyszczenie z TTL
- Kompleksowa obsługa błędów
3. Zarządzanie portami
Plik: python/port_management.py
Zaawansowana konfiguracja portów dla dostępu zewnętrznego i równoważenia obciążenia.
Zobacz kompletny plik port_management.py
#!/usr/bin/env python3
"""
Port Management Example
This script demonstrates how to:
1. Create resources with multiple services
2. Add, update, and delete ports
3. Configure ingress for external access
4. List and manage port configurations
Usage:
python port_management.py
"""
import time
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def create_multi_service_app(app_name="multi-port-app"):
"""Create an application that needs multiple ports"""
print(f"Creating multi-service application '{app_name}'...")
try:
# Create a resource that could serve multiple services
# Using a Python image that can run multiple services
response = resource.resource_create(
name=app_name,
image_name="python:3.9",
cpu=2,
memory=4,
startup_command=(
"python -c \""
"import http.server, socketserver, threading; "
"handlers = []; "
"ports = [8080, 8081, 8082]; "
"for port in ports: "
" handler = http.server.SimpleHTTPRequestHandler; "
" httpd = socketserver.TCPServer(('', port), handler); "
" thread = threading.Thread(target=httpd.serve_forever); "
" thread.daemon = True; "
" thread.start(); "
" print(f'Server started on port {port}'); "
"import time; "
"while True: time.sleep(1)"
"\""
),
environment_data=[
"SERVICE_NAME=multi-port-demo",
"ENV=development"
]
)
if response['code'] == 200:
print(f"✓ Application '{app_name}' created successfully")
return app_name
else:
print(f"✗ Failed to create application: {response.get('message', 'Unknown error')}")
return None
except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None
def wait_for_app(app_name):
"""Wait for application to be ready"""
print(f"Waiting for '{app_name}' to be ready...")
max_wait = 60
waited = 0
while waited < max_wait:
if resource.resource_ready(app_name):
print(f"✓ Application '{app_name}' is ready!")
return True
time.sleep(5)
waited += 5
print(f" Still starting... ({waited}s elapsed)")
print(f"✗ Application failed to become ready within {max_wait} seconds")
return False
def add_application_ports(app_name):
"""Add multiple ports to the application"""
print(f"\nAdding ports to '{app_name}'...")
# Define ports to add
ports_config = [
{
'name': 'web',
'port': 8080,
'ingress': True,
'description': 'Main web interface'
},
{
'name': 'api',
'port': 8081,
'ingress': True,
'description': 'REST API endpoint'
},
{
'name': 'admin',
'port': 8082,
'ingress': False, # Internal only
'description': 'Admin panel (internal)'
}
]
success_count = 0
for config in ports_config:
try:
response = resource.resource_add_port(
name=app_name,
port_name=config['name'],
new_port=config['port'],
ingress=config['ingress']
)
if response['code'] == 200:
ingress_status = "external" if config['ingress'] else "internal"
print(f" ✓ Added {config['name']} port ({config['port']}) - {ingress_status}")
print(f" Description: {config['description']}")
success_count += 1
else:
print(f" ✗ Failed to add {config['name']} port: {response.get('message', 'Unknown error')}")
except Exception as e:
print(f" ✗ Error adding {config['name']} port: {e}")
print(f"\nSuccessfully added {success_count}/{len(ports_config)} ports")
return success_count > 0
def list_ports(app_name):
"""List all ports for an application"""
print(f"\nCurrent ports for '{app_name}':")
print("-" * 50)
try:
response = resource.resource_list_ports(app_name)
if response['code'] == 200:
ports = response.get('details', {}).get('ports', {}).get('ports')
ingresses = response.get('details', {}).get('ingress')
i_names = [i.get("port_name") for i in ingresses]
if not ports:
print(" No ports configured")
return []
print(f" {'Port Name':<15} {'Port':<10} {'Ingress':<10}")
print(" " + "-" * 35)
for port in ports:
name = port.get('name', 'unknown')
number = port.get('port', 'unknown')
ingress = "External" if port.get('name') in i_names else "Internal"
print(f" {name:<15} {number:<10} {ingress:<10}")
return ports
else:
print(f" Failed to list ports: {response.get('message', 'Unknown error')}")
return []
except Exception as e:
print(f" Error listing ports: {e}")
return []
def update_port_configuration(app_name):
"""Update existing port configurations"""
print(f"\nUpdating port configurations for '{app_name}'...")
# Example: Change the API port from 8081 to 8090
try:
print(" Updating 'api' port from 8081 to 8090...")
response = resource.resource_update_port(
name=app_name,
port_name='api',
new_port=8090,
ingress=True
)
if response['code'] == 200:
print(" ✓ Successfully updated API port to 8090")
else:
print(f" ✗ Failed to update port: {response.get('message', 'Unknown error')}")
except Exception as e:
print(f" ✗ Error updating port: {e}")
# Example: Change admin port to allow external access
try:
print(" Enabling external access for 'admin' port...")
response = resource.resource_update_port(
name=app_name,
port_name='admin',
new_port=8082, # Keep same port
ingress=True # Change to external
)
if response['code'] == 200:
print(" ✓ Admin port now accessible externally")
else:
print(f" ✗ Failed to update port: {response.get('message', 'Unknown error')}")
except Exception as e:
print(f" ✗ Error updating port: {e}")
def delete_port(app_name, port_name):
"""Delete a specific port"""
print(f"\nDeleting port '{port_name}' from '{app_name}'...")
try:
response = resource.resource_delete_port(
name=app_name,
port_name=port_name
)
if response['code'] == 200:
print(f" ✓ Successfully deleted port '{port_name}'")
return True
else:
print(f" ✗ Failed to delete port: {response.get('message', 'Unknown error')}")
return False
except Exception as e:
print(f" ✗ Error deleting port: {e}")
return False
def demonstrate_port_scenarios(app_name):
"""Demonstrate various port management scenarios"""
print("\n" + "=" * 50)
print("Port Management Scenarios")
print("=" * 50)
# Scenario 1: Add a monitoring port
print("\nScenario 1: Adding monitoring port")
try:
response = resource.resource_add_port(
name=app_name,
port_name='metrics',
new_port=9090,
ingress=False # Internal monitoring
)
if response['code'] == 200:
print(" ✓ Added internal monitoring port (9090)")
else:
print(f" ✗ Failed: {response.get('message', 'Unknown error')}")
except Exception as e:
print(f" ✗ Error: {e}")
# Scenario 2: Add a WebSocket port
print("\nScenario 2: Adding WebSocket port")
try:
response = resource.resource_add_port(
name=app_name,
port_name='websocket',
new_port=8899,
ingress=True # External WebSocket connections
)
if response['code'] == 200:
print(" ✓ Added WebSocket port (8899) with external access")
else:
print(f" ✗ Failed: {response.get('message', 'Unknown error')}")
except Exception as e:
print(f" ✗ Error: {e}")
# Scenario 3: Try to add duplicate port (should fail)
print("\nScenario 3: Testing duplicate port prevention")
try:
response = resource.resource_add_port(
name=app_name,
port_name='duplicate-web',
new_port=8080, # Already used by 'web'
ingress=True
)
# This might succeed or fail depending on implementation
if response['code'] == 200:
print(" ⚠ Warning: System allowed duplicate port number")
else:
print(" ✓ System correctly prevented duplicate port")
except Exception as e:
print(f" ✓ System prevented duplicate: {e}")
def cleanup_application(app_name):
"""Delete the application"""
print(f"\nCleaning up application '{app_name}'...")
try:
response = resource.resource_delete(app_name)
if response['code'] == 200:
print(f"✓ Application '{app_name}' deleted successfully")
return True
else:
print(f"✗ Failed to delete application: {response.get('message', 'Unknown error')}")
return False
except Exception as e:
print(f"✗ Error during cleanup: {e}")
return False
def main():
"""Main execution flow"""
print("=" * 60)
print("CGC SDK - Port Management Example")
print("=" * 60)
# Step 1: Create application
app_name = create_multi_service_app()
if not app_name:
print("\nFailed to create application. Exiting.")
return
# Step 2: Wait for application to be ready
if not wait_for_app(app_name):
print("\nApplication failed to start. Cleaning up...")
cleanup_application(app_name)
return
# Step 3: Add initial ports
if not add_application_ports(app_name):
print("\nFailed to add ports. Cleaning up...")
cleanup_application(app_name)
return
# Step 4: List current ports
list_ports(app_name)
# Step 5: Update port configurations
update_port_configuration(app_name)
# Step 6: List ports after updates
print("\nPorts after updates:")
list_ports(app_name)
# Step 7: Demonstrate various scenarios
demonstrate_port_scenarios(app_name)
# Step 8: Final port listing
print("\nFinal port configuration:")
final_ports = list_ports(app_name)
print(f"\nTotal ports configured: {len(final_ports)}")
# Step 9: Demonstrate port deletion
print("\n" + "=" * 60)
user_input = input("Do you want to see port deletion? (y/n): ").lower()
if user_input == 'y':
delete_port(app_name, 'metrics')
print("\nPorts after deletion:")
list_ports(app_name)
# Step 10: Cleanup
print("\n" + "=" * 60)
user_input = input(f"Do you want to delete the application '{app_name}'? (y/n): ").lower()
if user_input == 'y':
cleanup_application(app_name)
else:
print(f"✓ Keeping application '{app_name}' running")
print(" You can access the configured ports externally")
print(" Remember to delete it when you're done!")
print("\n" + "=" * 60)
print("Example completed!")
print("=" * 60)
if __name__ == "__main__":
main()
Kluczowe funkcje:
- Mapowanie portów wewnętrznych i zewnętrznych
- Konfiguracja protokołów (TCP/UDP)
- Integracja z load balancerem
- Monitorowanie statusu portów
- Dynamiczne przypisywanie portów
4. Zarządzanie połączeniami bazy danych
Plik: python/database_connection.py
Konfiguracja zasobów bazy danych i obsługa połączeń z trwałym przechowywaniem.
Zobacz kompletny plik database_connection.py
#!/usr/bin/env python3
"""
PostgreSQL Database Connection Example
This script demonstrates how to:
1. Deploy a PostgreSQL database
2. Connect to the database using native psycopg2
3. Create tables and insert data
4. Query and update data
5. Handle database errors
Usage:
python database_connection.py
Requirements:
pip install psycopg2-binary
"""
import time
import psycopg2
from psycopg2 import errors as pg_errors
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions
def deploy_postgres(db_name="example-postgres", password="example-pass-123"):
"""Deploy a PostgreSQL database resource"""
print(f"Deploying PostgreSQL database '{db_name}'...")
try:
response = resource.resource_create(
name=db_name,
image_name="postgres:17",
entity="postgresql",
cpu=2,
memory=4,
environment_data=[
f"POSTGRES_PASSWORD={password}",
"POSTGRES_USER=admin",
"POSTGRES_DB=db"
]
)
if response['code'] == 200:
print(f"✓ PostgreSQL '{db_name}' created successfully")
# Wait for database to be ready
print("Waiting for database to be ready...")
max_wait = 60
waited = 0
while waited < max_wait:
if resource.resource_ready(db_name, resource.ResourceTypes.db):
print("✓ Database is ready!")
return db_name, password
time.sleep(5)
waited += 5
print(f" Still starting... ({waited}s elapsed)")
print(f"✗ Database failed to become ready within {max_wait} seconds")
return None, None
else:
print(f"✗ Failed to create database: {response.get('message', 'Unknown error')}")
return None, None
except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None, None
def get_database_connection_info(db_name):
"""Get database connection information from Kubernetes service"""
# In Kubernetes, the service will be available at the service name
# The port is typically 5432 for PostgreSQL
return {
'host': db_name, # Service name in Kubernetes
'port': 5432,
'user': 'admin',
'database': 'db'
}
def connect_to_database(db_name, password, database="db"):
"""Connect to PostgreSQL database using native psycopg2"""
print(f"Connecting to database '{db_name}'...")
try:
# Wait a bit more for service to be available
print("Waiting for database service to be available...")
time.sleep(15)
# Get connection info
conn_info = get_database_connection_info(db_name)
# Connection parameters
connection_params = {
'host': conn_info['host'],
'port': conn_info['port'],
'user': conn_info['user'],
'password': password,
'database': database,
'connect_timeout': 10
}
# Test connection with retries
max_retries = 5
connection = None
for attempt in range(max_retries):
try:
print(f"Connection attempt {attempt + 1}/{max_retries}...")
print(f" Connecting to {conn_info['host']}:{conn_info['port']} as {conn_info['user']}")
connection = psycopg2.connect(**connection_params)
cursor = connection.cursor()
# Test the connection
cursor.execute("SELECT version()")
version = cursor.fetchone()[0]
print(f"✓ Connected to PostgreSQL")
print(f" Version: {version[:50]}...")
cursor.close()
return connection
except Exception as e:
print(f" Attempt {attempt + 1} failed: {e}")
if connection:
connection.close()
if attempt < max_retries - 1:
print(" Retrying in 10 seconds...")
time.sleep(10)
else:
raise
except Exception as e:
print(f"✗ Failed to connect: {e}")
return None
def create_tables(connection):
"""Create example tables"""
print("\nCreating tables...")
cursor = connection.cursor()
try:
# Create users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create posts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
title VARCHAR(200) NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
connection.commit()
print("✓ Tables created successfully")
# List tables
cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
""")
tables = cursor.fetchall()
print(" Available tables:")
for table in tables:
print(f" - {table[0]}")
return True
except Exception as e:
connection.rollback()
print(f"✗ Error creating tables: {e}")
return False
finally:
cursor.close()
def insert_sample_data(connection):
"""Insert sample data into tables"""
print("\nInserting sample data...")
cursor = connection.cursor()
try:
# Insert users
users = [
("alice", "alice@example.com"),
("bob", "bob@example.com"),
("charlie", "charlie@example.com")
]
user_ids = []
for username, email in users:
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
(username, email)
)
user_id = cursor.fetchone()[0]
user_ids.append(user_id)
print(f" ✓ Created user: {username} (ID: {user_id})")
# Insert posts
posts = [
(user_ids[0], "Hello World", "This is my first post!"),
(user_ids[0], "CGC SDK Tutorial", "Learning how to use the SDK..."),
(user_ids[1], "Database Example", "PostgreSQL is awesome!"),
(user_ids[2], "Cloud Computing", "Deploying apps in the cloud")
]
for user_id, title, content in posts:
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s)",
(user_id, title, content)
)
connection.commit()
print(f"✓ Inserted {len(users)} users and {len(posts)} posts")
return True
except pg_errors.UniqueViolation:
connection.rollback()
print(" Note: Some data already exists (duplicate entries)")
return True
except Exception as e:
connection.rollback()
print(f"✗ Error inserting data: {e}")
return False
finally:
cursor.close()
def query_data(connection):
"""Query and display data"""
print("\nQuerying data...")
cursor = connection.cursor()
try:
# Query 1: Count users
cursor.execute("SELECT COUNT(*) FROM users")
user_count = cursor.fetchone()[0]
print(f" Total users: {user_count}")
# Query 2: List users with post count
cursor.execute("""
SELECT
u.username,
u.email,
COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.id, u.username, u.email
ORDER BY post_count DESC
""")
print("\n User Statistics:")
print(" " + "-" * 50)
print(f" {'Username':<15} {'Email':<25} {'Posts':<5}")
print(" " + "-" * 50)
for row in cursor.fetchall():
username, email, post_count = row
print(f" {username:<15} {email:<25} {post_count:<5}")
# Query 3: Recent posts
cursor.execute("""
SELECT
p.title,
u.username,
p.created_at
FROM posts p
JOIN users u ON p.user_id = u.id
ORDER BY p.created_at DESC
LIMIT 5
""")
print("\n Recent Posts:")
print(" " + "-" * 50)
for row in cursor.fetchall():
title, author, created = row
print(f" 📝 {title}")
print(f" by {author} at {created}")
return True
except Exception as e:
print(f"✗ Error querying data: {e}")
return False
finally:
cursor.close()
def demonstrate_transactions(connection):
"""Demonstrate transaction handling"""
print("\nDemonstrating transactions...")
cursor = connection.cursor()
try:
# Start transaction (psycopg2 starts transactions automatically)
print(" Starting transaction...")
# Operation 1: Create a new user
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
("transaction_test", "trans@example.com")
)
new_user_id = cursor.fetchone()[0]
print(f" ✓ Created user (ID: {new_user_id})")
# Operation 2: Create multiple posts
for i in range(3):
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s)",
(new_user_id, f"Transaction Post {i+1}", "Testing transactions")
)
print(" ✓ Created 3 posts")
# Commit transaction
connection.commit()
print(" ✓ Transaction committed successfully")
# Demonstrate rollback (intentional error)
print("\n Testing rollback scenario...")
try:
# This will fail due to duplicate username
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s)",
("transaction_test", "another@example.com")
)
connection.commit()
except pg_errors.UniqueViolation:
connection.rollback()
print(" ✓ Transaction rolled back (duplicate username)")
return True
except Exception as e:
connection.rollback()
print(f" ✗ Transaction failed: {e}")
return False
finally:
cursor.close()
def cleanup_database(db_name):
"""Delete the database resource"""
print(f"\nCleaning up database '{db_name}'...")
try:
response = resource.resource_delete(db_name)
if response['code'] == 200:
print(f"✓ Database '{db_name}' deleted successfully")
return True
else:
print(f"✗ Failed to delete database: {response.get('message', 'Unknown error')}")
return False
except Exception as e:
print(f"✗ Error during cleanup: {e}")
return False
def main():
"""Main execution flow"""
print("=" * 60)
print("CGC SDK - PostgreSQL Database Example (Native psycopg2)")
print("=" * 60)
# Step 1: Deploy PostgreSQL
db_name, password = deploy_postgres()
if not db_name:
print("\nFailed to deploy database. Exiting.")
return
# Wait a bit more for database to fully initialize
print("\nWaiting for database to fully initialize...")
time.sleep(10)
# Network access confirmation
print("\n" + "=" * 60)
print("NETWORK ACCESS CONFIRMATION")
print("=" * 60)
print("Database access is only available within the Kubernetes namespace network.")
print("Access is NOT exposed via ingress - you must be within the cluster network.")
print()
confirmation = input("Are you currently within the namespace network? (y/N): ").lower().strip()
if confirmation != 'y':
print("✗ Network access not confirmed. Processing cleanup...")
cleanup_database(db_name)
return
print("✓ Network access confirmed. Proceeding with database operations...")
# Step 2: Connect to database
connection = connect_to_database(db_name, password)
if not connection:
print("\nFailed to connect to database. Cleaning up...")
cleanup_database(db_name)
return
try:
# Step 3: Create tables
if not create_tables(connection):
print("\nFailed to create tables. Cleaning up...")
cleanup_database(db_name)
return
# Step 4: Insert sample data
insert_sample_data(connection)
# Step 5: Query data
query_data(connection)
# Step 6: Demonstrate transactions
demonstrate_transactions(connection)
# Step 7: Final statistics
cursor = connection.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
final_users = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM posts")
final_posts = cursor.fetchone()[0]
cursor.close()
print("\n" + "=" * 60)
print("Final Database Statistics:")
print(f" Total Users: {final_users}")
print(f" Total Posts: {final_posts}")
print("=" * 60)
finally:
# Always close the connection
connection.close()
print("✓ Database connection closed")
# Cleanup option
user_input = input(f"\nDo you want to delete the database '{db_name}'? (y/n): ").lower()
if user_input == 'y':
cleanup_database(db_name)
else:
print(f"✓ Keeping database '{db_name}' running")
print(f" Connection details:")
print(f" Host: {db_name}")
print(f" User: admin")
print(f" Password: {password}")
print(f" Database: db")
print(" Remember to delete it when you're done!")
print("\n" + "=" * 60)
print("Example completed!")
print("=" * 60)
if __name__ == "__main__":
main()
Kluczowe funkcje:
- Zarządzanie trwałymi wolumenami
- Konfiguracja i konfiguracja bazy danych
- Testowanie i walidacja połączeń
- Zarządzanie zmiennymi środowiskowymi
- Operacje tworzenia kopii zapasowych i przywracania
Uruchamianie przykładów
Każdy przykład można uruchomić niezależnie:
- Podstawowe obliczenia
- Przepływ zadań
- Zarządzanie portami
- Konfiguracja bazy danych
python python/basic_compute.py
Tworzy kompletny serwer WWW z konfiguracją portów i interaktywnym czyszczeniem.
python python/job_workflow.py
Demonstruje tworzenie zadań, monitorowanie i orkiestrację potoków.
python python/port_management.py
Pokazuje zaawansowaną konfigurację portów i konfigurację dostępu zewnętrznego.
python python/database_connection.py
Tworzy i konfiguruje zasoby bazy danych z trwałym przechowywaniem.
Podstawowe wzorce SDK
Na podstawie przykładów, oto kluczowe wzorce używane w całym projekcie:
Wzorzec tworzenia zasobów
response = resource.resource_create(
name="my-app",
image_name="nginx:latest",
cpu=1, # Rdzenie CPU
memory=2, # Pamięć w GB
environment_data=[ # Zmienne środowiskowe
"KEY=value"
]
)
if response['code'] == 200:
print("✓ Zasób utworzony pomyślnie")
else:
print(f"✗ Błąd: {response.get('message')}")
Wzorzec monitorowania zasobów
def wait_for_resource(app_name, max_wait=60):
waited = 0
while waited < max_wait:
if resource.resource_ready(app_name):
return True
time.sleep(5)
waited += 5
return False
Wzorzec obsługi błędów
try:
response = resource.resource_create(...)
if response['code'] == 200:
print("✓ Sukces")
else:
print(f"✗ Błąd API: {response.get('message')}")
except exceptions.SDKException as e:
print(f"✗ Błąd SDK (kod {e.code}): {e}")
except Exception as e:
print(f"✗ Nieoczekiwany błąd: {e}")
Wzorzec konfiguracji portów
# Dodaj dostęp do portu zewnętrznego
response = resource.resource_add_port(
name=app_name,
port_name="http",
new_port=80,
ingress=True # Włącz dostęp zewnętrzny
)
# Wylistuj wszystkie porty i URL-e
ports = resource.resource_list_ports(app_name)
Najlepsze praktyki
- Zawsze sprawdzaj kody odpowiedzi - SDK zwraca strukturalne odpowiedzi z kodami statusu
- Używaj znaczących nazw zasobów - Uwzględniaj znaczniki czasu lub unikalne identyfikatory
- Implementuj właściwe czyszczenie - Usuń zasoby po zakończeniu, aby uniknąć opłat
- Monitoruj status zasobów - Poczekaj, aż zasoby będą gotowe przed kontynuowaniem
- Obsługuj wyjątki z gracją - Używaj zarówno specyficznych dla SDK, jak i ogólnych obsług wyjątków
- Używaj zmiennych środowiskowych - Konfiguruj aplikacje przez dane środowiskowe
- Konfiguruj TTL dla zadań - Ustaw automatyczne czyszczenie dla tymczasowych zadań
Uwierzytelnianie i konfiguracja
Upewnij się, że jesteś uwierzytelniony i masz wybrany właściwy kontekst:
# Wylistuj dostępne konteksty
cgc context list
# Ustaw kontekst w razie potrzeby
cgc context set <nazwa-kontekstu>
Rozwiązywanie problemów
Typowe problemy
- Błędy importu: Upewnij się, że CGC SDK jest zainstalowane z
pip install -e .
- Błędy uwierzytelniania: Uruchom
cgc context list
i sprawdź kontekst - Niepowodzenia tworzenia zasobów: Sprawdź limity i przydziały zasobów
- Problemy z siecią: Sprawdź łączność z punktami końcowymi CGC
Uzyskiwanie pomocy
- Sprawdź pomoc CGC CLI:
cgc --help
- Zobacz status zasobów:
cgc resource list
- Sprawdź logi:
cgc logs <nazwa-zasobu>