Przejdź do głównej zawartości

Przykłady użycia

Ta sekcja zawiera praktyczne przykłady używania CGC SDK w Pythonie. Wszystkie pliki przykładów są dostępne w katalogu python/ i można je uruchamiać bezpośrednio.

Konfiguracja

Przed uruchomieniem przykładów upewnij się, że masz zainstalowane i skonfigurowane CGC SDK:

Instalacja
# Utwórz środowisko wirtualne
python -m venv venv
source venv/bin/activate # Na Windows: venv\Scripts\activate

# Zainstaluj CGC SDK
pip install -e .

# Zarejestruj się w CGC
cgc register

Dostępne przykłady

1. Podstawowe zarządzanie zasobami obliczeniowymi

Plik: python/basic_compute.py

Ten kompleksowy przykład demonstruje pełny cykl życia zarządzania zasobami obliczeniowymi, w tym tworzenie, monitorowanie, konfigurację portów i czyszczenie.

Zobacz kompletny plik basic_compute.py
basic_compute.py
#!/usr/bin/env python3
"""
Basic Compute Resource Example

This script demonstrates how to:
1. Create a compute resource
2. Check if it's ready
3. Add ports for external access
4. Clean up resources

Usage:
python basic_compute.py
"""

import time
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions


def create_web_server():
"""Deploy a basic web server"""

app_name = "example-web-server"

print(f"Creating web server '{app_name}'...")

try:
# Create the resource
response = resource.resource_create(
name=app_name,
image_name="nginx:latest",
cpu=1, # 1 CPU core
memory=2, # 2GB RAM
environment_data=[
"NGINX_HOST=example.com",
"NGINX_PORT=80"
]
)

if response['code'] == 200:
print(f"✓ Web server '{app_name}' created successfully")
else:
print(f"✗ Failed to create web server: {response.get('message', 'Unknown error')}")
return None

except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None
except Exception as e:
print(f"✗ Unexpected error: {e}")
return None

return app_name


def wait_for_resource(app_name, max_wait=60):
"""Wait for resource to be ready"""

print(f"Waiting for '{app_name}' to be ready...")

waited = 0
while waited < max_wait:
if resource.resource_ready(app_name):
print(f"✓ '{app_name}' is ready!")
return True

print(f" Still starting... ({waited}s elapsed)")
time.sleep(5)
waited += 5

print(f"✗ '{app_name}' failed to become ready within {max_wait} seconds")
return False


def configure_ports(app_name):
"""Add ports for external access"""

print(f"Configuring ports for '{app_name}'...")

try:
# Add HTTP port
response = resource.resource_add_port(
name=app_name,
port_name="http",
new_port=80,
ingress=True # Enable external access
)

if response['code'] == 200:
print("✓ HTTP port (80) configured")
else:
print(f"✗ Failed to add port: {response.get('message', 'Unknown error')}")

# List all ports
ports = resource.resource_list_ports(app_name)
if ports['code'] == 200 and ports.get('details'):
print(f"Current ports for '{app_name}':")
for port in ports.get('details')['ports']['ports']:
print(f" - {port.get('name', 'unknown')}: {port.get('port', 'unknown')}")
if ports.get('details', {}).get('ingress'):
for port in ports.get('details')['ingress']:
print(f" - URL: {port.get('url')}")

except Exception as e:
print(f"✗ Error configuring ports: {e}")


def list_resources():
"""List all compute resources"""

print("\nListing all compute resources:")

try:
resources = resource.compute_list()

if resources['code'] == 200:
pods = resources['details']['pods_list']

if not pods:
print(" No resources found")
return

for pod in pods:
if 'app-name' in pod.get('labels', {}):
name = pod['labels']['app-name']
status = pod.get('status', 'Unknown')
restarts = pod.get('restart_count', 0)

status_symbol = "✓" if status == "Running" else "○"
print(f" {status_symbol} {name}: {status} (Restarts: {restarts})")
else:
print(f" Failed to list resources: {resources.get('message', 'Unknown error')}")

except Exception as e:
print(f" Error listing resources: {e}")


def cleanup_resource(app_name):
"""Delete a resource"""

print(f"\nCleaning up '{app_name}'...")

try:
response = resource.resource_delete(app_name)

if response['code'] == 200:
print(f"✓ '{app_name}' deleted successfully")
return True
else:
print(f"✗ Failed to delete: {response.get('message', 'Unknown error')}")
return False

except Exception as e:
print(f"✗ Error during cleanup: {e}")
return False


def main():
"""Main execution flow"""

print("=" * 50)
print("CGC SDK - Basic Compute Resource Example")
print("=" * 50)

# Step 1: Create web server
app_name = create_web_server()

if not app_name:
print("\nFailed to create web server. Exiting.")
return

# Step 2: Wait for it to be ready
if not wait_for_resource(app_name):
print("\nResource failed to start. Cleaning up...")
cleanup_resource(app_name)
return

