From de4ccffcfaf45981527d51994968395f568bdf07 Mon Sep 17 00:00:00 2001 From: Keisuke Fukuda Date: Fri, 11 May 2018 17:17:26 +0900 Subject: [PATCH 1/6] added SIGCHLD handler to MPU to exit when a worker process exits becuase of an uncaught exception --- chainer/training/updaters/multiprocess_parallel_updater.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/chainer/training/updaters/multiprocess_parallel_updater.py b/chainer/training/updaters/multiprocess_parallel_updater.py index fb402bc4da2..c4b790f2b45 100644 --- a/chainer/training/updaters/multiprocess_parallel_updater.py +++ b/chainer/training/updaters/multiprocess_parallel_updater.py @@ -1,4 +1,5 @@ import multiprocessing +import signal import warnings import six @@ -18,6 +19,17 @@ import numpy +def _sigchld_handler(signo, stk): + import sys + sys.stderr.write("******************************************\n") + sys.stderr.write("Chainer multiprocess parallel updater: \n") + sys.stderr.write(" It seems that an uncaught exception in a worker process\n") + sys.stderr.write("******************************************\n") + sys.stderr.write("\n\n") + sys.stderr.flush() + sys.exit(1) + + class _Worker(multiprocessing.Process): def __init__(self, proc_id, pipe, master): @@ -178,6 +190,8 @@ def _send_message(self, message): pipe.send(message) def setup_workers(self): + signal.signal(signal.SIGCHLD, _sigchld_handler) + if self._initialized: return self._initialized = True From 9cbdfaa1e7705bd561dc3b858bf74f74c3a26b7b Mon Sep 17 00:00:00 2001 From: Keisuke Fukuda Date: Fri, 11 May 2018 19:02:48 +0900 Subject: [PATCH 2/6] improved the handler of SIGCHLD --- .../updaters/multiprocess_parallel_updater.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/chainer/training/updaters/multiprocess_parallel_updater.py b/chainer/training/updaters/multiprocess_parallel_updater.py index c4b790f2b45..a2a6783abd0 100644 --- a/chainer/training/updaters/multiprocess_parallel_updater.py +++ b/chainer/training/updaters/multiprocess_parallel_updater.py @@ -21,14 +21,18 @@ def _sigchld_handler(signo, stk): import sys - sys.stderr.write("******************************************\n") - sys.stderr.write("Chainer multiprocess parallel updater: \n") - sys.stderr.write(" It seems that an uncaught exception in a worker process\n") - sys.stderr.write("******************************************\n") - sys.stderr.write("\n\n") - sys.stderr.flush() - sys.exit(1) - + import os + + pid, stat = os.waitpid(-1, os.WNOHANG) + + if stat != 0: + sys.stderr.write("******************************************\n") + sys.stderr.write("Chainer multiprocess parallel updater: \n") + sys.stderr.write(" It seems that an uncaught exception in a worker process\n") + sys.stderr.write("******************************************\n") + sys.stderr.write("\n\n") + sys.stderr.flush() + exit(-1) class _Worker(multiprocessing.Process): From fbaf37602d7ce460cde61be38570cdd5c8d92140 Mon Sep 17 00:00:00 2001 From: Keisuke Fukuda Date: Fri, 11 May 2018 20:05:59 +0900 Subject: [PATCH 3/6] minor fix --- chainer/training/updaters/multiprocess_parallel_updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chainer/training/updaters/multiprocess_parallel_updater.py b/chainer/training/updaters/multiprocess_parallel_updater.py index a2a6783abd0..1f5c60fbe06 100644 --- a/chainer/training/updaters/multiprocess_parallel_updater.py +++ b/chainer/training/updaters/multiprocess_parallel_updater.py @@ -32,7 +32,7 @@ def _sigchld_handler(signo, stk): sys.stderr.write("******************************************\n") sys.stderr.write("\n\n") sys.stderr.flush() - exit(-1) + sys.exit(-1) class _Worker(multiprocessing.Process): From 8a090af768d1641b8adb9b1cfc72ad41652e290b Mon Sep 17 00:00:00 2001 From: Keisuke Fukuda Date: Fri, 11 May 2018 20:14:15 +0900 Subject: [PATCH 4/6] Records child PIDs and check in SIGCHLD handler --- .../updaters/multiprocess_parallel_updater.py | 37 ++++++++++++---------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/chainer/training/updaters/multiprocess_parallel_updater.py b/chainer/training/updaters/multiprocess_parallel_updater.py index 1f5c60fbe06..b5c7cb2f673 100644 --- a/chainer/training/updaters/multiprocess_parallel_updater.py +++ b/chainer/training/updaters/multiprocess_parallel_updater.py @@ -19,21 +19,6 @@ import numpy -def _sigchld_handler(signo, stk): - import sys - import os - - pid, stat = os.waitpid(-1, os.WNOHANG) - - if stat != 0: - sys.stderr.write("******************************************\n") - sys.stderr.write("Chainer multiprocess parallel updater: \n") - sys.stderr.write(" It seems that an uncaught exception in a worker process\n") - sys.stderr.write("******************************************\n") - sys.stderr.write("\n\n") - sys.stderr.flush() - sys.exit(-1) - class _Worker(multiprocessing.Process): def __init__(self, proc_id, pipe, master): @@ -183,6 +168,7 @@ def __init__(self, iterators, optimizer, converter=convert.concat_examples, self._pipes = [] self._workers = [] + self._worker_pids = [] self.comm = None @staticmethod @@ -193,8 +179,25 @@ def _send_message(self, message): for pipe in self._pipes: pipe.send(message) + def _sigchld_handler(self, signo, stk): + import sys + import os + + pid, stat = os.waitpid(-1, os.WNOHANG) + + if pid in self._worker_pids and stat != 0: + sys.stderr.write("\n") + sys.stderr.write("*" * 70 + "\n") + sys.stderr.write("MultiprocessParallelUpdater: \n") + sys.stderr.write(" An uncaught exception occured in " + "a worker process (PID {})\n".format(pid)) + sys.stderr.write("*" * 70 + "\n") + sys.stderr.write("\n") + sys.stderr.flush() + sys.exit(-1) + def setup_workers(self): - signal.signal(signal.SIGCHLD, _sigchld_handler) + signal.signal(signal.SIGCHLD, self._sigchld_handler) if self._initialized: return @@ -208,6 +211,8 @@ def setup_workers(self): self._workers.append(worker) self._pipes.append(pipe) + self._worker_pids = [w.pid for w in self._workers] + with cuda.Device(self._devices[0]): self._master.to_gpu(self._devices[0]) if len(self._devices) > 1: From e742fd1b282a33a33e8b850e39d3c9c332d3d76a Mon Sep 17 00:00:00 2001 From: Keisuke Fukuda Date: Sat, 12 May 2018 14:38:49 +0900 Subject: [PATCH 5/6] minor fix --- .../updaters/multiprocess_parallel_updater.py | 45 ++++++++++++++-------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/chainer/training/updaters/multiprocess_parallel_updater.py b/chainer/training/updaters/multiprocess_parallel_updater.py index b5c7cb2f673..edbc3d869c1 100644 --- a/chainer/training/updaters/multiprocess_parallel_updater.py +++ b/chainer/training/updaters/multiprocess_parallel_updater.py @@ -1,5 +1,7 @@ import multiprocessing +import os import signal +import sys import warnings import six @@ -180,29 +182,39 @@ def _send_message(self, message): pipe.send(message) def _sigchld_handler(self, signo, stk): - import sys - import os - - pid, stat = os.waitpid(-1, os.WNOHANG) - - if pid in self._worker_pids and stat != 0: - sys.stderr.write("\n") - sys.stderr.write("*" * 70 + "\n") - sys.stderr.write("MultiprocessParallelUpdater: \n") - sys.stderr.write(" An uncaught exception occured in " - "a worker process (PID {})\n".format(pid)) - sys.stderr.write("*" * 70 + "\n") - sys.stderr.write("\n") - sys.stderr.flush() + # Catch SIGCHLD signal and abort the master process + # if necessary. If a worker process exits before the + # training process finishes, the whole training process + # will hang in the NCCL communicaton routine. + try: + # NOTE: os.waitpid() can take '-1' as pid argument + # only on Unix environment. + pid, stat = os.waitpid(-1, os.WNOHANG) + except OSError: + # os.waitpid() failed. Something is completely broken. sys.exit(-1) - def setup_workers(self): - signal.signal(signal.SIGCHLD, self._sigchld_handler) + if pid in self._worker_pids: + sig = stat & 0xff # Signal that the child process recieved. + ecode = stat >> 8 # Exit code (if sig == 0) + if sig != 0 or ecode != 0: + sys.stderr.write("\n") + sys.stderr.write("-" * 70 + "\n") + sys.stderr.write("MultiprocessParallelUpdater: \n") + sys.stderr.write(" worker (PID {}) ".format(pid) + + "exited abnormally.\n") + sys.stderr.write("-" * 70 + "\n") + sys.stderr.write("\n") + sys.stderr.flush() + sys.exit(-1) + def setup_workers(self): if self._initialized: return self._initialized = True + signal.signal(signal.SIGCHLD, self._sigchld_handler) + self._master.cleargrads() for i in six.moves.range(1, len(self._devices)): pipe, worker_end = multiprocessing.Pipe() @@ -211,6 +223,7 @@ def setup_workers(self): self._workers.append(worker) self._pipes.append(pipe) + # Record worker PIDs for SIGCHLD handler self._worker_pids = [w.pid for w in self._workers] with cuda.Device(self._devices[0]): From a6317e4686017b166189b9639975de7e2c8a847a Mon Sep 17 00:00:00 2001 From: Keisuke Fukuda Date: Sat, 12 May 2018 14:56:33 +0900 Subject: [PATCH 6/6] wait multiple child processes --- .../updaters/multiprocess_parallel_updater.py | 55 ++++++++++++++-------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/chainer/training/updaters/multiprocess_parallel_updater.py b/chainer/training/updaters/multiprocess_parallel_updater.py index edbc3d869c1..97f7033273f 100644 --- a/chainer/training/updaters/multiprocess_parallel_updater.py +++ b/chainer/training/updaters/multiprocess_parallel_updater.py @@ -186,27 +186,42 @@ def _sigchld_handler(self, signo, stk): # if necessary. If a worker process exits before the # training process finishes, the whole training process # will hang in the NCCL communicaton routine. - try: - # NOTE: os.waitpid() can take '-1' as pid argument - # only on Unix environment. - pid, stat = os.waitpid(-1, os.WNOHANG) - except OSError: - # os.waitpid() failed. Something is completely broken. - sys.exit(-1) - if pid in self._worker_pids: - sig = stat & 0xff # Signal that the child process recieved. - ecode = stat >> 8 # Exit code (if sig == 0) - if sig != 0 or ecode != 0: - sys.stderr.write("\n") - sys.stderr.write("-" * 70 + "\n") - sys.stderr.write("MultiprocessParallelUpdater: \n") - sys.stderr.write(" worker (PID {}) ".format(pid) + - "exited abnormally.\n") - sys.stderr.write("-" * 70 + "\n") - sys.stderr.write("\n") - sys.stderr.flush() - sys.exit(-1) + err = False # Any error happened in child processes + + while True: + # Call os.waitpid(). Note that there are possibly + # multiple child processes to be wait()-ed. + try: + # NOTE: os.waitpid() can take '-1' as pid argument + # only on Unix environment. + pid, stat = os.waitpid(-1, os.WNOHANG) + if pid == 0: + break + except OSError as e: + import errno + if e.errno == errno.ECHILD: # no more chld proc. + break + else: + # Something is broken. + sys.exit(-1) + + if pid in self._worker_pids: + sig = stat & 0xff # Signal that the child process recieved. + ecode = stat >> 8 # Exit code (if sig == 0) + if sig != 0 or ecode != 0: + err = True + + if err: + sys.stderr.write("\n") + sys.stderr.write("-" * 70 + "\n") + sys.stderr.write("MultiprocessParallelUpdater: \n") + sys.stderr.write(" Worker(s) exited abnormally." + " Exiting to avoid deadlock.\n") + sys.stderr.write("-" * 70 + "\n") + sys.stderr.write("\n") + sys.stderr.flush() + sys.exit(-1) def setup_workers(self): if self._initialized: