Advanced guide
Database backup job
Whenever you run your own application or a standalone database, it is useful to keep a backup of the existing data. The helper below creates a one-off job that dumps a PostgreSQL database to a shared volume and cleans itself up after it finishes.
import cgc.sdk.job as job
from datetime import datetime

def create_backup_job(database_name):
    """Create a database backup job"""
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return job.job_create(
        name=f"backup-{database_name}-{timestamp}",
        image_name="postgres:17",
        cpu=2,
        memory=4,
        volumes=["backups"],
        volume_full_path="/backups",
        environment_data=[
            f"PGHOST={database_name}",
            "PGUSER=postgres",
            "PGPASSWORD=secret"
        ],
        startup_command=f"pg_dump -d mydb -f /backups/{database_name}-{timestamp}.sql",
        ttl_seconds_after_finished=300  # Clean up after 5 minutes
    )
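Triggering a backup might look like the short sketch below. The application name my-postgres and the pre-created backups volume are illustrative assumptions, not values the platform provides, and the credentials baked into the helper above are placeholders you should replace with your own.

# Illustrative usage sketch: "my-postgres" is an assumed app name and the
# "backups" volume is assumed to already exist in your namespace
response = create_backup_job("my-postgres")
if response['code'] == 200:
    print("Backup job created")
else:
    print(f"Backup job creation failed: {response.get('message', 'Unknown error')}")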
Batch processing pipeline
import cgc.sdk.job as job
import time
from datetime import datetime

def create_simple_job():
    """Create a simple example job"""
    job_name = f"hello-job-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    response = job.job_create(
        name=job_name,
        image_name="busybox:latest",
        cpu=1,
        memory=1,
        startup_command="echo 'Hello from CGC Job!'",
        ttl_seconds_after_finished=300
    )
    if response['code'] == 200:
        print(f"✓ Job '{job_name}' created successfully")
        return job_name
    else:
        print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
        return None
def monitor_job(job_name, check_interval=5, max_wait=60):
    """Monitor a job until it completes or fails"""
    print(f"Monitoring job '{job_name}'...")
    start_time = time.time()
    last_status = None
    while time.time() - start_time < max_wait:
        try:
            response = job.job_list()
            if response['code'] != 200:
                print(f"✗ Failed to get job status: {response.get('message', 'Unknown error')}")
                return 'Unknown'
            # Find our job in the list
            for j in response['details'].get('job_list', []):
                if j['name'] == job_name:
                    current_status = j.get('status', {}).get('phase', 'Unknown')
                    # Only print if status changed
                    if current_status != last_status:
                        elapsed = int(time.time() - start_time)
                        print(f"  [{elapsed}s] Status: {current_status}")
                        last_status = current_status
                    # Check if job finished
                    if current_status == 'Succeeded':
                        print(f"✓ Job '{job_name}' completed successfully!")
                        return 'Succeeded'
                    elif current_status == 'Failed':
                        print(f"✗ Job '{job_name}' failed")
                        return 'Failed'
                    break
            else:
                print(f"  Job '{job_name}' not found (may have been cleaned up)")
                return 'NotFound'
            time.sleep(check_interval)
        except Exception as e:
            print(f"✗ Error monitoring job: {e}")
            return 'Error'
    print(f"✗ Job monitoring timeout after {max_wait} seconds")
    return 'Timeout'
def run_job_pipeline(pipeline_name, steps):
    """Run a sequence of jobs as a pipeline with proper error handling"""
    print(f"Starting pipeline: {pipeline_name}")
    print("=" * 40)
    succeeded_jobs = []
    failed_job = None
    for i, step in enumerate(steps, 1):
        print(f"Step {i}/{len(steps)}: {step['description']}")
        # Create the job
        job_name = step['create_func'](*step.get('args', []))
        if not job_name:
            print(f"✗ Failed to create job for step {i}")
            failed_job = f"Step {i}"
            break
        # Monitor the job
        status = monitor_job(job_name)
        if status == 'Succeeded':
            succeeded_jobs.append(job_name)
            print(f"✓ Step {i} completed successfully")
        else:
            print(f"✗ Pipeline failed at step {i}")
            failed_job = job_name
            break
    # Pipeline summary
    print("=" * 40)
    print(f"Pipeline: {pipeline_name}")
    print(f"Succeeded: {len(succeeded_jobs)}/{len(steps)} steps")
    if failed_job:
        print(f"Failed at: {failed_job}")
        return succeeded_jobs, failed_job
    else:
        print("✓ All steps completed successfully!")
        return succeeded_jobs, None
# Example usage
pipeline_steps = [
    {
        'description': 'Extract data',
        'create_func': lambda: create_simple_job()
    },
    {
        'description': 'Process data',
        'create_func': lambda: create_simple_job()
    },
    {
        'description': 'Generate report',
        'create_func': lambda: create_simple_job()
    }
]

succeeded, failed = run_job_pipeline("ETL Pipeline", pipeline_steps)
if not failed:
    print("Pipeline completed successfully!")
else:
    print(f"Pipeline failed at: {failed}")
Job monitoring
Check job status
import cgc.sdk.job as job
import time

def monitor_job(job_name, check_interval=10):
    """Monitor a job until it completes"""
    while True:
        response = job.job_list()
        if response['code'] != 200:
            print("Failed to get job status")
            return 'Unknown'
        # Find our job
        job_found = False
        for j in response['details'].get('job_list', []):
            if j['name'] == job_name:
                job_found = True
                status = j.get('status', {}).get('phase', 'Unknown')
                print(f"Job {job_name}: {status}")
                if status in ['Succeeded', 'Failed']:
                    return status
                break
        if not job_found:
            print(f"Job {job_name} not found (may have been cleaned up)")
            return 'Cleaned'
        time.sleep(check_interval)

# Use it
final_status = monitor_job("my-processing-job")
print(f"Job finished with status: {final_status}")
Job success pattern with early failure detection
def monitor_job(job_name):
    """Monitor a job until completion with improved status checking"""
    print(f"Monitoring job '{job_name}'...")
    start_time = time.time()
    last_status = None
    max_wait = 300  # 5 minutes max wait
    while True:
        if time.time() - start_time > max_wait:
            print(f"✗ Job '{job_name}' timed out after {max_wait} seconds")
            return 'Timeout'
        try:
            response = job.job_list()
            if response['code'] != 200:
                print(f"Failed to get job status: {response.get('message', 'Unknown error')}")
                time.sleep(10)
                continue
            # Find our job
            for j in response['details'].get('job_list', []):
                if j['name'] == job_name:
                    current_status = j.get('status', {}).get('phase', 'Unknown')
                    # Only print if status changed
                    if current_status != last_status:
                        elapsed = int(time.time() - start_time)
                        print(f"  [{elapsed}s] Status: {current_status}")
                        last_status = current_status
                    # Check if job finished
                    if current_status == 'Succeeded':
                        print(f"✓ Job '{job_name}' completed successfully!")
                        return 'Succeeded'
                    elif current_status == 'Failed':
                        print(f"✗ Job '{job_name}' failed")
                        return 'Failed'
                    break
            else:
                print(f"Job '{job_name}' not found (may have been cleaned up)")
                return 'NotFound'
            time.sleep(5)  # Wait before the next status poll
        except Exception as e:
            print(f"Error monitoring job: {e}")
            time.sleep(5)

def run_job_with_verification(job_config):
    """Run a job and verify it completes successfully"""
    # Create the job
    response = job.job_create(**job_config)
    if response['code'] != 200:
        print(f"Failed to create job: {response.get('message', 'Unknown error')}")
        return False
    job_name = job_config['name']
    print(f"Job {job_name} created successfully")
    # Monitor until completion
    status = monitor_job(job_name)
    if status == 'Succeeded':
        print(f"Job {job_name} completed successfully")
        return True
    else:
        print(f"Job {job_name} failed with status: {status}")
        return False

# Example with early failure detection
def main_example():
    """Example main function with failure handling"""
    # Example 1: Simple job
    print("1. Running a simple job")
    simple_job = create_simple_job()
    if simple_job:
        monitor_job(simple_job)
    else:
        print("✗ Simple job failed. Stopping further tests.")
        return  # Stop execution if first job fails
    # Example 2: Only run if first job succeeded
    print("2. Running a data processing job")
    data_job = create_data_processing_job("/data/input.csv", "/data/output/")
    if data_job:
        monitor_job(data_job)
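The main_example above calls create_data_processing_job, which is not defined in this guide. A possible sketch is shown below; the processor:latest image, the data volume, and the process.py command line are assumptions chosen only to make the example runnable, so adjust them to your own workload.

def create_data_processing_job(input_path, output_path):
    """Create a data processing job (illustrative sketch)"""
    from datetime import datetime
    job_name = f"process-data-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    response = job.job_create(
        name=job_name,
        image_name="processor:latest",  # assumed image
        cpu=2,
        memory=4,
        volumes=["data"],               # assumed volume containing input_path and output_path
        volume_full_path="/data",
        startup_command=f"python process.py {input_path} {output_path}",  # assumed entrypoint
        ttl_seconds_after_finished=300
    )
    if response['code'] == 200:
        print(f"✓ Job '{job_name}' created successfully")
        return job_name
    print(f"✗ Failed to create job: {response.get('message', 'Unknown error')}")
    return None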
Advanced patterns
Sequential job pipeline with failure handling
This pattern reuses the run_job_pipeline helper defined in the batch processing pipeline section above.
# Define pipeline job configurations
pipeline = [
    {
        'name': 'extract-data',
        'image_name': 'extractor:latest',
        'startup_command': 'python extract.py',
        'cpu': 2, 'memory': 4,
        'ttl_seconds_after_finished': 600
    },
    {
        'name': 'transform-data',
        'image_name': 'transformer:latest',
        'startup_command': 'python transform.py',
        'cpu': 4, 'memory': 8,
        'ttl_seconds_after_finished': 600
    },
    {
        'name': 'load-data',
        'image_name': 'loader:latest',
        'startup_command': 'python load.py',
        'cpu': 2, 'memory': 4,
        'ttl_seconds_after_finished': 600
    }
]

def create_job_from_config(config):
    """Create a job from a configuration dict and return its name"""
    response = job.job_create(**config)
    if response['code'] != 200:
        print(f"Failed to create job: {response.get('message', 'Unknown error')}")
        return None
    return config['name']

# Wrap each configuration as a pipeline step and run the pipeline
pipeline_steps = [
    {'description': config['name'], 'create_func': create_job_from_config, 'args': [config]}
    for config in pipeline
]
run_job_pipeline("ETL Pipeline", pipeline_steps)
Parallel job execution
import concurrent.futures

def run_parallel_jobs(job_configs):
    """Run multiple jobs in parallel"""
    def create_and_monitor(config):
        response = job.job_create(**config)
        if response['code'] != 200:
            return config['name'], 'Failed to create'
        status = monitor_job(config['name'])
        return config['name'], status

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(create_and_monitor, config)
                   for config in job_configs]
        results = []
        for future in concurrent.futures.as_completed(futures):
            name, status = future.result()
            results.append((name, status))
            print(f"Job {name} finished with status: {status}")
    return results

# Run multiple jobs in parallel
parallel_jobs = [
    {'name': 'job1', 'image_name': 'worker:latest', 'cpu': 1, 'memory': 2},
    {'name': 'job2', 'image_name': 'worker:latest', 'cpu': 1, 'memory': 2},
    {'name': 'job3', 'image_name': 'worker:latest', 'cpu': 1, 'memory': 2}
]
results = run_parallel_jobs(parallel_jobs)
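If you want a quick overview after the parallel run, the (name, status) tuples collected above can be summarized directly; this is just a convenience sketch on top of the values already returned.

# Summarize the parallel run using the results collected above
succeeded = [name for name, status in results if status == 'Succeeded']
failed = [name for name, status in results if status != 'Succeeded']
print(f"{len(succeeded)}/{len(results)} parallel jobs succeeded")
if failed:
    print(f"Jobs needing attention: {', '.join(failed)}")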