# Step 3: Configure ports
configure_ports(app_name)

# Step 4: List all resources
list_resources()

# Step 5: Ask user if they want to keep or delete the resource
print("\n" + "=" * 50)
user_input = input("Do you want to delete the web server? (y/n): ").lower()

if user_input == 'y':
cleanup_resource(app_name)
else:
print(f"✓ Keeping '{app_name}' running")
print(" Remember to delete it when you're done to avoid charges")

print("\n" + "=" * 50)
print("Example completed!")
print("=" * 50)


if __name__ == "__main__":
main()

Kluczowe funkcje:

  • Pełne zarządzanie cyklem życia zasobów
  • Konfiguracja zmiennych środowiskowych
  • Konfiguracja portów z dostępem zewnętrznym
  • Monitorowanie zasobów i kontrole stanu
  • Interaktywne czyszczenie z potwierdzeniem użytkownika
  • Kompleksowa obsługa błędów i raportowanie stanu

2. Zarządzanie przepływem zadań

Plik: python/job_workflow.py

Pełne zarządzanie cyklem życia zadań z monitorowaniem, obsługą błędów i orkiestracją potoków.

Zobacz kompletny plik job_workflow.py
job_workflow.py
#!/usr/bin/env python3
"""
Job Workflow Example

This script demonstrates how to:
1. Create and run jobs
2. Monitor job execution
3. Run sequential job pipelines
4. Handle job failures
5. Clean up succeeded jobs

Usage:
python job_workflow.py
"""

import time
from datetime import datetime
import cgc.sdk.job as job
import cgc.sdk.exceptions as exceptions


def create_simple_job():
"""Create a simple job that prints a message"""

job_name = f"hello-job-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

print(f"Creating simple job '{job_name}'...")

try:
response = job.job_create(
name=job_name,
image_name="busybox:latest",
cpu=1,
memory=1,
startup_command="echo 'Hello from CGC Job!'",
ttl_seconds_after_finished=300 # Clean up after 5 minutes
)

if response['code'] == 200:
print(f"✓ Job '{job_name}' created successfully")
return job_name
else:
print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
return None

except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None


def create_data_processing_job(input_file, output_dir):
"""Create a job that simulates data processing"""

timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
job_name = f"data-process-{timestamp}"

print(f"Creating data processing job '{job_name}'...")

try:
response = job.job_create(
name=job_name,
image_name="python:3.9-slim",
cpu=2,
memory=4,
environment_data=[
f"INPUT_FILE={input_file}",
f"OUTPUT_DIR={output_dir}",
"PROCESSING_MODE=batch"
],
# Simulate data processing with Python
startup_command=(
"python -c \""
"import os, time; "
"print(f'Processing {os.environ.get(\\\"INPUT_FILE\\\")}...'); "
"time.sleep(5); "
"print(f'Output saved to {os.environ.get(\\\"OUTPUT_DIR\\\")}'); "
"print('Processing complete!')"
"\""
),
ttl_seconds_after_finished=600 # Clean up after 10 minutes
)

if response['code'] == 200:
print(f"✓ Data processing job '{job_name}' created")
return job_name
else:
print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
return None

except Exception as e:
print(f"✗ Error creating job: {e}")
return None


def monitor_job(job_name, check_interval=5, max_wait=60):
"""Monitor a job until it completes or fails"""

print(f"Monitoring job '{job_name}'...")

start_time = time.time()
last_status = None

while time.time() - start_time < max_wait:
try:
response = job.job_list()

if response['code'] != 200:
print(f"✗ Failed to get job status: {response.get('message', 'Unknown error')}")
return 'Unknown'

# Find our job in the list
job_found = False
for j in response['details'].get('job_list', []):
if j['name'] == job_name:
job_found = True
current_status = j.get('status', {}).get('phase', 'Unknown')

# Only print if status changed
if current_status != last_status:
elapsed = int(time.time() - start_time)
print(f" [{elapsed}s] Status: {current_status}")
last_status = current_status

# Check if job finished
if current_status == 'Succeeded':
print(f"✓ Job '{job_name}' completed successfully!")
return 'Completed'
elif current_status == 'Failed':
print(f"✗ Job '{job_name}' failed")
return 'Failed'

break

if not job_found:
print(f" Job '{job_name}' not found (may have been cleaned up)")
return 'NotFound'

time.sleep(check_interval)

except Exception as e:
print(f"✗ Error monitoring job: {e}")
return 'Error'

print(f"✗ Job monitoring timeout after {max_wait} seconds")
return 'Timeout'


def run_job_pipeline(pipeline_name, steps):
"""Run a sequence of jobs as a pipeline"""

print(f"\nStarting pipeline: {pipeline_name}")
print("=" * 40)

succeeded_jobs = []
failed_job = None

