
Advanced guide

Database backup job

Whenever you run your own application or a standalone database, it is useful to keep a backup of the existing database. The job below runs pg_dump inside a postgres:17 container and writes the dump to a mounted backups volume.

import cgc.sdk.job as job


def create_backup_job(database_name):
    """Create a database backup job"""
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

    return job.job_create(
        name=f"backup-{database_name}-{timestamp}",
        image_name="postgres:17",
        cpu=2,
        memory=4,
        volumes=["backups"],
        volume_full_path="/backups",
        environment_data=[
            f"PGHOST={database_name}",
            "PGUSER=postgres",
            "PGPASSWORD=secret"
        ],
        startup_command=f"pg_dump -d mydb -f /backups/{database_name}-{timestamp}.sql",
        ttl_seconds_after_finished=300  # Clean up after 5 minutes
    )
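
A minimal usage sketch, assuming the database runs as a workload named my-postgres (a hypothetical name) and that a volume called backups already exists:

# Hypothetical usage: "my-postgres" stands in for the name of your database workload
response = create_backup_job("my-postgres")

if response['code'] == 200:
    print("Backup job created")
else:
    print(f"Backup job creation failed: {response.get('message', 'Unknown error')}")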

Batch processing pipeline

import cgc.sdk.job as job
import time
from datetime import datetime


def create_simple_job():
    """Create a simple example job"""
    job_name = f"hello-job-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

    response = job.job_create(
        name=job_name,
        image_name="busybox:latest",
        cpu=1,
        memory=1,
        startup_command="echo 'Hello from CGC Job!'",
        ttl_seconds_after_finished=300
    )

    if response['code'] == 200:
        print(f"✓ Job '{job_name}' created successfully")
        return job_name
    else:
        print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
        return None

def monitor_job(job_name, check_interval=5, max_wait=60):
    """Monitor a job until it completes or fails"""

    print(f"Monitoring job '{job_name}'...")
    start_time = time.time()
    last_status = None

    while time.time() - start_time < max_wait:
        try:
            response = job.job_list()

            if response['code'] != 200:
                print(f"✗ Failed to get job status: {response.get('message', 'Unknown error')}")
                return 'Unknown'

            # Find our job in the list
            for j in response['details'].get('job_list', []):
                if j['name'] == job_name:
                    current_status = j.get('status', {}).get('phase', 'Unknown')

                    # Only print if status changed
                    if current_status != last_status:
                        elapsed = int(time.time() - start_time)
                        print(f" [{elapsed}s] Status: {current_status}")
                        last_status = current_status

                    # Check if job finished
                    if current_status == 'Succeeded':
                        print(f"✓ Job '{job_name}' completed successfully!")
                        return 'Succeeded'
                    elif current_status == 'Failed':
                        print(f"✗ Job '{job_name}' failed")
                        return 'Failed'
                    break
            else:
                print(f" Job '{job_name}' not found (may have been cleaned up)")
                return 'NotFound'

            time.sleep(check_interval)

        except Exception as e:
            print(f"✗ Error monitoring job: {e}")
            return 'Error'

    print(f"✗ Job monitoring timeout after {max_wait} seconds")
    return 'Timeout'

def run_job_pipeline(pipeline_name, steps):
    """Run a sequence of jobs as a pipeline with proper error handling"""

    print(f"Starting pipeline: {pipeline_name}")
    print("=" * 40)

    succeeded_jobs = []
    failed_job = None

    for i, step in enumerate(steps, 1):
        print(f"Step {i}/{len(steps)}: {step['description']}")

        # Create the job
        job_name = step['create_func'](*step.get('args', []))

        if not job_name:
            print(f"✗ Failed to create job for step {i}")
            failed_job = f"Step {i}"
            break

        # Monitor the job
        status = monitor_job(job_name)

        if status == 'Succeeded':
            succeeded_jobs.append(job_name)
            print(f"✓ Step {i} completed successfully")
        else:
            print(f"✗ Pipeline failed at step {i}")
            failed_job = job_name
            break

    # Pipeline summary
    print("=" * 40)
    print(f"Pipeline: {pipeline_name}")
    print(f"Succeeded: {len(succeeded_jobs)}/{len(steps)} steps")

    if failed_job:
        print(f"Failed at: {failed_job}")
        return succeeded_jobs, failed_job
    else:
        print("✓ All steps completed successfully!")
        return succeeded_jobs, None

# Example usage
pipeline_steps = [
    {
        'description': 'Extract data',
        'create_func': lambda: create_simple_job()
    },
    {
        'description': 'Process data',
        'create_func': lambda: create_simple_job()
    },
    {
        'description': 'Generate report',
        'create_func': lambda: create_simple_job()
    }
]

succeeded, failed = run_job_pipeline("ETL Pipeline", pipeline_steps)

if not failed:
    print("Pipeline completed successfully!")
else:
    print(f"Pipeline failed at: {failed}")

Job monitoring

Check job status

import cgc.sdk.job as job
import time


def monitor_job(job_name, check_interval=10):
    """Monitor a job until it completes"""
    while True:
        response = job.job_list()

        if response['code'] != 200:
            print("Failed to get job status")
            return 'Error'

        # Find our job
        job_found = False
        for j in response['details'].get('job_list', []):
            if j['name'] == job_name:
                job_found = True
                status = j.get('status', {}).get('phase', 'Unknown')
                print(f"Job {job_name}: {status}")

                if status in ['Succeeded', 'Failed']:
                    return status
                break

        if not job_found:
            print(f"Job {job_name} not found (may have been cleaned up)")
            return 'Cleaned'

        time.sleep(check_interval)


# Use it
final_status = monitor_job("my-processing-job")
print(f"Job finished with status: {final_status}")

Job success pattern with early failure detection

def monitor_job(job_name):
    """Monitor a job until completion with improved status checking"""

    print(f"Monitoring job '{job_name}'...")

    start_time = time.time()
    last_status = None
    max_wait = 300  # 5 minutes max wait

    while True:
        if time.time() - start_time > max_wait:
            print(f"✗ Job '{job_name}' timed out after {max_wait} seconds")
            return 'Timeout'

        try:
            response = job.job_list()

            if response['code'] != 200:
                print(f"Failed to get job status: {response.get('message', 'Unknown error')}")
                time.sleep(10)
                continue

            # Find our job
            for j in response['details'].get('job_list', []):
                if j['name'] == job_name:
                    current_status = j.get('status', {}).get('phase', 'Unknown')

                    # Only print if status changed
                    if current_status != last_status:
                        elapsed = int(time.time() - start_time)
                        print(f" [{elapsed}s] Status: {current_status}")
                        last_status = current_status

                    # Check if job finished
                    if current_status == 'Succeeded':
                        print(f"✓ Job '{job_name}' completed successfully!")
                        return 'Succeeded'
                    elif current_status == 'Failed':
                        print(f"✗ Job '{job_name}' failed")
                        return 'Failed'

                    break
            else:
                print(f"Job '{job_name}' not found (may have been cleaned up)")
                return 'NotFound'

        except Exception as e:
            print(f"Error monitoring job: {e}")

        time.sleep(5)


def run_job_with_verification(job_config):
    """Run a job and verify it completes successfully"""

    # Create the job
    response = job.job_create(**job_config)

    if response['code'] != 200:
        print(f"Failed to create job: {response['message']}")
        return False

    job_name = job_config['name']
    print(f"Job {job_name} created successfully")

    # Monitor until completion
    status = monitor_job(job_name)

    if status == 'Succeeded':
        print(f"Job {job_name} completed successfully")
        return True
    else:
        print(f"Job {job_name} failed with status: {status}")
        return False


# Example with early failure detection
def main_example():
    """Example main function with failure handling"""

    # Example 1: Simple job
    print("1. Running a simple job")
    simple_job = create_simple_job()
    if simple_job:
        monitor_job(simple_job)
    else:
        print("✗ Simple job failed. Stopping further tests.")
        return  # Stop execution if first job fails

    # Example 2: Only run if first job succeeded
    print("2. Running a data processing job")
    # create_data_processing_job is not defined in this guide; a sketch follows below
    data_job = create_data_processing_job("/data/input.csv", "/data/output/")
    if data_job:
        monitor_job(data_job)
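
The example above calls create_data_processing_job, which is not defined in this guide. A minimal sketch of such a helper, assuming a hypothetical processor:latest image whose process.py takes input and output paths, and a data volume mounted at /data:

def create_data_processing_job(input_path, output_path):
    """Hypothetical helper: create a data processing job and return its name (None on failure)"""
    job_name = f"process-data-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

    response = job.job_create(
        name=job_name,
        image_name="processor:latest",   # hypothetical image
        cpu=2,
        memory=4,
        volumes=["data"],                # hypothetical volume holding input and output files
        volume_full_path="/data",
        startup_command=f"python process.py {input_path} {output_path}",
        ttl_seconds_after_finished=600
    )

    if response['code'] == 200:
        print(f"✓ Job '{job_name}' created successfully")
        return job_name

    print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
    return None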

Advanced patterns

Sequential job pipeline with failure handling


The run_job_pipeline helper defined above works with step descriptions and create functions. When each step is already a complete job configuration, run_job_with_verification can drive the same sequential, fail-fast behaviour:

# Define pipeline steps as plain job configurations
pipeline = [
    {
        'name': 'extract-data',
        'image_name': 'extractor:latest',
        'startup_command': 'python extract.py',
        'cpu': 2, 'memory': 4,
        'ttl_seconds_after_finished': 600
    },
    {
        'name': 'transform-data',
        'image_name': 'transformer:latest',
        'startup_command': 'python transform.py',
        'cpu': 4, 'memory': 8,
        'ttl_seconds_after_finished': 600
    },
    {
        'name': 'load-data',
        'image_name': 'loader:latest',
        'startup_command': 'python load.py',
        'cpu': 2, 'memory': 4,
        'ttl_seconds_after_finished': 600
    }
]

# Run the pipeline sequentially, stopping at the first failed job
for config in pipeline:
    if not run_job_with_verification(config):
        print(f"Pipeline stopped at job '{config['name']}'")
        break
else:
    print("Pipeline completed successfully!")

Parallel job execution

import concurrent.futures


def run_parallel_jobs(job_configs):
    """Run multiple jobs in parallel"""

    def create_and_monitor(config):
        response = job.job_create(**config)
        if response['code'] != 200:
            return config['name'], 'Failed to create'

        status = monitor_job(config['name'])
        return config['name'], status

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(create_and_monitor, config)
                   for config in job_configs]

        results = []
        for future in concurrent.futures.as_completed(futures):
            name, status = future.result()
            results.append((name, status))
            print(f"Job {name} finished with status: {status}")

    return results


# Run multiple jobs in parallel
parallel_jobs = [
    {'name': 'job1', 'image_name': 'worker:latest', 'cpu': 1, 'memory': 2},
    {'name': 'job2', 'image_name': 'worker:latest', 'cpu': 1, 'memory': 2},
    {'name': 'job3', 'image_name': 'worker:latest', 'cpu': 1, 'memory': 2}
]

results = run_parallel_jobs(parallel_jobs)
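
A short follow-up that aggregates the results list returned above and reports any job that did not finish in the Succeeded state:

# Summarize the parallel run; any status other than 'Succeeded' counts as a failure
failed_jobs = [name for name, status in results if status != 'Succeeded']

if failed_jobs:
    print(f"✗ {len(failed_jobs)} job(s) did not succeed: {', '.join(failed_jobs)}")
else:
    print(f"✓ All {len(results)} parallel jobs succeeded")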