Running the JWST pipeline: Multiprocessing

Python multiprocessing can be used in the following ways, which are mutually exclusive. Do not mix them: spawning multiple processes in which each process itself calls a step that also uses multiprocessing will result in “daemon” or “recursion” errors, and possibly excessive memory usage:

Multiprocessing within a pipeline step

This usage of multiprocessing is recommended when you want to speed up the processing of a particular dataset by running computationally intensive steps on multiple cores.

Unlike Multiprocessing on multiple observations, this usage is compatible with running the pipeline within Jupyter Notebook/Lab.

To enable multiprocessing, use the optional maximum_cores parameter of the relevant step. This parameter can be set to one of these options:

  • a numerical value given as a string, e.g., '8' (setting it to '1', or to a number larger than the available number of cores, is not recommended)

  • 'quarter'

  • 'half'

  • 'all' (this is usually not recommended because it might tie up your CPU completely for the duration of the run)

  • 'none' (default)
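As a rough illustration of how these string options translate into a number of worker processes, consider the sketch below. The resolve_max_cores helper is hypothetical and written for this example; it is not the jwst package's actual internal logic.

```python
# Illustrative sketch (not jwst internals): mapping a maximum_cores
# string to a process count.
import os


def resolve_max_cores(maximum_cores, available=None):
    """Translate a maximum_cores string into a number of processes."""
    if available is None:
        available = os.cpu_count()
    if maximum_cores == 'none':
        return 1                           # no multiprocessing
    if maximum_cores == 'quarter':
        return max(available // 4, 1)
    if maximum_cores == 'half':
        return max(available // 2, 1)
    if maximum_cores == 'all':
        return available
    # numeric value given as a string, e.g. '8'
    return min(int(maximum_cores), available)
```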

The following example turns on a step’s multiprocessing option. Note that only one of the steps (ramp_fit) has multiprocessing turned on.

Note

For more details on how to adjust .call(...) inputs in the example below, please see Setting Step Parameters on a Pipeline or Individual Step.

from jwst.pipeline import Detector1Pipeline

uncal_file = 'jw0000_0000_uncal.fits'
output_dir = 'my_project'
parameter_dict = {
    "ramp_fit": {
        "maximum_cores": 'half'
    }
}

Detector1Pipeline.call(
    uncal_file,
    save_results=True,
    steps=parameter_dict,
    output_dir=output_dir
)

Alternatively, you can run the equivalent command via strun:

strun calwebb_detector1 jw0000_0000_uncal.fits --save_results=true --steps.ramp_fit.maximum_cores=half --output_dir=my_project

Multiprocessing on multiple observations

This usage of multiprocessing runs the entire pipeline simultaneously on multiple observations. You must not use Multiprocessing within a pipeline step if you choose this option. It is recommended that you consult the Python multiprocessing documentation and follow its best practices. When in doubt, stick to the pattern in the example given below.

The pipeline uses the spawn start method (see Contexts and start methods) internally, and it is recommended that any multiprocessing script that runs the pipeline use the same start method. As detailed in The spawn and forkserver start methods, this requires that the code be “protected” with an if __name__ == '__main__': check, as follows:

if __name__ == '__main__':
    # code used in multiprocessing
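Putting the guard to use, here is a minimal, self-contained sketch of a script that explicitly requests the spawn start method. The run_pipeline function is a hypothetical stand-in for a worker that would import and call the real pipeline on one file:

```python
# Minimal sketch of a spawn-based multiprocessing script.
# run_pipeline is a hypothetical placeholder worker; a real script
# would import the JWST pipeline inside it and run it on uncal_file.
import multiprocessing


def run_pipeline(uncal_file):
    # In a real script: import and call the pipeline here, one file
    # per worker process.
    return f'processed {uncal_file}'


if __name__ == '__main__':
    # Explicitly match the pipeline's internal spawn start method
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(processes=2) as pool:
        results = pool.map(run_pipeline, ['a_uncal.fits', 'b_uncal.fits'])
    print(results)
```

Because spawned workers re-import the script, all work must live inside functions and the guarded block; only definitions should sit at the top level.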

Because the code has to be “protected” as explained above, unlike Multiprocessing within a pipeline step, you will not be able to run this from within a Jupyter Notebook/Lab.

The following example runs the pipeline with multiprocessing via the multiprocessing.pool.Pool.starmap() method, using zip() to pack the pipeline inputs. The example also writes a text file with the full traceback for debugging in case of a crash. Note that the import statement of the pipeline is inside the function that gets called by every worker (run_det1); this avoids a known memory leak.

Note

For more details on how to adjust .call(...) inputs in the example below, please see Setting Step Parameters on a Pipeline or Individual Step.

# Save the code in a file named SampleScript2.py and then run it with
#     python SampleScript2.py

import os
import sys
import traceback
import multiprocessing
from glob import glob


def run_det1(uncal_file, output_dir):
    """
    Run the Detector1 pipeline on the given file.

    Parameters
    ----------
    uncal_file : str
        Name of uncalibrated file to run.
    output_dir : str
        Path of the output directory.
    """
    # Local import of pipeline to avoid known memory leak
    from jwst.pipeline.calwebb_detector1 import Detector1Pipeline

    log_name = os.path.basename(uncal_file).replace('.fits', '')
    pipe_success = False

    try:
        # Run the pipeline, turning off terminal logging messages
        Detector1Pipeline.call(
            uncal_file,
            output_dir=output_dir,
            save_results=True,
            configure_log=False
        )
        pipe_success = True
        print(f'\n * Pipeline finished for file: {uncal_file}\n')
    except Exception:
        print('\n *** OH NO! The detector1 pipeline crashed! *** \n')
        pipe_crash_msg = traceback.format_exc()  # format_exc() returns the traceback as a string
    if not pipe_success:
        with open(f'{log_name}_pipecrash.txt', 'w') as crashfile:
            print('Printing file with full traceback')
            print(pipe_crash_msg, file=crashfile)


def main():
    input_data_dir = 'my_project_dir'
    output_dir = input_data_dir

    # get the files to run
    files_to_run = sorted(glob(os.path.join(input_data_dir, '*_uncal.fits')))
    n_files = len(files_to_run)
    print(f'Will run the pipeline on {n_files} files')

    # the output list should be the same length as the files to run
    outptd = [output_dir] * n_files

    # get the cores to use
    # (please adjust this according to your hardware; there is no point
    # in taking half of the available cores if you have 3 cores or fewer)
    n_cpu = os.cpu_count()
    cores2use = n_cpu // 2  # half of all available cores
    print(f'* Using {cores2use}/{n_cpu} cores for multiprocessing.')

    # set the pool and run multiprocess
    with multiprocessing.Pool(cores2use) as pool:
        pool.starmap(run_det1, zip(files_to_run, outptd))

    print('\n * Finished multiprocessing! \n')

if __name__ == '__main__':
    sys.exit(main())