Services

The BSB provides several “services”, each backed by a fallback system of providers. A service usually tries to import a package; if the package isn’t found, it falls back to a sensible mock, or to an object that raises an error on first use. This way the framework and any downstream packages can always import the service, and can use it whenever a mock is provided.
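The fallback pattern can be sketched as follows. This is a hypothetical illustration, not the BSB's internal implementation; `ErrorModule` and `provide` are made-up names:

```python
import importlib


class ErrorModule:
    """Hypothetical placeholder that raises only on first use,
    so the service object itself can always be imported."""

    def __init__(self, message):
        self._message = message

    def __getattr__(self, attr):
        raise ImportError(self._message)


def provide(module_name, fallback):
    # Try to import the real package; hand out the fallback otherwise.
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return fallback


# Example: fall back to an object that errors on first use.
service = provide(
    "definitely_not_installed_pkg",
    ErrorModule("Install 'definitely_not_installed_pkg' to use this service."),
)
```

Downstream code can import `service` unconditionally; the error only surfaces when a missing service is actually used.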

MPI

The MPI service (bsb.services.MPI) is the COMM_WORLD communicator (mpi4py.MPI.Comm) if mpi4py is available; otherwise it is an emulator of a single-node parallel context.
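Such an emulator can be pictured as a tiny stand-in for the parts of the mpi4py.MPI.Comm interface that a framework uses; a hypothetical sketch (class name and method selection are assumptions):

```python
class SingleNodeComm:
    """Hypothetical emulator of a one-node MPI communicator."""

    def Get_rank(self):
        # The single node is always rank 0.
        return 0

    def Get_size(self):
        # The "parallel" context contains exactly one node.
        return 1

    def bcast(self, obj, root=0):
        # Broadcasting to yourself is the identity operation.
        return obj

    def barrier(self):
        # Nothing to synchronize with.
        pass


comm = SingleNodeComm()
```

Code written against the communicator interface then runs unchanged with or without mpi4py installed.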

Error

If any environment variables containing MPI in their name are present, an error is raised, because execution in an actual MPI environment won’t work without mpi4py.
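The check can be pictured as scanning the environment for MPI-related variables; a hypothetical sketch, not the BSB's actual code:

```python
import os


def detect_mpi_env(environ):
    # Any variable with "MPI" in its name (e.g. OMPI_COMM_WORLD_SIZE)
    # suggests execution under an MPI launcher.
    return [name for name in environ if "MPI" in name]


def require_mpi4py_or_raise(environ, mpi4py_available):
    hits = detect_mpi_env(environ)
    if hits and not mpi4py_available:
        raise RuntimeError(
            f"MPI environment detected ({', '.join(hits)}), "
            "but mpi4py is not installed."
        )


# Typical call site: check the real environment.
require_mpi4py_or_raise(os.environ, mpi4py_available=True)
```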

MPILock

The MPILock service provides mpilock’s WindowController if it is available, or a mock that immediately and unconditionally acquires its lock and continues.
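The mock can be pictured as a no-op drop-in: every acquisition succeeds immediately, since there is nothing to coordinate in a single-process context. A hypothetical sketch (the class name and the choice of `read`/`write` context managers are assumptions):

```python
from contextlib import contextmanager


class NoOpLock:
    """Hypothetical mock lock: acquires immediately, never blocks."""

    @contextmanager
    def read(self):
        # No other process can hold the lock, so just proceed.
        yield

    @contextmanager
    def write(self):
        yield


lock = NoOpLock()
with lock.write():
    pass  # critical section runs immediately
```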

Note

Depends on the MPI service.

JobPool

The JobPool service allows you to submit Jobs and then execute them.

Note

Depends on the MPI service.

Most component types have a queue method that takes a job pool as an argument and lets them schedule their jobs.

The recommended way to open a job pool is to use the create_job_pool() context manager:

from bsb import from_storage

network = from_storage("example.hdf5")
with network.create_job_pool() as pool:
  if pool.is_main():
    # Only the main node needs to schedule the jobs
    for component in network.placement.values():
      component.queue(pool)
  # But everyone needs to partake in the execute call
  pool.execute()

Scheduling

With the schedule() method, pools can schedule jobs on the main node while worker nodes concurrently execute them:

from bsb import from_storage

network = from_storage("example.hdf5")
with network.create_job_pool() as pool:
  if pool.is_main():
    pool.schedule([*network.placement.values()])
  pool.execute()

Warning

Pass in topologically sorted arrays of nodes! Dependencies are only checked between the nodes, not between the jobs, by looking for a depends_on attribute.
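Since dependencies are expressed through a `depends_on` attribute, the input can be topologically sorted beforehand with the standard library's graphlib; a sketch with a hypothetical `Node` class:

```python
from graphlib import TopologicalSorter


class Node:
    """Hypothetical node carrying a `depends_on` attribute."""

    def __init__(self, name, depends_on=()):
        self.name = name
        self.depends_on = tuple(depends_on)


def topo_sort(nodes):
    # Build the dependency graph from each node's `depends_on` attribute
    # and return the nodes with dependencies ordered first.
    sorter = TopologicalSorter({n: getattr(n, "depends_on", ()) for n in nodes})
    return list(sorter.static_order())


a = Node("a")
b = Node("b", depends_on=[a])
c = Node("c", depends_on=[a, b])
ordered = topo_sort([c, b, a])  # dependencies come before dependents
```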

Listeners

On top of opening the job pool, the create_job_pool() context manager also registers the appropriate listeners. Listeners listen to updates emitted by the job pool and can respond to changes, for example by printing them out to display the progress of the job pool:

import time

from bsb import PoolProgressReason, PoolStatus

_t = None
def report_time_elapsed(progress):
  global _t
  if progress.reason == PoolProgressReason.POOL_STATUS_CHANGE:
    if progress.status == PoolStatus.SCHEDULING:
      _t = time.time()
    elif progress.status == PoolStatus.CLOSING:
      print(f"Pool execution finished. {time.time() - _t:.0f} seconds elapsed.")

with network.create_job_pool() as pool:
  pool.add_listener(report_time_elapsed)
  pool.submit(lambda scaffold: time.sleep(2))
  pool.execute()
  # Will print `Pool execution finished. 2 seconds elapsed.`

Listeners can also be context managers, and will enter and exit the same context as the JobPool.
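A context-manager listener can, for example, acquire a resource when the pool's context opens and release it when the context closes. A hypothetical sketch (the class and its file-logging behaviour are made up for illustration):

```python
class FileLogListener:
    """Hypothetical listener that logs pool progress updates to a file.

    As a context manager it opens the file when the pool's context is
    entered and closes it again when the pool's context exits.
    """

    def __init__(self, path):
        self._path = path
        self._file = None

    def __enter__(self):
        self._file = open(self._path, "w")
        return self

    def __exit__(self, exc_type, exc, tb):
        self._file.close()
        self._file = None

    def __call__(self, progress):
        # Called for every progress update emitted by the pool.
        self._file.write(f"{progress}\n")
```

Registered with `pool.add_listener(...)` inside the pool's context, such a listener would share the pool's lifetime.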

Caching

Some jobs may benefit from caching data. The problem with memoization techniques like functools.cache in a parallel workflow is that the data risks staying cached for the entire workflow, consuming large amounts of memory on every parallel worker long after the job that needed it has finished.

To prevent this, JobPools support caching items only for as long as any other job owned by the scheduler still needs to complete. To use pool-managed caching, simply decorate a method of a @node-decorated class with the pool_cache() decorator:

from bsb import PlacementStrategy, config, pool_cache

@config.node
class MyStrategy(PlacementStrategy):
  @pool_cache
  def heavy_calculations(self):
    return 5 + 5

  def place(self, chunk, indicators):
    # `heavy_calculations` will be called at most once on each parallel node
    for i in range(1000):
      self.heavy_calculations()