Loading...
Loading...
Process-based discrete-event simulation framework in Python. Use this skill when building simulations of systems with processes, queues, resources, and time-based events such as manufacturing systems, service operations, network traffic, logistics, or any system where entities interact with shared resources over time.
npx skill4agent add davila7/claude-code-templates simpy
import simpy
def process(env, name):
    """Announce the start, wait on a 5-unit timeout, then announce the finish.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        name: Label included in both printed messages.

    Yields:
        The event returned by ``env.timeout(5)``; the process is resumed
        once that event has been processed.
    """
    print(f'{name} starting at {env.now}')
    # Hand control back to the environment until the timeout fires.
    yield env.timeout(5)
    print(f'{name} finishing at {env.now}')
# Create environment
env = simpy.Environment()
# Start processes
env.process(process(env, 'Process 1'))
env.process(process(env, 'Process 2'))
# Run simulation
env.run(until=10)
import simpy
def customer(env, name, resource):
    """Request the resource, hold it for 3 time units, then release it.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        name: Label included in the printed messages.
        resource: Object whose ``request()`` returns a context manager that
            yields the request event.

    Yields:
        First the request event (wait until the resource is granted), then
        the timeout event that models the usage period.
    """
    with resource.request() as req:
        yield req  # Wait for resource
        print(f'{name} got resource at {env.now}')
        yield env.timeout(3)  # Use resource
    # Leaving the ``with`` block is what gives the resource back, so the
    # message is printed only after that has happened.
    print(f'{name} released resource at {env.now}')
# Create the environment and a resource with a single slot
env = simpy.Environment()
server = simpy.Resource(env, capacity=1)
# Register two customer processes that share the same server
env.process(customer(env, 'Customer 1', server))
env.process(customer(env, 'Customer 2', server))
env.run()
import simpy
# Standard environment (runs as fast as possible)
env = simpy.Environment(initial_time=0)
# Real-time environment (synchronized with wall-clock)
import simpy.rt
env_rt = simpy.rt.RealtimeEnvironment(factor=1.0)
# Run simulation
env.run(until=100) # Run until time 100
env.run()  # Run until no events remain


# yield
def my_process(env, param1, param2):
    """Process that yields events to pause execution.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        param1: Example argument; not used by this body.
        param2: Example argument; not used by this body.

    Yields:
        Two timeout events in sequence (delays of 5 and then 3).

    Returns:
        The string ``'result'`` once both waits have completed.
    """
    print(f'Starting at {env.now}')
    # Wait for time to pass
    yield env.timeout(5)
    print(f'Resumed at {env.now}')
    # Wait for another event
    yield env.timeout(3)
    print(f'Done at {env.now}')
    return 'result'
# Start the process
env.process(my_process(env, 'value1', 'value2'))
env.timeout(delay)
resource.request()
env.event()
env.process(func())
event1 & event2
event1 | event2
references/resources.md
| Resource Type | Use Case |
|---|---|
| Resource | Limited capacity (servers, machines) |
| PriorityResource | Priority-based queuing |
| PreemptiveResource | High-priority can interrupt low-priority |
| Container | Bulk materials (fuel, water) |
| Store | Python object storage (FIFO) |
| FilterStore | Selective item retrieval |
| PriorityStore | Priority-ordered items |
import simpy
env = simpy.Environment()
# Basic resource (e.g., servers)
resource = simpy.Resource(env, capacity=2)
# Priority resource
priority_resource = simpy.PriorityResource(env, capacity=1)
# Container (e.g., fuel tank)
fuel_tank = simpy.Container(env, capacity=100, init=50)
# Store (e.g., warehouse)
warehouse = simpy.Store(env, capacity=10)
import simpy
import random
def customer(env, name, server):
    """Queue for the server, report the waiting time, then get served.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        name: Label included in the printed message.
        server: Object whose ``request()`` returns a context manager that
            yields the request event.

    Yields:
        The request event, then a timeout whose delay is drawn from
        ``random.uniform(2, 4)``.
    """
    arrival = env.now
    with server.request() as req:
        yield req
        # Time spent between arriving and being granted the server.
        wait = env.now - arrival
        print(f'{name} waited {wait:.2f}, served at {env.now}')
        yield env.timeout(random.uniform(2, 4))
def customer_generator(env, server):
    """Spawn numbered customer processes forever at random intervals.

    Args:
        env: Simulation environment; must provide ``timeout()`` and
            ``process()``.
        server: Resource handed to every spawned ``customer`` process.

    Yields:
        A timeout event per iteration, with a delay drawn from
        ``random.uniform(1, 3)``.
    """
    i = 0
    while True:
        # Wait for the next arrival before creating the customer.
        yield env.timeout(random.uniform(1, 3))
        i += 1
        env.process(customer(env, f'Customer {i}', server))
