SimPy resource availability schedule
I'm using SimPy in Python to create a discrete event simulation that requires resources to be available according to a schedule supplied by the user (in my case, in a CSV file). The aim is to represent different numbers of the same resource (e.g. staff) being available at different times of day. As far as I can tell, this isn't available in base SimPy in the way that, say, resource priorities are.
I have managed to get this working and have included the code below to show how. However I wanted to ask the community if there is a better way to achieve this functionality in SimPy?
The code below works by requesting the resources at the start of each day for the times they are not supposed to be available - with a much higher priority to ensure they get the resource. The resources are then released at the appropriate times for use by other events/processes. As I say it works but seems wasteful with a lot of dummy processes working to ensure the correct true availability of resources. Any comments which would lead to improvements would be welcomed.
The CSV looks like this:
Number time
0 23
50 22
100 17
50 10
20 8
5 6
where Number is the number of staff available from the given time onwards. For example, there will be 5 staff from 6-8, 20 from 8-10, 50 from 10-17, and so on until the end of the day.
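To make the intended reading of the table concrete, here is a minimal sketch that looks up the staff count in effect at a given hour. The name `staff_at` and the `SCHEDULE` constant are illustrative only, and the rule that nobody is on duty before the first entry is an assumption rather than something the table states.

```python
from bisect import bisect_right

# (start_hour, staff_count) pairs from the example table, sorted by start hour.
SCHEDULE = [(6, 5), (8, 20), (10, 50), (17, 100), (22, 50), (23, 0)]


def staff_at(hour, schedule=SCHEDULE):
    """Return the staff count in effect at `hour`.

    Assumption: nobody is on duty before the first scheduled start time.
    """
    starts = [start for start, _ in schedule]
    idx = bisect_right(starts, hour) - 1  # last entry starting at or before `hour`
    return schedule[idx][1] if idx >= 0 else 0


for hour in (5, 7, 8, 16.5, 17, 23.5):
    print(hour, staff_at(hour))
```

A lookup like this is handy for checking that a simulation run actually exposes the scheduled number of staff at each point in the day.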
The code:
import csv
import simpy
# empty list ready to hold the input data in the csv
input_list = []
# a dummy process that "uses" staff until the end of the current day
def take_res():
    req = staff.request(priority=-100)
    yield req  # Request a staff resource at set priority
    yield test_env.timeout(24 - test_env.now)
# A dummy process that "uses" staff for the time those staff should not
# be available for the real processes
def request_res(delay, avail_time):
    req = staff.request(priority=-100)
    yield req  # Request a staff resource at set priority
    yield test_env.timeout(delay)
    yield staff.release(req)
    # pass time it is avail for
    yield test_env.timeout(avail_time)
    test_env.process(take_res())
# used to print current levels of resource usage
def print_usage():
    print('At time %0.2f %d res are in use' % (test_env.now, staff.count))
    yield test_env.timeout(0.5)
    test_env.process(print_usage())
# used to open the csv and read the data into a list
with open('staff_schedule.csv', mode="r") as infile:
    reader = csv.reader(infile)
    next(reader, None)  # ignore header
    for row in reader:
        input_list.append(row[:2])
# calculates the time the current number of resources will be
# available for and adds to the list
i = 0
for row in input_list:
    if i == 0:
        row.append(24 - int(input_list[i][1]))
    else:
        row.append(int(input_list[i-1][1]) - int(input_list[i][1]))
    i += 1
# converts list to tuple of tuples to prevent any accidental
# edits from this point in
staff_tuple = tuple(tuple(row) for row in input_list)
print(staff_tuple)
# define environment and creates resources
test_env = simpy.Environment()
staff = simpy.PriorityResource(test_env, capacity=sum(int(l[0]) for l in staff_tuple))
# for each row in the tuple run dummy processes to hold resources
# according to schedule in the csv
for item in staff_tuple:
    print(item[0])
    for i in range(int(item[0])):
        test_env.process(request_res(int(item[1]), int(item[2])))
# run event to print usage over time
test_env.process(print_usage())
# run for 25 hours - so 1 day
test_env.run(until=25)
Solution 1:[1]
This is how I solved it for my application. It's not perfect but was the best I could do given my basic level of skill with Python and SimPy.
The result is that the correct number of advisers is available at the desired times.
First I define a store and set the capacity to be equal to the total number of adviser instances that will exist within the simulation.
self.adviser_store = simpy.FilterStore(self.env,
                                       capacity=self.total_ad_instances)
The instances of the Adviser class required are created in an initialization step which for brevity I have not included. I actually use a JSON file to customize the individual adviser instances which are then placed in a list.
The run parameter in the class definition below is actually another class that contains all info related to the current run of the simulation - so for example it contains the start and end dates for the simulation. self.start_date therefore defines the date that adviser starts working. self.run.start_date is the start date for the simulation.
import datetime
from simpy.util import start_delayed

class Adviser(object):
    # ad_type is assigned below but was missing from the original signature;
    # it is assumed here to be a constructor argument.
    def __init__(self, run, id_num, start_time, end_time, start_date, end_date, ad_type):
        self.env = run.env
        self.run = run
        self.id_num = id_num
        self.start_time = start_time
        self.end_time = end_time
        self.start_date = datetime.datetime.strptime(start_date, '%Y, %m, %d')
        self.end_date = datetime.datetime.strptime(end_date, '%Y, %m, %d')
        self.ad_type = ad_type
        self.avail = False
        self.run.env.process(self.set_availability())
So, as you can see, creating an Adviser instance also starts the process that sets its availability. In the example below I've simplified it to set the same availability each day for a given date range. You could of course set different availabilities depending on the date, day of the week, etc.
    def set_availability(self):
        # Hours from the start of the simulation until the adviser first
        # becomes available (and unavailable); applied as delays below.
        start_delay = self.start_time + (self.start_date - self.run.start_date).total_seconds()/3600
        end_delay = self.end_time + (self.start_date - self.run.start_date).total_seconds()/3600
        repeat = (self.end_date - self.start_date).days + 1  # how many days to repeat it for
        for i in range(repeat):
            start_delayed(self.run.env, self.add_to_store(), start_delay)
            start_delayed(self.run.env, self.remove_from_store(), end_delay)
            start_delay += 24
            end_delay += 24
        yield self.run.env.timeout(0)

    def add_to_store(self):
        self.run.ad_avail.remove(self)  # take adviser from a list
        self.run.adviser_store.put(self)  # and put it in the store
        yield self.run.env.timeout(0)

    def remove_from_store(self):
        # get itself from the store
        current_ad = yield self.run.adviser_store.get(lambda item: item.id_num == self.id_num)
        self.run.ad_avail.append(current_ad)  # and put it back in the list
        yield self.run.env.timeout(0)
So essentially customers can only request advisers from the store, and the advisers will only be in the store at certain times. The rest of the time they are in the list attached to the current run of the simulation.
I think there is still a pitfall here. The adviser object may be in use when it is due to become unavailable. I haven't yet checked whether this happens or what the impact is if it does.
Solution 2:[2]
I tried something else: I created my own version of the Resource class, adding only one method. While I don't fully understand the source code, it seems to work properly. You can tell the resource to change its capacity at any point in your simulation.
from simpy.resources.resource import Request, Release
from simpy.core import BoundClass
from simpy.resources.base import BaseResource

class VariableResource(BaseResource):
    def __init__(self, env, capacity):
        super(VariableResource, self).__init__(env, capacity)
        self.users = []
        self.queue = self.put_queue

    @property
    def count(self):
        return len(self.users)

    request = BoundClass(Request)
    release = BoundClass(Release)

    def _do_put(self, event):
        if len(self.users) < self.capacity:
            self.users.append(event)
            event.usage_since = self._env.now
            event.succeed()

    def _do_get(self, event):
        try:
            self.users.remove(event.request)
        except ValueError:
            pass
        event.succeed()

    def _change_capacity(self, capacity):
        self._capacity = capacity
I think this should work, but I'm not 100% confident about how the triggers work.
Solution 3:[3]
I solved this by creating a Resource for each time window. Each arrival is processed in the function service, and each customer is assigned to a resource depending on the arrival time. If a customer has to wait in the queue past the end of the current time window, it is removed from the current Resource and re-assigned to the next one. This is done by modifying the request as follows:
with self.Morning.request() as req1:
    yield req1 | self.env.timeout(self.durationMorning)
The code:
import itertools
import simpy

class Queue():
    def __init__(self, env, N_m, N_e):
        self.Arrival = {}
        self.StartService = {}
        self.FinishService = {}
        self.Morning = simpy.Resource(env, N_m)
        self.Evening = simpy.Resource(env, N_e)
        self.env = env
        self.durationMorning = 30

    # time between consecutive arrivals
    def t_arrival(self, t):
        if t < self.durationMorning:
            return 1
        else:
            return 2

    def t_service(self):
        return 5

    def service(self, i):
        arrival_time = self.env.now
        if arrival_time == self.durationMorning:
            yield self.env.timeout(0.0001)
        # Add Arrival
        self.Arrival[i] = arrival_time
        # Morning shift
        if self.env.now < self.durationMorning:
            with self.Morning.request() as req1:
                yield req1 | self.env.timeout(self.durationMorning)
                if self.env.now < self.durationMorning:
                    self.StartService[i] = self.env.now
                    yield self.env.timeout(self.t_service())
                    print(f'{i} arrived at {self.Arrival[i]} done at {self.env.now} by 1')
                    self.FinishService[i] = self.env.now
        # Evening shift
        if self.env.now >= self.durationMorning and i not in self.FinishService:
            with self.Evening.request() as req2:
                yield req2
                self.StartService[i] = self.env.now
                yield self.env.timeout(self.t_service())
                print(f'{i} arrived at {self.Arrival[i]} done at {self.env.now} by 2')
                self.FinishService[i] = self.env.now

    def arrivals(self):
        for i in itertools.count():
            self.env.process(self.service(i))
            t = self.t_arrival(self.env.now)
            yield self.env.timeout(t)

env = simpy.Environment()
# N_morning and N_evening were not defined in the original post;
# 2 and 1 are assumed here because they are consistent with the output below.
N_morning, N_evening = 2, 1
system = Queue(env, N_morning, N_evening)
system.env.process(system.arrivals())
system.env.run(until=60)
The output:
0 arrived at 0 done at 5 by 1
1 arrived at 1 done at 6 by 1
2 arrived at 2 done at 10 by 1
3 arrived at 3 done at 11 by 1
4 arrived at 4 done at 15 by 1
5 arrived at 5 done at 16 by 1
6 arrived at 6 done at 20 by 1
7 arrived at 7 done at 21 by 1
8 arrived at 8 done at 25 by 1
9 arrived at 9 done at 26 by 1
10 arrived at 10 done at 30 by 1
11 arrived at 11 done at 31 by 1
12 arrived at 12 done at 35 by 2
13 arrived at 13 done at 40 by 2
14 arrived at 14 done at 45 by 2
15 arrived at 15 done at 50 by 2
16 arrived at 16 done at 55 by 2
Solution 4:[4]
SimPy related
Maybe you can use PreemptiveResource (see this example). With this, you would only need one blocker-process per resource as it can just "kick" less important processes.
Python related
- Document your code. What’s the purpose of take_res() and request_res()? (Why do both functions use priority=-100, anyway?)
- Use better names. the_list or the_tuple is not very helpful.
- Instead of the_list.append(row[0], row[1]) you can do the_list.append(row[:2]).
- Why do you convert the list of lists into a tuple of tuples? As far as I can see, there is no benefit. But it adds extra code and thus extra confusion and extra possibilities for programming errors.
- You should leave the with open(file) block as soon as possible (after the first four lines, in your case). There’s no need to keep the file open longer than necessary, and when you are done iterating over all lines, you no longer need it.
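The last few points could look something like the sketch below. It keeps the `with` block to the lines that actually read the file and works out how long each staffing level lasts afterwards, from the in-memory rows. The function name `load_schedule` is made up for illustration, the file is assumed to be comma-separated with a `Number,time` header, and the last staffing level is assumed to run until hour 24.

```python
import csv
import os
import tempfile


def load_schedule(path):
    """Return (staff_count, start_hour, duration) tuples sorted by start hour."""
    with open(path, newline='') as infile:
        reader = csv.reader(infile)
        next(reader, None)  # skip the header row
        rows = [(int(r[0]), int(r[1])) for r in reader if r]
    # The file is closed here; everything below works on the in-memory rows.
    rows.sort(key=lambda row: row[1])
    # Each level ends when the next one starts; the last one ends at hour 24.
    ends = [start for _, start in rows[1:]] + [24]
    return [(count, start, end - start) for (count, start), end in zip(rows, ends)]


# Demo with a temporary file holding the example schedule.
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False, newline='') as tmp:
    tmp.write('Number,time\n0,23\n50,22\n100,17\n50,10\n20,8\n5,6\n')
schedule = load_schedule(tmp.name)
os.remove(tmp.name)
print(schedule)
```

Returning plain tuples from a function also removes the need for a separate conversion step to guard against accidental edits.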
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Pete |
Solution 2 | Jan van der Vegt |
Solution 3 | Joan Ventura |
Solution 4 | Stefan Scherfke |