for i, step in enumerate(steps, 1):
print(f"\nStep {i}/{len(steps)}: {step['description']}")

# Create the job
job_name = step['create_func'](*step.get('args', []))

if not job_name:
print(f"✗ Failed to create job for step {i}")
failed_job = f"Step {i}"
break

# Monitor the job
status = monitor_job(job_name)

if status == 'Succeeded':
succeeded_jobs.append(job_name)
else:
print(f"✗ Pipeline failed at step {i}")
failed_job = job_name
break

# Pipeline summary
print("\n" + "=" * 40)
print(f"Pipeline: {pipeline_name}")
print(f"Succeeded: {len(succeeded_jobs)}/{len(steps)} steps")

if failed_job:
print(f"Failed at: {failed_job}")
else:
print("✓ All steps succeeded successfully!")

return succeeded_jobs, failed_job


def list_all_jobs():
"""List all jobs with their status"""

print("\nCurrent jobs in the system:")
print("-" * 40)

try:
response = job.job_list()

if response['code'] != 200:
print(f"Failed to list jobs: {response.get('message', 'Unknown error')}")
return

jobs_list = response['details'].get('job_list', [])

if not jobs_list:
print(" No jobs found")
return

# Group jobs by status
by_status = {}
for j in jobs_list:
status = j.get('status', {}).get('phase', 'Unknown')
if status not in by_status:
by_status[status] = []
by_status[status].append(j['name'])

# Display grouped jobs
for status, names in by_status.items():
print(f"\n{status}: ({len(names)} jobs)")
for name in names[:5]: # Show first 5
print(f" - {name}")
if len(names) > 5:
print(f" ... and {len(names) - 5} more")

except Exception as e:
print(f"Error listing jobs: {e}")


def cleanup_succeeded_jobs():
"""Delete all succeeded jobs"""

print("\nCleaning up succeeded jobs...")

try:
response = job.job_list()

if response['code'] != 200:
print(f"Failed to list jobs: {response.get('message', 'Unknown error')}")
return

deleted_count = 0
for j in response['details'].get('job_list', []):
job_phase = j.get('status', {}).get('phase')
if job_phase == 'Succeeded':
try:
delete_response = job.job_delete(j['name'])
if delete_response['code'] == 200:
deleted_count += 1
print(f" Deleted: {j['name']}")
except Exception as e:
print(f" Failed to delete {j['name']}: {e}")

if deleted_count > 0:
print(f"✓ Cleaned up {deleted_count} succeeded jobs")
else:
print(" No succeeded jobs to clean up")

except Exception as e:
print(f"Error during cleanup: {e}")


def main():
"""Main execution flow"""

print("=" * 50)
print("CGC SDK - Job Workflow Example")
print("=" * 50)

# Example 1: Simple job
print("\n1. Running a simple job")
print("-" * 30)
simple_job = create_simple_job()
if simple_job:
monitor_job(simple_job)
else:
print("✗ Simple job failed. Stopping further tests.")
return

# Example 2: Data processing job
print("\n2. Running a data processing job")
print("-" * 30)
data_job = create_data_processing_job(
input_file="/data/input.csv",
output_dir="/data/output/"
)
if data_job:
monitor_job(data_job)

# Example 3: Pipeline of jobs
print("\n3. Running a job pipeline")
print("-" * 30)

# Define pipeline steps
pipeline_steps = [
{
'description': 'Extract data',
'create_func': lambda: create_simple_job()
},
{
'description': 'Process data',
'create_func': lambda: create_data_processing_job("/tmp/extracted.csv", "/tmp/processed/")
},
{
'description': 'Generate report',
'create_func': lambda: create_simple_job()
}
]

succeeded, failed = run_job_pipeline("ETL Pipeline", pipeline_steps)

# Show all jobs
list_all_jobs()

# Cleanup option
print("\n" + "=" * 50)
user_input = input("Do you want to clean up succeeded jobs? (y/n): ").lower()

if user_input == 'y':
cleanup_succeeded_jobs()

print("\n" + "=" * 50)
print("Example completed!")
print("=" * 50)


if __name__ == "__main__":
main()

Kluczowe funkcje:

  • Proste tworzenie i monitorowanie zadań
  • Potoki przetwarzania danych
  • Śledzenie statusu zadań i funkcje oczekiwania
  • Sekwencyjne wykonywanie zadań
  • Automatyczne czyszczenie z TTL
  • Kompleksowa obsługa błędów

3. Zarządzanie portami

Plik: python/port_management.py

Zaawansowana konfiguracja portów dla dostępu zewnętrznego i równoważenia obciążenia.

Zobacz kompletny plik port_management.py
port_management.py
#!/usr/bin/env python3
"""
Port Management Example

This script demonstrates how to:
1. Create resources with multiple services
2. Add, update, and delete ports
3. Configure ingress for external access
4. List and manage port configurations

Usage:
python port_management.py
"""

