研究室セミナー | 汎用的なシミュレータ
汎用的なシミュレーターの設計 from Takaaki Sawa
汎用的なシミュレーターの設計 from Takaaki Sawa
最近, CloudNetという学会に論文を投稿したのだが, そこでデータを集める際に作成したシミュレータがいい感じにできたので, それについて書いてみる。 https://github.com/adshidtadka/server-allocation Parameterクラス まず良かった点は, Parameterクラスを作ったことである。 import numpy as np import pandas as pd import itertools import sys import Constant class Parameter: USER_NUM_CONST = 500 SERVER_NUM_CONST = 10 CAPACITY_CONST = 50 def __init__(self, seed): np.random.seed(seed) self.USER_NUM = 500 self.SERVER_NUM = 10 self.DELAY_MAX = 10 self.DELAY_SERVER = 1 self.CAPACITY = 50 def create_input(self): # inputs self.e_u = list(itertools.product(range(self.USER_NUM), range(self.SERVER_NUM))) self.e_s = list(itertools.combinations(list(range(0, self.SERVER_NUM)), 2)) self.d_us = np.random.randint(1, self.DELAY_MAX, (self.USER_NUM, self.SERVER_NUM)) self.m_s = np.full(self.SERVER_NUM, self.CAPACITY) def set_param(self, var_name, consts, var): if var_name == 'user': self.USER_NUM = var self.SERVER_NUM = consts['server'] self.CAPACITY = consts['capacity'] elif var_name == 'server': self.USER_NUM = consts['user'] self.SERVER_NUM = var self.CAPACITY = consts['capacity'] elif var_name == 'capacity': self.USER_NUM = consts['user'] self.SERVER_NUM = consts['server'] self.CAPACITY = var else: sys.exit('invalid var_name = ' + str(var_name)) def get_const(var_name): if var_name == 'user': return Parameter.USER_NUM_CONST elif var_name == 'server': return Parameter.SERVER_NUM_CONST elif var_name == 'capacity': return Parameter.CAPACITY_CONST else: sys.exit('invalid var_name = ' + str(var_name)) これを作ることで1つのインスタンスが1組のパラメータ設定に対応することになるので, 全部グローバル変数で書いていた前回のシミュレータよりめちゃめちゃわかりやすくなった。また, インスタンスメソッドを呼び出すだけでパラメータを調整でき, 処理をメソッドとしてまとめたことでエラーを極力避けられるようになった。 IlpクラスとMmdクラス class Ilp: def __init__(self, param): self.create_dataframe(param) self.set_input() def set_input(self): # optimization problem problem = LpProblem() # decision variables self.df_e_u['variable'] = [LpVariable('x_us%d' % i, cat=LpBinary) for i in self.df_e_u.index] self.df_e_s['variable'] = [LpVariable('x_st%d' % i, cat=LpBinary) 
for i in self.df_e_s.index] self.df_v_s['variable'] = [LpVariable('y%d' % i, cat=LpBinary) for i in self.df_v_s.index] self.D_u = LpVariable('D_u', cat=LpInteger) self.D_s = LpVariable('D_s', cat=LpInteger) # objective function problem += 2 * self.D_u + self.D_s self.problem = problem def create_dataframe(self, param): # dataframe for E_U df_e_u = pd.DataFrame([(i, j) for i, j in param.e_u], columns=['user', 'server']) df_e_u['delay'] = param.d_us.flatten() self.df_e_u = df_e_u # dataframe for E_S df_e_s = pd.DataFrame([(i, j) for i, j in param.e_s], columns=['server_1', 'server_2']) df_e_s['delay'] = param.DELAY_SERVER self.df_e_s = df_e_s # dataframe for V_S df_v_s = pd.DataFrame(list(range(0, param.SERVER_NUM)), columns=['server']) df_v_s['capacity'] = param.m_s self.df_v_s = df_v_s def solve_by_ilp(self, solver=None): t_0 = time.perf_counter() # solve try: # constraints self.problem = self.create_constraints(self.problem) if solver == 'cplex': self.problem.solve(GLPK_CMD(msg=0)) else: self.problem.solve(CPLEX_CMD(msg=0)) except PulpSolverError: print(CPLEX_CMD().path, 'is not installed') t_1 = time.perf_counter() return t_1 - t_0 def print_result(self): if self.problem.status == 1: print('objective function is ', value(self.problem.objective)) print('cpu time is ' + str(self.cpu_time) + ' sec') else: print('status code is ', self.problem.status) def create_constraints(self, m): # constraints # (1b) for k, v in self.df_e_u.groupby('user'): m += lpSum(v.variable) == 1 # (1c) for k, v in self.df_e_u.groupby('server'): m += lpSum(v.variable) <= self.df_v_s['capacity'][k] # (1d) for k, v in self.df_e_u.iterrows(): m += v.variable * v.delay <= self.D_u # (1e) for k, v in self.df_e_s.iterrows(): m += v.variable * v.delay <= self.D_s # (1f) for k, v in self.df_e_u.groupby('user'): for l, w in self.df_v_s.iterrows(): m += w.variable >= v.variable # (1g) for k, v in self.df_e_s.iterrows(): m += self.df_v_s.iloc[v.server_1].variable + \ 
self.df_v_s.iloc[v.server_2].variable - 1 <= v.variable # (1h) for k, v in self.df_e_s.iterrows(): m += v.variable <= self.df_v_s.iloc[v.server_1].variable # (1i) for k, v in self.df_e_s.iterrows(): m += v.variable <= self.df_v_s.iloc[v.server_2].variable return m class Mmd: def __init__(self, param): self.set_input(param) def set_input(self, param): # edges list edges = np.empty(3, dtype=int) for k, v in enumerate(param.d_us): for i, j in enumerate(v): edges = np.vstack((edges, np.array([k, i, j]))) self.edges = edges def start_algorithm(self, param): t_0 = time.perf_counter() L_1 = self.one_server_case(param) L_2 = self.multiple_server_case(param) D_u = min([L_1, L_2]) if D_u > param.DELAY_MAX: self.status = False else: self.status = True self.objective_function = D_u * 2 + param.DELAY_SERVER t_1 = time.perf_counter() return t_1 - t_0 def one_server_case(self, param): # step 1: consider one server case # allocate all user and get L_1 dic_l = dict() for k, v in enumerate(param.m_s): if v >= param.USER_NUM: D_u = param.d_us[:, k].max() dic_l[k] = D_u # search minimum D_u if bool(dic_l): return min(dic_l.values()) else: return Constant.INF def multiple_server_case(self, param): # step 2: consider multiple server case # initialize the bipartite graph added_server = param.SERVER_NUM for k, v in enumerate(param.m_s): for j in range(param.USER_NUM): delay = param.d_us[j][k] for i in range(added_server, added_server + v - 1): self.edges = np.vstack((self.edges, np.array([j, i, delay]))) added_server += v - 1 param.COPY_SERVER_NUM = added_server # search matching for i in range(1, param.DELAY_MAX): hc = HopcroftKarp(param.USER_NUM, param.COPY_SERVER_NUM) for j in np.where(self.edges[:, -1] <= i)[0]: hc.add_edge(self.edges[j][0], self.edges[j][1]) if hc.flow() == param.USER_NUM: return i return Constant.INF def print_result(self): if self.status: print('objective function is ', str(self.objective_function)) print('cpu time is ' + str(self.cpu_time) + ' sec') else: 
print('Error') inputを入れればoutputが出るツールとして2つのクラスを作った。どちらも, さっき定義したParameterのインスタンスをそのまま渡せばよいので, ブラックボックスとして扱える。set_inputなどのメソッドは共通化してもよかった。また, IlpではPuLPを使ってソルバを簡単に切り替えられるようにした。 ...