From 2529cd773d6a42db2350ac72b6b75a5fcedb4bcb Mon Sep 17 00:00:00 2001 From: YuriCat Date: Sun, 30 Jan 2022 10:00:13 +0900 Subject: [PATCH] feature: remove entry server and use same worker server to entry --- handyrl/worker.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/handyrl/worker.py b/handyrl/worker.py index 796196df..8967729e 100755 --- a/handyrl/worker.py +++ b/handyrl/worker.py @@ -191,39 +191,32 @@ def __init__(self, args): def run(self): # prepare listening connections - def entry_server(port): - print('started entry server %d' % port) + def worker_server(port): + print('started worker server %d' % port) conn_acceptor = accept_socket_connections(port=port, timeout=0.3) while not self.shutdown_flag: conn = next(conn_acceptor) - if conn is not None: - worker_args = conn.recv() + if conn is None: + continue + type, worker_args = conn.recv() + if type == 'entry': print('accepted connection from %s!' % worker_args['address']) args = copy.deepcopy(self.args) args['worker'] = worker_args conn.send(args) conn.close() - print('finished entry server') - - def worker_server(port): - print('started worker server %d' % port) - conn_acceptor = accept_socket_connections(port=port, timeout=0.3) - while not self.shutdown_flag: - conn = next(conn_acceptor) - if conn is not None: + else: self.add_connection(conn) print('finished worker server') # use thread list of super class - self.threads.append(threading.Thread(target=entry_server, args=(9999,))) self.threads.append(threading.Thread(target=worker_server, args=(9998,))) - self.threads[-2].start() self.threads[-1].start() def entry(worker_args): - conn = connect_socket_connection(worker_args['server_address'], 9999) - conn.send(worker_args) + conn = connect_socket_connection(worker_args['server_address'], 9998) + conn.send(('entry', worker_args)) args = conn.recv() conn.close() return args @@ -247,6 +240,7 @@ def run(self): try: for i in range(self.args['num_gathers']): conn = connect_socket_connection(self.args['server_address'], 9998) + conn.send(('worker', None)) p = mp.Process(target=gather_loop, args=(args, conn, i)) p.start() conn.close()