import time
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions


def create_multi_service_app(app_name="multi-port-app"):
"""Create an application that needs multiple ports"""

print(f"Creating multi-service application '{app_name}'...")

try:
# Create a resource that could serve multiple services
# Using a Python image that can run multiple services
response = resource.resource_create(
name=app_name,
image_name="python:3.9",
cpu=2,
memory=4,
startup_command=(
"python -c \""
"import http.server, socketserver, threading; "
"handlers = []; "
"ports = [8080, 8081, 8082]; "
"for port in ports: "
" handler = http.server.SimpleHTTPRequestHandler; "
" httpd = socketserver.TCPServer(('', port), handler); "
" thread = threading.Thread(target=httpd.serve_forever); "
" thread.daemon = True; "
" thread.start(); "
" print(f'Server started on port {port}'); "
"import time; "
"while True: time.sleep(1)"
"\""
),
environment_data=[
"SERVICE_NAME=multi-port-demo",
"ENV=development"
]
)

if response['code'] == 200:
print(f"✓ Application '{app_name}' created successfully")
return app_name
else:
print(f"✗ Failed to create application: {response.get('message', 'Unknown error')}")
return None

except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None


def wait_for_app(app_name):
"""Wait for application to be ready"""

print(f"Waiting for '{app_name}' to be ready...")

max_wait = 60
waited = 0

while waited < max_wait:
if resource.resource_ready(app_name):
print(f"✓ Application '{app_name}' is ready!")
return True

time.sleep(5)
waited += 5
print(f" Still starting... ({waited}s elapsed)")

print(f"✗ Application failed to become ready within {max_wait} seconds")
return False


def add_application_ports(app_name):
"""Add multiple ports to the application"""

print(f"\nAdding ports to '{app_name}'...")

# Define ports to add
ports_config = [
{
'name': 'web',
'port': 8080,
'ingress': True,
'description': 'Main web interface'
},
{
'name': 'api',
'port': 8081,
'ingress': True,
'description': 'REST API endpoint'
},
{
'name': 'admin',
'port': 8082,
'ingress': False, # Internal only
'description': 'Admin panel (internal)'
}
]

success_count = 0

for config in ports_config:
try:
response = resource.resource_add_port(
name=app_name,
port_name=config['name'],
new_port=config['port'],
ingress=config['ingress']
)

if response['code'] == 200:
ingress_status = "external" if config['ingress'] else "internal"
print(f" ✓ Added {config['name']} port ({config['port']}) - {ingress_status}")
print(f" Description: {config['description']}")
success_count += 1
else:
print(f" ✗ Failed to add {config['name']} port: {response.get('message', 'Unknown error')}")

except Exception as e:
print(f" ✗ Error adding {config['name']} port: {e}")

print(f"\nSuccessfully added {success_count}/{len(ports_config)} ports")
return success_count > 0


def list_ports(app_name):
"""List all ports for an application"""

print(f"\nCurrent ports for '{app_name}':")
print("-" * 50)

try:
response = resource.resource_list_ports(app_name)

if response['code'] == 200:
ports = response.get('details', {}).get('ports', {}).get('ports')
ingresses = response.get('details', {}).get('ingress')
i_names = [i.get("port_name") for i in ingresses]
if not ports:
print(" No ports configured")
return []
print(f" {'Port Name':<15} {'Port':<10} {'Ingress':<10}")
print(" " + "-" * 35)

for port in ports:
name = port.get('name', 'unknown')
number = port.get('port', 'unknown')
ingress = "External" if port.get('name') in i_names else "Internal"

print(f" {name:<15} {number:<10} {ingress:<10}")

return ports
else:
print(f" Failed to list ports: {response.get('message', 'Unknown error')}")
return []

except Exception as e:
print(f" Error listing ports: {e}")
return []


def update_port_configuration(app_name):
"""Update existing port configurations"""

print(f"\nUpdating port configurations for '{app_name}'...")

# Example: Change the API port from 8081 to 8090
try:
print(" Updating 'api' port from 8081 to 8090...")

response = resource.resource_update_port(
name=app_name,
port_name='api',
new_port=8090,
ingress=True
)

if response['code'] == 200:
print(" ✓ Successfully updated API port to 8090")
else:
print(f" ✗ Failed to update port: {response.get('message', 'Unknown error')}")

except Exception as e:
print(f" ✗ Error updating port: {e}")

# Example: Change admin port to allow external access
try:
print(" Enabling external access for 'admin' port...")

response = resource.resource_update_port(
name=app_name,
port_name='admin',
new_port=8082, # Keep same port
ingress=True # Change to external
)

if response['code'] == 200:
print(" ✓ Admin port now accessible externally")
else:
print(f" ✗ Failed to update port: {response.get('message', 'Unknown error')}")

except Exception as e:
print(f" ✗ Error updating port: {e}")


