Добавил:
egrpleh
Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз:
Предмет:
Файл:
from matplotlib import pyplot as plt
from scipy import stats as st
import numpy as np
def ploting_erlang_distribution(k, lmbda, N=10000):
'''Построение графика плотности вероятности Эрланговского распределения'''
# Определяем границы построения графика
data = st.erlang.rvs(a=k, scale=1/lmbda, size=N)
x_min, x_max = min(data), max(data)
# Генерируем список значений x для построения графика
x = np.linspace(0 if x_min<=1 else x_min-1, x_max+1, 100)
# Функция плотности вероятности эрланговского распределения для заданных k и lambda
f = lambda x : ((lmbda**k) * (x**(k-1)) * np.exp(-lmbda*x)) / np.math.factorial(k-1)
# Построение графика
plt.figure(figsize=(9,7))
plt.plot(x, f(x), color='red')
plt.title('График плотности вероятности\nЭрланговского распределения\n' + \
'для k={}, lambda={}'.format(round(k, 4), round(lmbda, 4)), fontsize=16)
plt.xlabel("x")
plt.ylabel("y")
plt.show()
def ploting_uniform_distribution(a, b):
'''Построение графика плотности вероятности равномерного распределения'''
# Генерируем список значений x для построения графика
x = np.linspace(a - (b-a)/5, b + (b-a)/5, 100)
# Функция плотности вероятности равномерного распределения для заданных a и b
f = lambda x : [1/(b-a) if (a<=i) and (i<=b) else 0 for i in x]
# Построение графика
plt.figure(figsize=(9,7))
plt.plot(x, f(x), color='red')
plt.title('График плотности вероятности\nравномерного распределения\n' + \
'для a={}, b={}'.format(round(a, 4), round(b, 4)), fontsize=16)
plt.xlabel("x")
plt.ylabel("y")
plt.show()
def model(shape, lambd, a, b, mu_0, buffer_size=1, is_test=False):
'''Функция моделироваиня СМО'''
# Лямбда ф-ция получения случайного значения распределения
# Эрланговского для закона распределения входного потока заявок
get_t_request = lambda : np.random.exponential(1/lambd) if is_test \
else st.erlang.rvs(a=shape, scale=1/(lambd*shape))
# Равномерного для закона распределения времени обслуживания заявок
get_t_work = lambda : np.random.exponential(1/mu_0) if is_test \
else np.random.uniform(a, b)
# Количество заявок, поступивших / обслуженных / в буфере к данному моменту в СМО
n, k, m = 0, 0, 0
# Статус занятости ОУ
is_busy = False
# Системное время
t_system = 0
# Следующий момент поступления заявки
t_request = get_t_request()
# Следующий момент освобождения ОУ
t_work = t_request
# Список времени моментов поступления заявок
t_request_Data = np.array([])
# Список времени моментов освобождения ОУ
t_work_Data = np.array([])
q_Old = 2**32
q_Data = np.array([])
# Старая оценка среднего времени пребывания запроса
t_me_Old = 2 ** 32
# Счетчик итераций цикла
loop_cnt = 0
# Моделируем СМО
while True:
loop_cnt += 1 # Увеличиваем счетчик цикла
# Проверяем наступило ли время освобождения ОУ
if t_request <= t_work:
# Записываем в системное время момент поступления заявки
t_system = t_request
# Сохраняем в список момент поступления заявки
# t_request_Data = np.append(t_request_Data, t_request)
if not is_busy or m < buffer_size:
t_request_Data = np.append(t_request_Data, t_request)
# Создаем новое значение
t_request_New = get_t_request()
# Сохранение для посдчета интенсивности входного потока
q_Data = np.append(q_Data, t_request_New)
n += 1 # Увеличиваем счетчик поступивших заявок
# Проверяем занято ли ОУ
if not is_busy:
is_busy = True # Устанавливаем статус ОУ в положение занято
# Вычисляем следующий момент освобождения ОУ
t_work = t_system + get_t_work()
# Добавлием заявку в буфер, только если в нем есть свободное место
elif m < buffer_size:
m += 1
# else: print('ОТКАЗ', not is_busy or m < buffer_size)
# Вычисляем следующий момент поступления заявки, только если заявка попала в буфер
t_request = t_system + t_request_New
else:
# Записываем в системное время момент освобождения ОУ
t_system = t_work
k += 1 # Увеличиваем счетчик обслуженных заявок
# Сохраняем в список момент освобождения ОУ
t_work_Data = np.append(t_work_Data, t_work)
# Проверяем есть ли заявки в буфере
if m > 0:
m -= 1 # Берем заявку из буфера
# Вычисляем следующий момент освобождения ОУ
t_work = t_system + get_t_work()
else:
# Устанавливаем статус ОУ в положение свободно
is_busy = False
# Устанавливаем следующий момент освобождения ОУ
t_work = t_request
# Проводим отценку каждые 1000 итераций цикла
if loop_cnt % 1000 == 0:
# Высчитываем среднее время обслуживания заявки
t_me_New = np.mean(t_work_Data - t_request_Data[:len(t_work_Data)])
# Расчет оценки производительности системы
q_New = (k/n) / np.mean(q_Data)
# Проверка достаточности условия выхода
if np.abs((t_me_New - t_me_Old)/t_me_Old) < 0.01 \
or np.abs((q_New - q_Old)/q_Old) < 0.001: return t_me_New, q_New
q_Old = q_New
t_me_Old = t_me_New # Перезаписываем среднее время обслуживания заявки
def modeling_experimental_dependence(shape, mu_0, buffer_size = 10000, is_test = False):
'''Функция построения графиков для модели СМО'''
# Генерация списка значений интенсивности входного потока
lambd = np.linspace(0.1,1,10) * mu_0
# Расчет границ для равномерного распределения
a = 1 / mu_0 - 0.05 * mu_0
b = a + 0.1 * mu_0
# Список среднего времени пребывания запроса
t_empirical = np.array([])
# Список оценки производительности системы
q_empirical = np.array([])
# Заполнение списка значений среднего времени пребывания запроса
for l in lambd:
t_empirical_val, q_empirical_val = model(shape, l, a, b, mu_0, buffer_size, is_test)
t_empirical = np.append(t_empirical, t_empirical_val)
q_empirical = np.append(q_empirical, q_empirical_val)
# print('---------------------------------------------------------------') # TEST PRINT TEST
print('t_empirical', t_empirical, '\nq_empirical', q_empirical, end='\n\n') # TEST PRINT TEST
if not is_test:
# Построение графика эрланговского распределения
# для закона распределения входного потока заявок
ploting_erlang_distribution(k=shape, lmbda=1/(np.mean(lambd)*shape), N=10000)
# Построение графика равномерного распределения
# для закона распределения времени обслуживания заявок
ploting_uniform_distribution(a=a, b=b)
# Создаем объекты графиков
fig, axis = plt.subplots(1, 2, figsize=(15, 7))
fig.suptitle('Статистика для ЭСМО c буффером размера {}'.format(buffer_size), fontsize=16)
# Построение графика
if is_test:
lambd = lambd[lambd != mu_0]
#Теор.знач. коэф загрузки системы
Rho = lambd / mu_0
# Расчет теор. значения среднего времени пребывания запроса в системе
t_theor = [ 1/lambd[j] * sum( (i * (1-Rho[j]) * Rho[j]**i) / (1 - Rho[j]**(buffer_size+1)) for i in range(buffer_size+2) ) for j in range(len(lambd))]
#Теор.знач. вероятности потери запроса
p_Fail = ((1-Rho) * Rho**buffer_size) / (1-Rho**(buffer_size+1))
#Теор.знач. производительности ситсемы
q_theor = lambd * (1-p_Fail)
# Построение графика георитически расчитанного значения
axis[0].plot(lambd[:len(q_theor)], q_theor, label='Теоретическое значение', color='red')
# Построение графика теоритически расчитанного значения
axis[1].plot(lambd[:len(t_theor)], t_theor, label='Теоретическое значение', color='red')
# Построение графика экспериментально полученного значения
axis[0].plot(lambd[:9], q_empirical[:9], label='Экспериментальное значение', color='black')
axis[0].set_title("График зависимости производительность системы\nот интенсивности входного потока")
axis[0].set_ylabel('Производительность системы')
# Построение графика экспериментально полученного значения
axis[1].plot(lambd[:9], t_empirical[:9], label='Экспериментальное значение', color='black')
axis[1].set_title("График зависимости среднего времени пребывания запроса\nот интенсивности входного потока")
axis[1].set_ylabel('Ср. t в системе')
for ax in axis.flat:
ax.set_xlabel('Инт. входного потока')
ax.grid()
ax.legend()
plt.show()
return t_empirical
def main():
# Вариант 10
shape, mu_0, N = 6, 5, 3
# Моделирование работы СМО
modeling_experimental_dependence(shape, mu_0, buffer_size=N, is_test=True)
modeling_experimental_dependence(shape, mu_0, buffer_size=N, is_test=False)
return 0
if __name__ == "__main__":
main()
Соседние файлы в папке ЛР5