# SuperFastPython.com # example of logging from multiple workers in the multiprocessing pool from random import random from time import sleep from multiprocessing import current_process from multiprocessing import Pool from multiprocessing import Queue from multiprocessing import Manager from logging.handlers import QueueHandler import logging # executed in a process that performs logging def logger_process(queue): # create a logger logger = logging.getLogger('app') # configure a stream handler logger.addHandler(logging.StreamHandler()) # log all messages, debug and up logger.setLevel(logging.DEBUG) # report that the logger process is running logger.info(f'Logger process running.') # run forever while True: # consume a log message, block until one arrives message = queue.get() # check for shutdown if message is None: logger.info(f'Logger process shutting down.') break # log the message logger.handle(message) # task to be executed in child processes def task(queue): # create a logger logger = logging.getLogger('app') # add a handler that uses the shared queue logger.addHandler(QueueHandler(queue)) # log all messages, debug and up logger.setLevel(logging.DEBUG) # get the current process process = current_process() # report initial message logger.info(f'Child {process.name} starting.') # simulate doing work for i in range(5): # report a message logger.debug(f'Child {process.name} step {i}.') # block sleep(random()) # report final message logger.info(f'Child {process.name} done.') # protect the entry point if __name__ == '__main__': # create the manager with Manager() as manager: # create the shared queue and get the proxy object queue = manager.Queue() # create a logger logger = logging.getLogger('app') # add a handler that uses the shared queue logger.addHandler(QueueHandler(queue)) # log all messages, debug and up logger.setLevel(logging.DEBUG) # create the process pool with default configuration with Pool() as pool: # issue a long running task to receive logging messages _ = pool.apply_async(logger_process, args=(queue,)) # report initial message logger.info('Main process started.') # issue task to the process pool results = [pool.apply_async(task, args=(queue,)) for i in range(5)] # wait for all issued tasks to complete logger.info('Main process waiting...') for result in results: result.wait() # report final message logger.info('Main process done.') # shutdown the long running logger task queue.put(None) # close the process pool pool.close() # wait for all tasks to complete (e.g. the logger to close) pool.join()