def delete_port(app_name, port_name):
"""Delete a specific port"""

print(f"\nDeleting port '{port_name}' from '{app_name}'...")

try:
response = resource.resource_delete_port(
name=app_name,
port_name=port_name
)

if response['code'] == 200:
print(f" ✓ Successfully deleted port '{port_name}'")
return True
else:
print(f" ✗ Failed to delete port: {response.get('message', 'Unknown error')}")
return False

except Exception as e:
print(f" ✗ Error deleting port: {e}")
return False


def demonstrate_port_scenarios(app_name):
"""Demonstrate various port management scenarios"""

print("\n" + "=" * 50)
print("Port Management Scenarios")
print("=" * 50)

# Scenario 1: Add a monitoring port
print("\nScenario 1: Adding monitoring port")
try:
response = resource.resource_add_port(
name=app_name,
port_name='metrics',
new_port=9090,
ingress=False # Internal monitoring
)

if response['code'] == 200:
print(" ✓ Added internal monitoring port (9090)")
else:
print(f" ✗ Failed: {response.get('message', 'Unknown error')}")

except Exception as e:
print(f" ✗ Error: {e}")

# Scenario 2: Add a WebSocket port
print("\nScenario 2: Adding WebSocket port")
try:
response = resource.resource_add_port(
name=app_name,
port_name='websocket',
new_port=8899,
ingress=True # External WebSocket connections
)

if response['code'] == 200:
print(" ✓ Added WebSocket port (8899) with external access")
else:
print(f" ✗ Failed: {response.get('message', 'Unknown error')}")

except Exception as e:
print(f" ✗ Error: {e}")

# Scenario 3: Try to add duplicate port (should fail)
print("\nScenario 3: Testing duplicate port prevention")
try:
response = resource.resource_add_port(
name=app_name,
port_name='duplicate-web',
new_port=8080, # Already used by 'web'
ingress=True
)

# This might succeed or fail depending on implementation
if response['code'] == 200:
print(" ⚠ Warning: System allowed duplicate port number")
else:
print(" ✓ System correctly prevented duplicate port")

except Exception as e:
print(f" ✓ System prevented duplicate: {e}")


def cleanup_application(app_name):
"""Delete the application"""

print(f"\nCleaning up application '{app_name}'...")

try:
response = resource.resource_delete(app_name)

if response['code'] == 200:
print(f"✓ Application '{app_name}' deleted successfully")
return True
else:
print(f"✗ Failed to delete application: {response.get('message', 'Unknown error')}")
return False

except Exception as e:
print(f"✗ Error during cleanup: {e}")
return False


def main():
"""Main execution flow"""

print("=" * 60)
print("CGC SDK - Port Management Example")
print("=" * 60)

# Step 1: Create application
app_name = create_multi_service_app()

if not app_name:
print("\nFailed to create application. Exiting.")
return

# Step 2: Wait for application to be ready
if not wait_for_app(app_name):
print("\nApplication failed to start. Cleaning up...")
cleanup_application(app_name)
return

# Step 3: Add initial ports
if not add_application_ports(app_name):
print("\nFailed to add ports. Cleaning up...")
cleanup_application(app_name)
return

# Step 4: List current ports
list_ports(app_name)

# Step 5: Update port configurations
update_port_configuration(app_name)

# Step 6: List ports after updates
print("\nPorts after updates:")
list_ports(app_name)

# Step 7: Demonstrate various scenarios
demonstrate_port_scenarios(app_name)

# Step 8: Final port listing
print("\nFinal port configuration:")
final_ports = list_ports(app_name)

print(f"\nTotal ports configured: {len(final_ports)}")

# Step 9: Demonstrate port deletion
print("\n" + "=" * 60)
user_input = input("Do you want to see port deletion? (y/n): ").lower()

if user_input == 'y':
delete_port(app_name, 'metrics')
print("\nPorts after deletion:")
list_ports(app_name)

# Step 10: Cleanup
print("\n" + "=" * 60)
user_input = input(f"Do you want to delete the application '{app_name}'? (y/n): ").lower()

if user_input == 'y':
cleanup_application(app_name)
else:
print(f"✓ Keeping application '{app_name}' running")
print(" You can access the configured ports externally")
print(" Remember to delete it when you're done!")

print("\n" + "=" * 60)
print("Example completed!")
print("=" * 60)


if __name__ == "__main__":
main()

Kluczowe funkcje:

  • Mapowanie portów wewnętrznych i zewnętrznych
  • Konfiguracja protokołów (TCP/UDP)
  • Integracja z load balancerem
  • Monitorowanie statusu portów
  • Dynamiczne przypisywanie portów

4. Zarządzanie połączeniami bazy danych

Plik: python/database_connection.py

Konfiguracja zasobów bazy danych i obsługa połączeń z trwałym przechowywaniem.

