Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: model pool in each worker #342

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions handyrl/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, args, conn, wid):
self.worker_id = wid
self.args = args
self.conn = conn
self.latest_model = -1, None
self.model_pool = {}

self.env = make_env({**args['env'], 'id': wid})
self.generator = Generator(self.env, self.args)
Expand All @@ -41,27 +41,20 @@ def __del__(self):
print('closed worker %d' % self.worker_id)

def _gather_models(self, model_ids):
model_pool = {}
for model_id in model_ids:
if model_id not in model_pool:
if model_id < 0:
model_pool[model_id] = None
elif model_id == self.latest_model[0]:
# use latest model
model_pool[model_id] = self.latest_model[1]
else:
# get model from server
model = pickle.loads(send_recv(self.conn, ('model', model_id)))
if model_id == 0:
# use random model
self.env.reset()
obs = self.env.observation(self.env.players()[0])
model = RandomModel(model, obs)
model_pool[model_id] = ModelWrapper(model)
# update latest model
if model_id > self.latest_model[0]:
self.latest_model = model_id, model_pool[model_id]
return model_pool
if model_id is not None and model_id >= 0 and model_id not in self.model_pool:
# get model from server
model = pickle.loads(send_recv(self.conn, ('model', model_id)))
if model_id == 0:
# use random model
self.env.reset()
obs = self.env.observation(self.env.players()[0])
model = RandomModel(model, obs)
# update latest model
if len(self.model_pool) >= 1:
oldest_model_id = list(self.model_pool.keys())[0]
self.model_pool.pop(oldest_model_id)
self.model_pool[model_id] = ModelWrapper(model)

def run(self):
while True:
Expand All @@ -73,11 +66,11 @@ def run(self):
models = {}
if 'model_id' in args:
model_ids = list(args['model_id'].values())
model_pool = self._gather_models(model_ids)
self._gather_models(model_ids)

# make dict of models
for p, model_id in args['model_id'].items():
models[p] = model_pool[model_id]
models[p] = self.model_pool.get(model_id, None)

if role == 'g':
episode = self.generator.execute(models, args)
Expand Down