Loading...
Loading...
BioBlend and Planemo expertise for Galaxy workflow automation. Galaxy API usage, workflow invocation, status checking, error handling, batch processing, and dataset management. Essential for any Galaxy automation project.
npx skill4agent add delphine-l/claude_global galaxy-automationfrom bioblend.galaxy import GalaxyInstance
# Connect to Galaxy server
gi = GalaxyInstance(url='https://usegalaxy.org', key='your_api_key')
# Verify connection
print(gi.whoami())f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"def get_or_find_history_id(gi, history_name):
"""Get history ID by name, or create if doesn't exist"""
histories = gi.histories.get_histories(name=history_name)
if histories:
return histories[0]['id']
else:
history = gi.histories.create_history(name=history_name)
return history['id']histories = gi.histories.get_histories()
for hist in histories:
print(f"{hist['name']}: {hist['id']}")history_id = '...'
contents = gi.histories.show_history(history_id, contents=True)
for item in contents:
print(f"{item['name']}: {item['state']}")workflow_id = 'a1b2c3d4e5f67890'
workflow = gi.workflows.show_workflow(workflow_id)
print(f"Workflow: {workflow['name']}")# Prepare inputs (dataset IDs or dataset collection IDs)
inputs = {
'0': {'id': dataset_id, 'src': 'hda'}, # hda = history dataset
'1': {'id': collection_id, 'src': 'hdca'} # hdca = history dataset collection
}
# Invoke workflow
invocation = gi.workflows.invoke_workflow(
workflow_id,
inputs=inputs,
history_id=history_id,
import_inputs_to_history=False # Inputs already in history
)
invocation_id = invocation['id']
print(f"Invocation ID: {invocation_id}")def check_invocation_complete(gi, invocation_id, include_steps=False):
"""
Check if workflow invocation is complete.
Returns:
str: 'ok', 'running', 'failed', 'cancelled', 'error'
"""
invocation = gi.invocations.show_invocation(
invocation_id,
include_workflow_steps=include_steps
)
state = invocation['state']
# Possible states: 'new', 'ready', 'scheduled', 'running',
# 'ok', 'failed', 'cancelled', 'error'
return stateimport time
def wait_for_invocation(gi, invocation_id, poll_interval=30, timeout=3600):
"""Wait for invocation to complete"""
start_time = time.time()
while True:
state = check_invocation_complete(gi, invocation_id)
if state in ['ok', 'failed', 'cancelled', 'error']:
return state
if time.time() - start_time > timeout:
raise TimeoutError(f"Invocation {invocation_id} timed out after {timeout}s")
time.sleep(poll_interval)invocation = gi.invocations.show_invocation(
invocation_id,
include_workflow_steps=True
)
# Check individual steps
for step_id, step_data in invocation.get('steps', {}).items():
step_state = step_data['state']
job_id = step_data.get('job_id')
print(f"Step {step_id}: {step_state} (job: {job_id})")def categorize_failure(gi, invocation_id):
"""Determine if failure is retriable"""
invocation = gi.invocations.show_invocation(
invocation_id,
include_workflow_steps=True
)
if invocation['state'] != 'failed':
return None
# Check failed steps
failed_steps = []
for step_id, step_data in invocation.get('steps', {}).items():
if step_data['state'] == 'error':
failed_steps.append({
'step_id': step_id,
'job_id': step_data.get('job_id')
})
# Analyze job failures
for step in failed_steps:
if step['job_id']:
job = gi.jobs.show_job(step['job_id'])
stderr = job.get('stderr', '')
# Check for specific error patterns
if 'out of memory' in stderr.lower():
return 'retriable_memory'
elif 'timeout' in stderr.lower():
return 'retriable_timeout'
elif 'network' in stderr.lower():
return 'retriable_network'
return 'permanent_failure'def rerun_failed_invocation(gi, invocation_id, use_cached_job=True,
replacement_params=None):
"""
Rerun a failed invocation using Galaxy's native rerun API.
Args:
gi: GalaxyInstance
invocation_id: Failed invocation ID
use_cached_job: Reuse successful job results
replacement_params: Dict of parameter changes
Returns:
New invocation ID
"""
rerun_payload = {
'use_cached_job': use_cached_job
}
if replacement_params:
rerun_payload['replacement_params'] = replacement_params
# Call Galaxy rerun API
response = gi.invocations.rerun_invocation(
invocation_id,
**rerun_payload
)
new_invocation_id = response['id']
return new_invocation_iddef build_replacement_params_from_yaml(gi, invocation_id, job_yaml_path):
"""
Compare YAML parameters with invocation parameters.
Returns dict of changed parameters for rerun.
"""
import yaml
# Read new parameters from YAML
with open(job_yaml_path, 'r') as f:
new_params = yaml.safe_load(f)
# Get original invocation parameters
invocation = gi.invocations.show_invocation(invocation_id)
orig_params = invocation.get('inputs', {})
# Find differences
replacement_params = {}
for key, new_value in new_params.items():
if key in orig_params:
if orig_params[key] != new_value:
replacement_params[key] = new_value
else:
replacement_params[key] = new_value
return replacement_paramsfile_path = '/path/to/file.fastq.gz'
dataset = gi.tools.upload_file(
file_path,
history_id,
file_type='fastqsanger.gz'
)
dataset_id = dataset['outputs'][0]['id']dataset = gi.datasets.show_dataset(dataset_id)
print(f"Name: {dataset['name']}")
print(f"State: {dataset['state']}") # 'ok', 'queued', 'running', 'error'
print(f"Size: {dataset.get('file_size', 0)} bytes")def wait_for_dataset(gi, dataset_id, poll_interval=5, timeout=600):
"""Wait for dataset to finish uploading"""
start_time = time.time()
while True:
dataset = gi.datasets.show_dataset(dataset_id)
state = dataset['state']
if state == 'ok':
return True
elif state == 'error':
raise RuntimeError(f"Dataset {dataset_id} failed to upload")
if time.time() - start_time > timeout:
raise TimeoutError(f"Dataset upload timeout after {timeout}s")
time.sleep(poll_interval)# List collection (multiple files)
collection_description = {
'collection_type': 'list',
'element_identifiers': [
{'id': dataset_id1, 'name': 'sample1', 'src': 'hda'},
{'id': dataset_id2, 'name': 'sample2', 'src': 'hda'},
]
}
collection = gi.histories.create_dataset_collection(
history_id,
collection_description
)
collection_id = collection['id']collection_description = {
'collection_type': 'paired',
'element_identifiers': [
{'id': forward_dataset_id, 'name': 'forward', 'src': 'hda'},
{'id': reverse_dataset_id, 'name': 'reverse', 'src': 'hda'},
]
}collection_description = {
'collection_type': 'list:paired',
'element_identifiers': [
{
'name': 'sample1',
'collection_type': 'paired',
'element_identifiers': [
{'id': sample1_fwd, 'name': 'forward', 'src': 'hda'},
{'id': sample1_rev, 'name': 'reverse', 'src': 'hda'},
]
},
{
'name': 'sample2',
'collection_type': 'paired',
'element_identifiers': [
{'id': sample2_fwd, 'name': 'forward', 'src': 'hda'},
{'id': sample2_rev, 'name': 'reverse', 'src': 'hda'},
]
}
]
}planemo run <workflow_file> <job_yaml> \
--engine external_galaxy \
--galaxy_url "https://usegalaxy.org" \
--galaxy_user_key "your_api_key" \
--history_name "My Analysis" \
--test_output_json "invocation.json"--engine external_galaxy--simultaneous_uploads--check_uploads_ok--test_output_json# Inputs
input_reads:
class: File
path: /path/to/reads.fastq.gz
# Collections
paired_reads:
class: Collection
collection_type: paired
elements:
- identifier: forward
class: File
path: /path/to/forward.fastq.gz
- identifier: reverse
class: File
path: /path/to/reverse.fastq.gz
# Parameters
kmer_size: 21
coverage_threshold: 30def build_planemo_command(workflow_path, job_yaml, galaxy_url, api_key,
history_name, output_json, log_file):
"""
Build planemo run command.
Security: Mask API key in display, but use full key in command.
"""
command = (
f'planemo run "{workflow_path}" "{job_yaml}" '
f'--engine external_galaxy '
f'--galaxy_url "{galaxy_url}" '
f'--simultaneous_uploads '
f'--check_uploads_ok '
f'--galaxy_user_key "{api_key}" '
f'--history_name "{history_name}" '
f'--test_output_json "{output_json}" '
f'> "{log_file}" 2>&1'
)
return commandimport os
return_code = os.system(planemo_command)
if return_code != 0:
# Planemo failed - workflow was NOT launched in Galaxy
# No invocation ID exists
print(f"ERROR: Planemo failed with return code {return_code}")
print(f"Check log: {log_file}")
# DO NOT mark invocation as failed - it was never created
else:
# Planemo succeeded - workflow launched
# Invocation ID is in output JSON
print(f"SUCCESS: Workflow launched")os.system()actual_exit = return_code >> 8import json
def extract_invocation_id(output_json_path):
"""Extract invocation ID from planemo test output"""
with open(output_json_path, 'r') as f:
data = json.load(f)
# Planemo output structure
tests = data.get('tests', [])
if tests and len(tests) > 0:
test = tests[0]
invocation_id = test['data'].get('invocation_id')
return invocation_id
return Noneimport threading
galaxy_lock = threading.Lock()
def thread_safe_invoke_workflow(gi, workflow_id, inputs, history_id):
"""Invoke workflow with thread safety"""
with galaxy_lock:
invocation = gi.workflows.invoke_workflow(
workflow_id,
inputs=inputs,
history_id=history_id
)
return invocation['id']def process_samples_batch(gi, workflow_id, samples, max_concurrent=3):
"""
Process multiple samples with concurrency limit.
Args:
gi: GalaxyInstance
workflow_id: Workflow to run
samples: List of sample dicts with 'name' and 'files'
max_concurrent: Max parallel invocations
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
def process_one_sample(sample):
# Create history
history_id = get_or_find_history_id(gi, sample['name'])
# Upload files
dataset_ids = []
for file_path in sample['files']:
ds = gi.tools.upload_file(file_path, history_id)
dataset_ids.append(ds['outputs'][0]['id'])
# Invoke workflow
inputs = {'0': {'id': dataset_ids[0], 'src': 'hda'}}
invocation_id = thread_safe_invoke_workflow(
gi, workflow_id, inputs, history_id
)
# Wait for completion
state = wait_for_invocation(gi, invocation_id)
return {
'sample': sample['name'],
'invocation_id': invocation_id,
'state': state
}
# Process with limited concurrency
with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
futures = {executor.submit(process_one_sample, s): s for s in samples}
results = []
for future in as_completed(futures):
result = future.result()
results.append(result)
print(f"Completed: {result['sample']} - {result['state']}")
return resultsimport json
import os
STATE_FILE = 'processing_state.json'
def load_state():
"""Load processing state"""
if os.path.exists(STATE_FILE):
with open(STATE_FILE, 'r') as f:
return json.load(f)
return {'completed': [], 'failed': []}
def save_state(state):
"""Save processing state"""
with open(STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
def process_with_resume(samples):
"""Process samples with resume capability"""
state = load_state()
for sample in samples:
sample_name = sample['name']
# Skip if already completed
if sample_name in state['completed']:
print(f"Skipping {sample_name} (already completed)")
continue
try:
# Process sample
result = process_one_sample(sample)
if result['state'] == 'ok':
state['completed'].append(sample_name)
else:
state['failed'].append(sample_name)
save_state(state)
except Exception as e:
print(f"Error processing {sample_name}: {e}")
state['failed'].append(sample_name)
save_state(state)import os
api_key = os.environ.get('GALAXY_API_KEY')
if not api_key:
raise ValueError("GALAXY_API_KEY environment variable not set")
gi = GalaxyInstance(url, api_key)def mask_api_key(key):
"""Mask API key for display"""
if len(key) <= 8:
return '*' * len(key)
return f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"
masked_key = mask_api_key(api_key)
print(f"Using API key: {masked_key}")# ✅ Good - handles spaces
command = f'planemo run "{workflow_path}" "{job_yaml}"'
# ❌ Bad - breaks with spaces
command = f'planemo run {workflow_path} {job_yaml}'def inspect_history(gi, history_id):
"""Print detailed history information"""
history = gi.histories.show_history(history_id)
print(f"History: {history['name']} ({history['id']})")
print(f"State: {history['state']}")
contents = gi.histories.show_history(history_id, contents=True)
for item in contents:
print(f" [{item['state']}] {item['name']} (type: {item['history_content_type']})")def analyze_failed_invocation(gi, invocation_id):
"""Analyze why invocation failed"""
invocation = gi.invocations.show_invocation(
invocation_id,
include_workflow_steps=True
)
print(f"Invocation: {invocation_id}")
print(f"State: {invocation['state']}")
for step_id, step_data in invocation.get('steps', {}).items():
step_state = step_data['state']
job_id = step_data.get('job_id')
if step_state == 'error':
print(f"\nFailed Step {step_id}:")
if job_id:
job = gi.jobs.show_job(job_id)
print(f" Tool: {job.get('tool_id')}")
print(f" Exit code: {job.get('exit_code')}")
print(f" Stderr:\n{job.get('stderr', 'N/A')}")--simultaneous_uploadsos.system()return_code >> 8--test_output_json