Zobacz kompletny plik database_connection.py
database_connection.py
#!/usr/bin/env python3
"""
PostgreSQL Database Connection Example

This script demonstrates how to:
1. Deploy a PostgreSQL database
2. Connect to the database using native psycopg2
3. Create tables and insert data
4. Query and update data
5. Handle database errors

Usage:
python database_connection.py

Requirements:
pip install psycopg2-binary
"""

import time
import psycopg2
from psycopg2 import errors as pg_errors
import cgc.sdk.resource as resource
import cgc.sdk.exceptions as exceptions


def deploy_postgres(db_name="example-postgres", password="example-pass-123"):
"""Deploy a PostgreSQL database resource"""

print(f"Deploying PostgreSQL database '{db_name}'...")

try:
response = resource.resource_create(
name=db_name,
image_name="postgres:17",
entity="postgresql",
cpu=2,
memory=4,
environment_data=[
f"POSTGRES_PASSWORD={password}",
"POSTGRES_USER=admin",
"POSTGRES_DB=db"
]
)

if response['code'] == 200:
print(f"✓ PostgreSQL '{db_name}' created successfully")

# Wait for database to be ready
print("Waiting for database to be ready...")
max_wait = 60
waited = 0

while waited < max_wait:
if resource.resource_ready(db_name, resource.ResourceTypes.db):
print("✓ Database is ready!")
return db_name, password

time.sleep(5)
waited += 5
print(f" Still starting... ({waited}s elapsed)")

print(f"✗ Database failed to become ready within {max_wait} seconds")
return None, None
else:
print(f"✗ Failed to create database: {response.get('message', 'Unknown error')}")
return None, None

except exceptions.SDKException as e:
print(f"✗ SDK Error (code {e.code}): {e}")
return None, None


def get_database_connection_info(db_name):
"""Get database connection information from Kubernetes service"""

# In Kubernetes, the service will be available at the service name
# The port is typically 5432 for PostgreSQL
return {
'host': db_name, # Service name in Kubernetes
'port': 5432,
'user': 'admin',
'database': 'db'
}


def connect_to_database(db_name, password, database="db"):
"""Connect to PostgreSQL database using native psycopg2"""

print(f"Connecting to database '{db_name}'...")

try:
# Wait a bit more for service to be available
print("Waiting for database service to be available...")
time.sleep(15)

# Get connection info
conn_info = get_database_connection_info(db_name)

# Connection parameters
connection_params = {
'host': conn_info['host'],
'port': conn_info['port'],
'user': conn_info['user'],
'password': password,
'database': database,
'connect_timeout': 10
}

# Test connection with retries
max_retries = 5
connection = None

for attempt in range(max_retries):
try:
print(f"Connection attempt {attempt + 1}/{max_retries}...")
print(f" Connecting to {conn_info['host']}:{conn_info['port']} as {conn_info['user']}")

connection = psycopg2.connect(**connection_params)
cursor = connection.cursor()

# Test the connection
cursor.execute("SELECT version()")
version = cursor.fetchone()[0]
print(f"✓ Connected to PostgreSQL")
print(f" Version: {version[:50]}...")

cursor.close()
return connection

except Exception as e:
print(f" Attempt {attempt + 1} failed: {e}")
if connection:
connection.close()
if attempt < max_retries - 1:
print(" Retrying in 10 seconds...")
time.sleep(10)
else:
raise

except Exception as e:
print(f"✗ Failed to connect: {e}")
return None


def create_tables(connection):
"""Create example tables"""

print("\nCreating tables...")

cursor = connection.cursor()

try:
# Create users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

# Create posts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
title VARCHAR(200) NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

connection.commit()
print("✓ Tables created successfully")

