galaxy-automation
Galaxy Workflow Automation with BioBlend and Planemo
Purpose
This skill provides expert knowledge for automating Galaxy workflows using BioBlend (Python Galaxy API library) and Planemo (Galaxy workflow testing and execution tool).
When to Use This Skill
Use this skill when:
- ✅ Automating Galaxy workflow execution via API
- ✅ Building batch processing systems for Galaxy
- ✅ Using BioBlend to interact with Galaxy
- ✅ Testing workflows with Planemo
- ✅ Managing Galaxy histories, datasets, and collections programmatically
- ✅ Polling workflow invocation status
- ✅ Implementing error handling and retry logic for Galaxy operations
- ✅ Creating Galaxy automation pipelines
- ✅ Integrating Galaxy into larger bioinformatics workflows
This skill is NOT project-specific - it's useful for ANY Galaxy automation project.
Core BioBlend Concepts
1. Galaxy Instance Connection
```python
from bioblend.galaxy import GalaxyInstance

# Connect to Galaxy server
gi = GalaxyInstance(url='https://usegalaxy.org', key='your_api_key')

# Verify connection
print(gi.whoami())
```

**Best practices:**
- Store API keys in environment variables, never in code
- Use HTTPS URLs for production
- Mask API keys in logs: `f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"`
---

2. History Management
Create or find history:
```python
def get_or_find_history_id(gi, history_name):
    """Get history ID by name, or create if doesn't exist"""
    histories = gi.histories.get_histories(name=history_name)
    if histories:
        return histories[0]['id']
    else:
        history = gi.histories.create_history(name=history_name)
        return history['id']
```

List histories:

```python
histories = gi.histories.get_histories()
for hist in histories:
    print(f"{hist['name']}: {hist['id']}")
```

Get history contents:

```python
history_id = '...'
contents = gi.histories.show_history(history_id, contents=True)
for item in contents:
    print(f"{item['name']}: {item['state']}")
```

3. Workflow Invocation
Get workflow by ID:
```python
workflow_id = 'a1b2c3d4e5f67890'
workflow = gi.workflows.show_workflow(workflow_id)
print(f"Workflow: {workflow['name']}")
```

Invoke workflow:

```python
# Prepare inputs (dataset IDs or dataset collection IDs)
inputs = {
    '0': {'id': dataset_id, 'src': 'hda'},     # hda = history dataset
    '1': {'id': collection_id, 'src': 'hdca'}  # hdca = history dataset collection
}

# Invoke workflow
invocation = gi.workflows.invoke_workflow(
    workflow_id,
    inputs=inputs,
    history_id=history_id,
    import_inputs_to_history=False  # Inputs already in history
)
invocation_id = invocation['id']
print(f"Invocation ID: {invocation_id}")
```

---

4. Invocation Status Checking
Poll invocation status:
```python
def check_invocation_complete(gi, invocation_id, include_steps=False):
    """
    Check if workflow invocation is complete.

    Returns:
        str: 'ok', 'running', 'failed', 'cancelled', 'error'
    """
    invocation = gi.invocations.show_invocation(
        invocation_id,
        include_workflow_steps=include_steps
    )
    state = invocation['state']
    # Possible states: 'new', 'ready', 'scheduled', 'running',
    # 'ok', 'failed', 'cancelled', 'error'
    return state
```

Wait for completion:

```python
import time

def wait_for_invocation(gi, invocation_id, poll_interval=30, timeout=3600):
    """Wait for invocation to complete"""
    start_time = time.time()
    while True:
        state = check_invocation_complete(gi, invocation_id)
        if state in ['ok', 'failed', 'cancelled', 'error']:
            return state
        if time.time() - start_time > timeout:
            raise TimeoutError(f"Invocation {invocation_id} timed out after {timeout}s")
        time.sleep(poll_interval)
```

Get invocation details with steps:

```python
invocation = gi.invocations.show_invocation(
    invocation_id,
    include_workflow_steps=True
)

# Check individual steps
for step_id, step_data in invocation.get('steps', {}).items():
    step_state = step_data['state']
    job_id = step_data.get('job_id')
    print(f"Step {step_id}: {step_state} (job: {job_id})")
```

---

5. Error Handling Patterns
Categorize failures:
```python
def categorize_failure(gi, invocation_id):
    """Determine if failure is retriable"""
    invocation = gi.invocations.show_invocation(
        invocation_id,
        include_workflow_steps=True
    )
    if invocation['state'] != 'failed':
        return None

    # Check failed steps
    failed_steps = []
    for step_id, step_data in invocation.get('steps', {}).items():
        if step_data['state'] == 'error':
            failed_steps.append({
                'step_id': step_id,
                'job_id': step_data.get('job_id')
            })

    # Analyze job failures
    for step in failed_steps:
        if step['job_id']:
            job = gi.jobs.show_job(step['job_id'])
            stderr = job.get('stderr', '')
            # Check for specific error patterns
            if 'out of memory' in stderr.lower():
                return 'retriable_memory'
            elif 'timeout' in stderr.lower():
                return 'retriable_timeout'
            elif 'network' in stderr.lower():
                return 'retriable_network'
    return 'permanent_failure'
```

6. Rerun Failed Invocations
Galaxy rerun API:
```python
def rerun_failed_invocation(gi, invocation_id, use_cached_job=True,
                            replacement_params=None):
    """
    Rerun a failed invocation using Galaxy's native rerun API.

    Args:
        gi: GalaxyInstance
        invocation_id: Failed invocation ID
        use_cached_job: Reuse successful job results
        replacement_params: Dict of parameter changes

    Returns:
        New invocation ID
    """
    rerun_payload = {
        'use_cached_job': use_cached_job
    }
    if replacement_params:
        rerun_payload['replacement_params'] = replacement_params

    # Call Galaxy rerun API
    response = gi.invocations.rerun_invocation(
        invocation_id,
        **rerun_payload
    )
    new_invocation_id = response['id']
    return new_invocation_id
```

Detect parameter changes from YAML:

```python
import yaml

def build_replacement_params_from_yaml(gi, invocation_id, job_yaml_path):
    """
    Compare YAML parameters with invocation parameters.
    Returns dict of changed parameters for rerun.
    """
    # Read new parameters from YAML
    with open(job_yaml_path, 'r') as f:
        new_params = yaml.safe_load(f)

    # Get original invocation parameters
    invocation = gi.invocations.show_invocation(invocation_id)
    orig_params = invocation.get('inputs', {})

    # Find differences
    replacement_params = {}
    for key, new_value in new_params.items():
        if key in orig_params:
            if orig_params[key] != new_value:
                replacement_params[key] = new_value
        else:
            replacement_params[key] = new_value
    return replacement_params
```

7. Dataset Operations
Upload dataset:
```python
file_path = '/path/to/file.fastq.gz'
dataset = gi.tools.upload_file(
    file_path,
    history_id,
    file_type='fastqsanger.gz'
)
dataset_id = dataset['outputs'][0]['id']
```

Get dataset details:

```python
dataset = gi.datasets.show_dataset(dataset_id)
print(f"Name: {dataset['name']}")
print(f"State: {dataset['state']}")  # 'ok', 'queued', 'running', 'error'
print(f"Size: {dataset.get('file_size', 0)} bytes")
```

Wait for dataset upload:

```python
import time

def wait_for_dataset(gi, dataset_id, poll_interval=5, timeout=600):
    """Wait for dataset to finish uploading"""
    start_time = time.time()
    while True:
        dataset = gi.datasets.show_dataset(dataset_id)
        state = dataset['state']
        if state == 'ok':
            return True
        elif state == 'error':
            raise RuntimeError(f"Dataset {dataset_id} failed to upload")
        if time.time() - start_time > timeout:
            raise TimeoutError(f"Dataset upload timeout after {timeout}s")
        time.sleep(poll_interval)
```

8. Collections (Paired, List, List:Paired)
Create dataset collection:
**List collection (multiple files):**

```python
collection_description = {
    'collection_type': 'list',
    'element_identifiers': [
        {'id': dataset_id1, 'name': 'sample1', 'src': 'hda'},
        {'id': dataset_id2, 'name': 'sample2', 'src': 'hda'},
    ]
}
collection = gi.histories.create_dataset_collection(
    history_id,
    collection_description
)
collection_id = collection['id']
```

**Paired collection (forward/reverse reads):**

```python
collection_description = {
    'collection_type': 'paired',
    'element_identifiers': [
        {'id': forward_dataset_id, 'name': 'forward', 'src': 'hda'},
        {'id': reverse_dataset_id, 'name': 'reverse', 'src': 'hda'},
    ]
}
```

**List:Paired (multiple paired-end samples):**

```python
collection_description = {
    'collection_type': 'list:paired',
    'element_identifiers': [
        {
            'name': 'sample1',
            'collection_type': 'paired',
            'element_identifiers': [
                {'id': sample1_fwd, 'name': 'forward', 'src': 'hda'},
                {'id': sample1_rev, 'name': 'reverse', 'src': 'hda'},
            ]
        },
        {
            'name': 'sample2',
            'collection_type': 'paired',
            'element_identifiers': [
                {'id': sample2_fwd, 'name': 'forward', 'src': 'hda'},
                {'id': sample2_rev, 'name': 'reverse', 'src': 'hda'},
            ]
        }
    ]
}
```

Core Planemo Concepts
1. Planemo Command Structure
Basic syntax:
```bash
planemo run <workflow_file> <job_yaml> \
  --engine external_galaxy \
  --galaxy_url "https://usegalaxy.org" \
  --galaxy_user_key "your_api_key" \
  --history_name "My Analysis" \
  --test_output_json "invocation.json"
```

Common options:
- `--engine external_galaxy`: Use external Galaxy server (not local)
- `--simultaneous_uploads`: Upload all files simultaneously (faster but more resource-intensive)
- `--check_uploads_ok`: Verify uploads completed successfully
- `--test_output_json`: Save invocation details to JSON file
2. Job YAML Format
Example job.yml:
```yaml
# Inputs
input_reads:
  class: File
  path: /path/to/reads.fastq.gz

# Collections
paired_reads:
  class: Collection
  collection_type: paired
  elements:
    - identifier: forward
      class: File
      path: /path/to/forward.fastq.gz
    - identifier: reverse
      class: File
      path: /path/to/reverse.fastq.gz

# Parameters
kmer_size: 21
coverage_threshold: 30
```

---

3. Generating Planemo Commands Programmatically
```python
def build_planemo_command(workflow_path, job_yaml, galaxy_url, api_key,
                          history_name, output_json, log_file):
    """
    Build planemo run command.
    Security: Mask API key in display, but use full key in command.
    """
    command = (
        f'planemo run "{workflow_path}" "{job_yaml}" '
        f'--engine external_galaxy '
        f'--galaxy_url "{galaxy_url}" '
        f'--simultaneous_uploads '
        f'--check_uploads_ok '
        f'--galaxy_user_key "{api_key}" '
        f'--history_name "{history_name}" '
        f'--test_output_json "{output_json}" '
        f'> "{log_file}" 2>&1'
    )
    return command
```

Execute with error handling:

```python
import os

return_code = os.system(planemo_command)
if return_code != 0:
    # Planemo failed - workflow was NOT launched in Galaxy
    # No invocation ID exists
    print(f"ERROR: Planemo failed with return code {return_code}")
    print(f"Check log: {log_file}")
    # DO NOT mark invocation as failed - it was never created
else:
    # Planemo succeeded - workflow launched
    # Invocation ID is in output JSON
    print("SUCCESS: Workflow launched")
```

**CRITICAL:** `os.system()` return codes are shifted by 8 bits:
- Exit code 1 becomes return code 256
- Exit code 2 becomes return code 512
- To get the actual exit code: `actual_exit = return_code >> 8`
4. Parsing Planemo Output
Extract invocation ID from JSON:
```python
import json

def extract_invocation_id(output_json_path):
    """Extract invocation ID from planemo test output"""
    with open(output_json_path, 'r') as f:
        data = json.load(f)
    # Planemo output structure
    tests = data.get('tests', [])
    if tests:
        return tests[0]['data'].get('invocation_id')
    return None
```

Common Automation Patterns
1. Thread-Safe Galaxy Operations
Use locks for concurrent API calls:
```python
import threading

galaxy_lock = threading.Lock()

def thread_safe_invoke_workflow(gi, workflow_id, inputs, history_id):
    """Invoke workflow with thread safety"""
    with galaxy_lock:
        invocation = gi.workflows.invoke_workflow(
            workflow_id,
            inputs=inputs,
            history_id=history_id
        )
    return invocation['id']
```

Why: the Galaxy API can have issues with concurrent uploads/operations from the same API key.
2. Batch Processing Pattern
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_samples_batch(gi, workflow_id, samples, max_concurrent=3):
    """
    Process multiple samples with concurrency limit.

    Args:
        gi: GalaxyInstance
        workflow_id: Workflow to run
        samples: List of sample dicts with 'name' and 'files'
        max_concurrent: Max parallel invocations
    """
    def process_one_sample(sample):
        # Create history
        history_id = get_or_find_history_id(gi, sample['name'])
        # Upload files
        dataset_ids = []
        for file_path in sample['files']:
            ds = gi.tools.upload_file(file_path, history_id)
            dataset_ids.append(ds['outputs'][0]['id'])
        # Invoke workflow
        inputs = {'0': {'id': dataset_ids[0], 'src': 'hda'}}
        invocation_id = thread_safe_invoke_workflow(
            gi, workflow_id, inputs, history_id
        )
        # Wait for completion
        state = wait_for_invocation(gi, invocation_id)
        return {
            'sample': sample['name'],
            'invocation_id': invocation_id,
            'state': state
        }

    # Process with limited concurrency
    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        futures = {executor.submit(process_one_sample, s): s for s in samples}
        results = []
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            print(f"Completed: {result['sample']} - {result['state']}")
    return results
```

3. Resume Capability Pattern
Track processed samples:
```python
import json
import os

STATE_FILE = 'processing_state.json'

def load_state():
    """Load processing state"""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r') as f:
            return json.load(f)
    return {'completed': [], 'failed': []}

def save_state(state):
    """Save processing state"""
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

def process_with_resume(samples):
    """Process samples with resume capability"""
    state = load_state()
    for sample in samples:
        sample_name = sample['name']
        # Skip if already completed
        if sample_name in state['completed']:
            print(f"Skipping {sample_name} (already completed)")
            continue
        try:
            # Process sample
            result = process_one_sample(sample)
            if result['state'] == 'ok':
                state['completed'].append(sample_name)
            else:
                state['failed'].append(sample_name)
            save_state(state)
        except Exception as e:
            print(f"Error processing {sample_name}: {e}")
            state['failed'].append(sample_name)
            save_state(state)
```

Security Best Practices
1. API Key Management
Store in environment variables:
```python
import os

api_key = os.environ.get('GALAXY_API_KEY')
if not api_key:
    raise ValueError("GALAXY_API_KEY environment variable not set")
gi = GalaxyInstance(url, api_key)
```

Mask in logs:

```python
def mask_api_key(key):
    """Mask API key for display"""
    if len(key) <= 8:
        return '*' * len(key)
    return f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"

masked_key = mask_api_key(api_key)
print(f"Using API key: {masked_key}")
```

2. Path Handling
Always quote paths in shell commands:
```python
# ✅ Good - handles spaces
command = f'planemo run "{workflow_path}" "{job_yaml}"'

# ❌ Bad - breaks with spaces
command = f'planemo run {workflow_path} {job_yaml}'
```
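Manual double quotes still break on paths that themselves contain quotes or `$`. For paths coming from user input, the standard library's `shlex.quote` is a more robust option; a minimal sketch (the `workflow_path` value here is a hypothetical example):

```python
import shlex

workflow_path = "/data/my workflows/assembly.ga"  # hypothetical path with a space
job_yaml = "job.yml"

# shlex.quote adds quoting only when needed and escapes embedded quotes
command = f"planemo run {shlex.quote(workflow_path)} {shlex.quote(job_yaml)}"
print(command)
# planemo run '/data/my workflows/assembly.ga' job.yml
```

For building argument lists without a shell at all, `subprocess.run([...])` avoids quoting entirely.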
---

Debugging
1. Galaxy History Inspection
```python
def inspect_history(gi, history_id):
    """Print detailed history information"""
    history = gi.histories.show_history(history_id)
    print(f"History: {history['name']} ({history['id']})")
    print(f"State: {history['state']}")
    contents = gi.histories.show_history(history_id, contents=True)
    for item in contents:
        print(f"  [{item['state']}] {item['name']} (type: {item['history_content_type']})")
```

2. Invocation Step Analysis
```python
def analyze_failed_invocation(gi, invocation_id):
    """Analyze why invocation failed"""
    invocation = gi.invocations.show_invocation(
        invocation_id,
        include_workflow_steps=True
    )
    print(f"Invocation: {invocation_id}")
    print(f"State: {invocation['state']}")
    for step_id, step_data in invocation.get('steps', {}).items():
        step_state = step_data['state']
        job_id = step_data.get('job_id')
        if step_state == 'error':
            print(f"\nFailed Step {step_id}:")
            if job_id:
                job = gi.jobs.show_job(job_id)
                print(f"  Tool: {job.get('tool_id')}")
                print(f"  Exit code: {job.get('exit_code')}")
                print(f"  Stderr:\n{job.get('stderr', 'N/A')}")
```

Common Pitfalls
- **Planemo failures vs Galaxy failures**
  - Planemo return code != 0: workflow was NOT launched, no invocation exists
  - Invocation state = 'failed': workflow was launched but a Galaxy job failed
  - Don't confuse these two failure modes
- **Concurrent uploads**
  - Too many simultaneous uploads can overwhelm Galaxy
  - Use max_concurrent limits (typically 3-5)
  - Consider `--simultaneous_uploads` vs sequential uploads
- **Dataset state checking**
  - Don't invoke workflows before uploads complete
  - Always wait for dataset state = 'ok'
- **History name conflicts**
  - Use unique history names (add timestamps or suffixes)
  - Check for existing histories before creating
- **Return code interpretation**
  - `os.system()` shifts exit codes (exit 1 → return 256)
  - Use `return_code >> 8` to get the actual exit code
- **Invocation ID recovery**
  - Terminal disconnection loses the invocation ID
  - Always save invocation IDs to a file immediately
  - Use `--test_output_json` with planemo
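To make the return-code shift concrete, here is a quick check. This is POSIX behavior; on Windows `os.system()` returns the exit code directly, so hedge accordingly in cross-platform scripts:

```python
import os

raw = os.system('exit 7')   # shell exits with code 7
actual = raw >> 8           # on POSIX the exit code lives in the high byte
print(raw, actual)
# os.WEXITSTATUS(raw) is the portable POSIX way to do the same decode
```

`subprocess.run(...)` sidesteps this entirely by exposing `.returncode` un-shifted.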
Best Practices Summary
- ✅ Use environment variables for API keys
- ✅ Mask API keys in logs and output
- ✅ Quote all file paths in shell commands
- ✅ Implement thread-safety for concurrent operations
- ✅ Save state frequently for resume capability
- ✅ Wait for dataset uploads before invoking workflows
- ✅ Poll invocation status with reasonable intervals (30-60s)
- ✅ Distinguish planemo failures from Galaxy failures
- ✅ Implement proper error handling and retry logic
- ✅ Use unique history names to avoid conflicts
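The summary mentions retry logic but the skill never shows a complete retry loop. One possible shape, deliberately decoupled from BioBlend: `launch` and `categorize` are hypothetical callables you would build from `wait_for_invocation`, `rerun_failed_invocation`, and `categorize_failure` above.

```python
import time

def run_with_retries(launch, categorize, max_retries=3, backoff=60):
    """Run a workflow launcher, retrying only retriable failures.

    launch():     start (or rerun) the invocation and return its final state
    categorize(): map the failure to a 'retriable_*' label or 'permanent_failure'
    """
    for attempt in range(1, max_retries + 1):
        state = launch()
        if state == 'ok':
            return 'ok'
        reason = categorize()
        if not (reason and reason.startswith('retriable')):
            return 'permanent_failure'
        time.sleep(backoff * attempt)  # linear backoff between attempts
    return 'exhausted_retries'
```

Because the hooks are plain callables, the same loop works whether the launch goes through BioBlend or a planemo subprocess.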
Related Skills
- galaxy-tool-wrapping: For creating Galaxy tool wrappers
- galaxy-workflow-development: For creating Galaxy workflows
- vgp-pipeline: VGP-specific orchestration (uses this skill as dependency)
Resources
- BioBlend Documentation: https://bioblend.readthedocs.io/
- Planemo Documentation: https://planemo.readthedocs.io/
- Galaxy API: https://docs.galaxyproject.org/en/master/api/
- Galaxy Training: https://training.galaxyproject.org/