# Create the environment and a server resource with two slots
env = simpy.Environment()
server = simpy.Resource(env, capacity=2)
# Register the arrival generator, which spawns the customer processes
env.process(customer_generator(env, server))
env.run(until=20)
import simpy
def producer(env, store):
    """Create a new numbered item every 2 time units and put it in the store.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        store: Object whose ``put(item)`` returns an event to wait on.

    Yields:
        Alternately a timeout event (delay 2) and the event returned by
        ``store.put(item)``. The loop never terminates on its own.
    """
    item_id = 0
    while True:
        yield env.timeout(2)
        item = f'Item {item_id}'
        # Wait for the put event before reporting the item as produced.
        yield store.put(item)
        print(f'Produced {item} at {env.now}')
        item_id += 1
def consumer(env, store):
    """Repeatedly take an item from the store and spend 3 time units on it.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        store: Object whose ``get()`` returns an event to wait on.

    Yields:
        Alternately the event returned by ``store.get()`` and a timeout
        event (delay 3). The loop never terminates on its own.
    """
    while True:
        # The value sent back into the generator for the get event is the item.
        item = yield store.get()
        print(f'Consumed {item} at {env.now}')
        yield env.timeout(3)
# Create the environment and a store with capacity 10
env = simpy.Environment()
store = simpy.Store(env, capacity=10)
# Register one producer and one consumer that share the store
env.process(producer(env, store))
env.process(consumer(env, store))
env.run(until=20)
import simpy
def task(env, name, duration):
    """Run a named task that takes ``duration`` time units.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        name: Label used in the printed messages and in the return value.
        duration: Delay passed to ``env.timeout()``.

    Yields:
        The timeout event for ``duration``.

    Returns:
        The string ``'<name> result'``.
    """
    print(f'{name} starting at {env.now}')
    yield env.timeout(duration)
    print(f'{name} done at {env.now}')
    return f'{name} result'
def coordinator(env):
    """Start three tasks at once and wait until all of them have finished.

    Args:
        env: Simulation environment; must provide ``now`` and ``process()``.

    Yields:
        The combined event built with ``task1 & task2 & task3``.
    """
    # Start tasks in parallel
    task1 = env.process(task(env, 'Task 1', 5))
    task2 = env.process(task(env, 'Task 2', 3))
    task3 = env.process(task(env, 'Task 3', 4))
    # Wait for all to complete; ``results`` receives whatever the environment
    # sends back for the combined event (not used further in this example).
    results = yield task1 & task2 & task3
    print(f'All done at {env.now}')
# Create the environment and register the coordinator process
env = simpy.Environment()
env.process(coordinator(env))
env.run()


def entity_process(env, name, resources, parameters):
    """Template for an entity: arrive, acquire a resource, be served, depart.

    Args:
        env: Simulation environment; must provide ``now`` and ``timeout()``.
        name: Entity label; not used by this template body.
        resources: Resource to request. This template calls
            ``resources.request()`` directly, so it expects a single
            resource object; adapt the body if several are passed.
        parameters: Passed through to ``calculate_service_time``.

    Yields:
        The request event, then a timeout for the computed service time.
    """
    # Arrival logic
    arrival_time = env.now
    # Request resources (the original referenced an undefined name
    # ``resource``; the ``resources`` parameter is used instead)
    with resources.request() as req:
        yield req
        # Service logic
        service_time = calculate_service_time(parameters)
        yield env.timeout(service_time)
    # Departure logic
    collect_statistics(env.now - arrival_time)


# references/monitoring.md
from scripts.resource_monitor import ResourceMonitor
# Create and monitor resource
resource = simpy.Resource(env, capacity=2)
monitor = ResourceMonitor(env, resource, "Server")
# After simulation
monitor.report()
# Run simulation
env.run(until=simulation_time)
# Generate reports
monitor.report()
stats.report()
# Export data for further analysis
monitor.export_csv('results.csv')
references/process-interaction.md
references/real-time.md
import simpy.rt
env = simpy.rt.RealtimeEnvironment(factor=1.0) # 1:1 time mapping
# factor=0.5 means 1 sim unit = 0.5 seconds (2x faster)
references/monitoring.md
from scripts.basic_simulation_template import SimulationConfig, run_simulation
config = SimulationConfig()
config.num_resources = 2
config.sim_time = 100
stats = run_simulation(config)
stats.report()
ResourceMonitor
MultiResourceMonitor
ContainerMonitor
from scripts.resource_monitor import ResourceMonitor
monitor = ResourceMonitor(env, resource, "My Resource")
# ... run simulation ...
monitor.report()
monitor.export_csv('data.csv')
references/resources.md
references/events.md
references/process-interaction.md
references/monitoring.md
references/real-time.md
yield
with resource.request() as req:
random.seed()