# List tables
cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
""")

tables = cursor.fetchall()
print(" Available tables:")
for table in tables:
print(f" - {table[0]}")

return True

except Exception as e:
connection.rollback()
print(f"✗ Error creating tables: {e}")
return False

finally:
cursor.close()


def insert_sample_data(connection):
"""Insert sample data into tables"""

print("\nInserting sample data...")

cursor = connection.cursor()

try:
# Insert users
users = [
("alice", "alice@example.com"),
("bob", "bob@example.com"),
("charlie", "charlie@example.com")
]

user_ids = []
for username, email in users:
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
(username, email)
)
user_id = cursor.fetchone()[0]
user_ids.append(user_id)
print(f" ✓ Created user: {username} (ID: {user_id})")

# Insert posts
posts = [
(user_ids[0], "Hello World", "This is my first post!"),
(user_ids[0], "CGC SDK Tutorial", "Learning how to use the SDK..."),
(user_ids[1], "Database Example", "PostgreSQL is awesome!"),
(user_ids[2], "Cloud Computing", "Deploying apps in the cloud")
]

for user_id, title, content in posts:
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s)",
(user_id, title, content)
)

connection.commit()
print(f"✓ Inserted {len(users)} users and {len(posts)} posts")
return True

except pg_errors.UniqueViolation:
connection.rollback()
print(" Note: Some data already exists (duplicate entries)")
return True

except Exception as e:
connection.rollback()
print(f"✗ Error inserting data: {e}")
return False

finally:
cursor.close()


def query_data(connection):
"""Query and display data"""

print("\nQuerying data...")

cursor = connection.cursor()

try:
# Query 1: Count users
cursor.execute("SELECT COUNT(*) FROM users")
user_count = cursor.fetchone()[0]
print(f" Total users: {user_count}")

# Query 2: List users with post count
cursor.execute("""
SELECT
u.username,
u.email,
COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.id, u.username, u.email
ORDER BY post_count DESC
""")

print("\n User Statistics:")
print(" " + "-" * 50)
print(f" {'Username':<15} {'Email':<25} {'Posts':<5}")
print(" " + "-" * 50)

for row in cursor.fetchall():
username, email, post_count = row
print(f" {username:<15} {email:<25} {post_count:<5}")

# Query 3: Recent posts
cursor.execute("""
SELECT
p.title,
u.username,
p.created_at
FROM posts p
JOIN users u ON p.user_id = u.id
ORDER BY p.created_at DESC
LIMIT 5
""")

print("\n Recent Posts:")
print(" " + "-" * 50)

for row in cursor.fetchall():
title, author, created = row
print(f" 📝 {title}")
print(f" by {author} at {created}")

return True

except Exception as e:
print(f"✗ Error querying data: {e}")
return False

finally:
cursor.close()


def demonstrate_transactions(connection):
"""Demonstrate transaction handling"""

print("\nDemonstrating transactions...")

cursor = connection.cursor()

try:
# Start transaction (psycopg2 starts transactions automatically)
print(" Starting transaction...")

# Operation 1: Create a new user
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
("transaction_test", "trans@example.com")
)
new_user_id = cursor.fetchone()[0]
print(f" ✓ Created user (ID: {new_user_id})")

# Operation 2: Create multiple posts
for i in range(3):
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s)",
(new_user_id, f"Transaction Post {i+1}", "Testing transactions")
)
print(" ✓ Created 3 posts")

# Commit transaction
connection.commit()
print(" ✓ Transaction committed successfully")

# Demonstrate rollback (intentional error)
print("\n Testing rollback scenario...")

try:
# This will fail due to duplicate username
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s)",
("transaction_test", "another@example.com")
)
connection.commit()
except pg_errors.UniqueViolation:
connection.rollback()
print(" ✓ Transaction rolled back (duplicate username)")

return True

except Exception as e:
connection.rollback()
print(f" ✗ Transaction failed: {e}")
return False

finally:
cursor.close()


def cleanup_database(db_name):
"""Delete the database resource"""

print(f"\nCleaning up database '{db_name}'...")

try:
response = resource.resource_delete(db_name)

if response['code'] == 200:
print(f"✓ Database '{db_name}' deleted successfully")
return True
else:
print(f"✗ Failed to delete database: {response.get('message', 'Unknown error')}")
return False

except Exception as e:
print(f"✗ Error during cleanup: {e}")
return False


def main():
"""Main execution flow"""

print("=" * 60)
print("CGC SDK - PostgreSQL Database Example (Native psycopg2)")
print("=" * 60)

# Step 1: Deploy PostgreSQL
db_name, password = deploy_postgres()

if not db_name:
print("\nFailed to deploy database. Exiting.")
return

# Wait a bit more for database to fully initialize
print("\nWaiting for database to fully initialize...")
time.sleep(10)

# Network access confirmation
print("\n" + "=" * 60)
print("NETWORK ACCESS CONFIRMATION")
print("=" * 60)
print("Database access is only available within the Kubernetes namespace network.")
print("Access is NOT exposed via ingress - you must be within the cluster network.")
print()
confirmation = input("Are you currently within the namespace network? (y/N): ").lower().strip()

if confirmation != 'y':
print("✗ Network access not confirmed. Processing cleanup...")
cleanup_database(db_name)
return

print("✓ Network access confirmed. Proceeding with database operations...")

# Step 2: Connect to database
connection = connect_to_database(db_name, password)

if not connection:
print("\nFailed to connect to database. Cleaning up...")
cleanup_database(db_name)
return

try:
# Step 3: Create tables
if not create_tables(connection):
print("\nFailed to create tables. Cleaning up...")
cleanup_database(db_name)
return

# Step 4: Insert sample data
insert_sample_data(connection)

# Step 5: Query data
query_data(connection)

# Step 6: Demonstrate transactions
demonstrate_transactions(connection)

# Step 7: Final statistics
cursor = connection.cursor()

cursor.execute("SELECT COUNT(*) FROM users")
final_users = cursor.fetchone()[0]

cursor.execute("SELECT COUNT(*) FROM posts")
final_posts = cursor.fetchone()[0]

cursor.close()

print("\n" + "=" * 60)
print("Final Database Statistics:")
print(f" Total Users: {final_users}")
print(f" Total Posts: {final_posts}")
print("=" * 60)

finally:
# Always close the connection
connection.close()
print("✓ Database connection closed")

# Cleanup option
user_input = input(f"\nDo you want to delete the database '{db_name}'? (y/n): ").lower()

if user_input == 'y':
cleanup_database(db_name)
else:
print(f"✓ Keeping database '{db_name}' running")
print(f" Connection details:")
print(f" Host: {db_name}")
print(f" User: admin")
print(f" Password: {password}")
print(f" Database: db")
print(" Remember to delete it when you're done!")

print("\n" + "=" * 60)
print("Example completed!")
print("=" * 60)


if __name__ == "__main__":
main()

Kluczowe funkcje:

  • Zarządzanie trwałymi wolumenami
  • Konfiguracja i konfiguracja bazy danych
  • Testowanie i walidacja połączeń
  • Zarządzanie zmiennymi środowiskowymi
  • Operacje tworzenia kopii zapasowych i przywracania

Uruchamianie przykładów

Każdy przykład można uruchomić niezależnie:

python python/basic_compute.py

Tworzy kompletny serwer WWW z konfiguracją portów i interaktywnym czyszczeniem.

Podstawowe wzorce SDK

Na podstawie przykładów, oto kluczowe wzorce używane w całym projekcie:

Wzorzec tworzenia zasobów

Tworzenie zasobów
response = resource.resource_create(
name="my-app",
image_name="nginx:latest",
cpu=1, # Rdzenie CPU
memory=2, # Pamięć w GB
environment_data=[ # Zmienne środowiskowe
"KEY=value"
]
)

if response['code'] == 200:
print("✓ Zasób utworzony pomyślnie")
else:
print(f"✗ Błąd: {response.get('message')}")

Wzorzec monitorowania zasobów

Monitorowanie statusu zasobów
def wait_for_resource(app_name, max_wait=60):
waited = 0
while waited < max_wait:
if resource.resource_ready(app_name):
return True
time.sleep(5)
waited += 5
return False

Wzorzec obsługi błędów

Kompleksowa obsługa błędów
try:
response = resource.resource_create(...)

if response['code'] == 200:
print("✓ Sukces")
else:
print(f"✗ Błąd API: {response.get('message')}")

except exceptions.SDKException as e:
print(f"✗ Błąd SDK (kod {e.code}): {e}")
except Exception as e:
print(f"✗ Nieoczekiwany błąd: {e}")

Wzorzec konfiguracji portów

Zarządzanie portami
# Dodaj dostęp do portu zewnętrznego
response = resource.resource_add_port(
name=app_name,
port_name="http",
new_port=80,
ingress=True # Włącz dostęp zewnętrzny
)

# Wylistuj wszystkie porty i URL-e
ports = resource.resource_list_ports(app_name)

Najlepsze praktyki

  1. Zawsze sprawdzaj kody odpowiedzi - SDK zwraca strukturalne odpowiedzi z kodami statusu
  2. Używaj znaczących nazw zasobów - Uwzględniaj znaczniki czasu lub unikalne identyfikatory
  3. Implementuj właściwe czyszczenie - Usuń zasoby po zakończeniu, aby uniknąć opłat
  4. Monitoruj status zasobów - Poczekaj, aż zasoby będą gotowe przed kontynuowaniem
  5. Obsługuj wyjątki z gracją - Używaj zarówno specyficznych dla SDK, jak i ogólnych obsług wyjątków
  6. Używaj zmiennych środowiskowych - Konfiguruj aplikacje przez dane środowiskowe
  7. Konfiguruj TTL dla zadań - Ustaw automatyczne czyszczenie dla tymczasowych zadań

Uwierzytelnianie i konfiguracja

Upewnij się, że jesteś uwierzytelniony i masz wybrany właściwy kontekst:

# Wylistuj dostępne konteksty
cgc context list

# Ustaw kontekst w razie potrzeby
cgc context set <nazwa-kontekstu>

Rozwiązywanie problemów

Typowe problemy

  1. Błędy importu: Upewnij się, że CGC SDK jest zainstalowane z pip install -e .
  2. Błędy uwierzytelniania: Uruchom cgc context list i sprawdź kontekst
  3. Niepowodzenia tworzenia zasobów: Sprawdź limity i przydziały zasobów
  4. Problemy z siecią: Sprawdź łączność z punktami końcowymi CGC

Uzyskiwanie pomocy

  • Sprawdź pomoc CGC CLI: cgc --help
  • Zobacz status zasobów: cgc resource list
  • Sprawdź logi: cgc logs <nazwa-zasobu>