Source code for openelm.map_elites

from collections import defaultdict
from typing import Optional

import numpy as np
from tqdm import trange

from openelm.environments import BaseEnvironment, Genotype

# Position of an individual in behaviour space. `None` stands for an individual
# without a valid phenotype (see `MAPElites.to_mapindex`, which maps it to `None`).
Phenotype = Optional[np.ndarray]
# Index of a niche (cell) in the map: one bin index per behaviour dimension.
# `None` marks an invalid individual that cannot be placed in the map.
MapIndex = Optional[tuple]


class Map:
    """
    Class to represent a map of any dimensionality, backed by a numpy array.

    This class is necessary to handle the circular buffer for the history
    dimension.
    """

    def __init__(
        self,
        dims: tuple,
        fill_value: float,
        dtype: type = np.float32,
        history_length: int = 1,
    ):
        """
        Class to represent a map of any dimensionality, backed by a numpy array.

        This class is a wrapper around a numpy array that handles the circular
        buffer for the history dimension. We use it for the array of fitnesses,
        genomes, and to track whether a niche in the map has been explored or
        not.

        Args:
            dims (tuple): Tuple of ints representing the dimensions of the map.
            fill_value (float): Fill value to initialize the array.
            dtype (type, optional): Type to pass to the numpy array to
                initialize it. For example, we initialize the map of genomes
                with type `object`. Defaults to np.float32.
            history_length (int, optional): Length of history to store for each
                niche (cell) in the map. This acts as a circular buffer, so
                after storing `history_length` items, the buffer starts
                overwriting the oldest items. Defaults to 1.

        Raises:
            ValueError: If `history_length` is smaller than 1.
        """
        if history_length < 1:
            raise ValueError(
                f"history_length must be at least 1, got {history_length}"
            )
        self.history_length: int = history_length
        self.dims: tuple = dims
        if self.history_length == 1:
            self.array: np.ndarray = np.full(dims, fill_value, dtype=dtype)
        else:
            # `top` holds, per niche, the history slot of the most recent write.
            # It starts at the last slot so that the first write (which
            # increments modulo `history_length`) lands in slot 0.
            self.top = np.full(dims, self.history_length - 1, dtype=int)
            self.array = np.full((history_length,) + dims, fill_value, dtype=dtype)
        # Flipped to False by the first `__setitem__` call.
        self.empty = True

    def __getitem__(self, map_ix):
        """If history length > 1, the history dim is an n-dim circular buffer."""
        if self.history_length == 1:
            return self.array[map_ix]
        # Read the most recently written slot of the niche.
        return self.array[(self.top[map_ix], *map_ix)]

    def __setitem__(self, map_ix, value):
        """Store `value` in niche `map_ix`, advancing its circular buffer."""
        self.empty = False
        if self.history_length == 1:
            self.array[map_ix] = value
        else:
            # Advance the write position, wrapping around once the buffer is
            # full so that the oldest entry gets overwritten.
            new_top = (self.top[map_ix] + 1) % self.history_length
            self.top[map_ix] = new_top
            self.array[(new_top, *map_ix)] = value

    @property
    def latest(self) -> np.ndarray:
        """
        Returns the most recent value of every niche, without the history dim.

        For `history_length == 1` this is the backing array itself; otherwise
        it is a new array of shape `dims` gathered from the `top` slot of each
        niche. Niches that were never written hold the fill value.
        """
        if self.history_length == 1:
            return self.array
        # Gather along the history axis using the per-niche `top` index.
        return np.take_along_axis(self.array, self.top[np.newaxis, ...], axis=0)[0]

    @property
    def shape(self) -> tuple:
        """Wrapper around the shape of the numpy array."""
        return self.array.shape

    @property
    def map_size(self) -> int:
        """Returns the product of the dimension sizes, not including history."""
        if self.history_length == 1:
            return self.array.size
        return self.array[0].size

    @property
    def qd_score(self) -> float:
        """
        Returns the quality-diversity score of the map.

        Only the latest finite value of each niche contributes, so older
        entries kept in the history buffer are not counted again.
        """
        current = self.latest
        return current[np.isfinite(current)].sum()

    @property
    def maximum(self) -> float:
        """Returns the maximum over the latest value of every niche."""
        return self.latest.max()

    @property
    def niches_filled(self) -> int:
        """
        Returns the number of niches in the map that have been explored.

        A niche counts once if its latest value is finite, regardless of how
        many entries its history buffer holds.
        """
        return np.count_nonzero(np.isfinite(self.latest))
class MAPElites:
    """
    Class implementing MAP-Elites, a quality-diversity algorithm.

    MAP-Elites creates a map of high performing solutions at each point in a
    discretized behavior space. First, the algorithm generates some initial
    random solutions, and evaluates them in the environment. Then, it
    repeatedly mutates the solutions in the map, and places the mutated
    solutions in the map if they outperform the solutions already in their
    niche.
    """

    def __init__(
        self,
        env,
        n_bins: int,
        init_map: Optional[Map] = None,
        history_length: int = 1,
        save_history: bool = False,
    ):
        """
        Class implementing MAP-Elites, a quality-diversity algorithm.

        Args:
            env (BaseEnvironment): The environment to evaluate solutions in.
                This should be a subclass of `BaseEnvironment`, and should
                implement methods to generate random solutions, mutate existing
                solutions, and evaluate solutions for their fitness in the
                environment.
            n_bins (int): Number of bins to partition the behavior space into.
            init_map (Map, optional): A map to use for the algorithm. If not
                passed, a new map will be created. Defaults to None.
            history_length (int): Length of history to store for each niche
                (cell) in the map. This acts as a circular buffer, so after
                storing `history_length` items, the buffer starts overwriting
                the oldest items.
            save_history (bool, optional): Whether to save the history of all
                generated solutions, even if they are not inserted into the
                map. Defaults to False.
        """
        self.env: BaseEnvironment = env
        self.n_bins = n_bins
        self.history_length = history_length
        self.save_history = save_history
        # self.history will be set/reset each time when calling `.search(...)`
        self.history: dict = defaultdict(list)
        # Best individual found by the most recent `.search(...)` call.
        self.current_max_genome = None

        # Discretization of space: drop the two outer edges of the linspace so
        # only the interior bin boundaries remain, then transpose so that
        # iterating over `self.bins` yields one set of boundaries per behavior
        # dimension (this is how `to_mapindex` consumes it).
        self.bins = np.linspace(*env.behavior_space, n_bins + 1)[1:-1].T  # type: ignore

        # TODO: abstract all maps out to a single class.
        # Performance of niches.
        if init_map is None:
            self.fitnesses: Map = Map(
                dims=(n_bins,) * env.behavior_ndim,
                fill_value=-np.inf,
                dtype=float,
                history_length=history_length,
            )
        else:
            self.fitnesses = init_map

        # Niches' sources. `0.0` is only a placeholder for niches that hold no
        # genome yet.
        self.genomes: Map = Map(
            dims=self.fitnesses.dims,
            fill_value=0.0,
            dtype=object,
            history_length=history_length,
        )
        # Index over niches that hold a genome, used to select parents from.
        self.nonzero: Map = Map(dims=self.fitnesses.dims, fill_value=False, dtype=bool)

        # Bad mutations that ended up with invalid output, kept in a
        # fixed-size buffer that wraps around once it is full.
        self.recycled = [None] * 1000
        self.recycled_count = 0

        print(f"MAP of size: {self.fitnesses.dims} = {self.fitnesses.map_size}")

    def to_mapindex(self, b: Phenotype) -> MapIndex:
        """
        Converts a phenotype (position in behaviour space) to a map index.

        Args:
            b (Phenotype): Position in behaviour space, or None for an invalid
                individual.

        Returns:
            MapIndex: One bin index per behaviour dimension, or None if `b` is
            None.
        """
        if b is None:
            return None
        return tuple(np.digitize(x, bins) for x, bins in zip(b, self.bins))

    def random_selection(self) -> MapIndex:
        """
        Randomly select a niche (cell) in the map that has been explored.

        Returns:
            MapIndex: Index of a niche flagged in `self.nonzero`.

        Raises:
            ValueError: Raised by `np.random.choice` if no niche is flagged.
        """
        candidates = np.flatnonzero(self.nonzero.array)
        flat_ix = np.random.choice(candidates)
        return np.unravel_index(flat_ix, self.nonzero.dims)

    def search(self, initsteps: int, totalsteps: int, atol: float = 1.0) -> str:
        """
        Run the MAP-Elites search algorithm.

        Args:
            initsteps (int): Number of initial random solutions to generate.
            totalsteps (int): Total number of steps to run the algorithm for,
                including initial steps.
            atol (float, optional): Tolerance for how close the best performing
                solution has to be to the maximum possible fitness before the
                search stops early. Defaults to 1.

        Returns:
            str: A string representation of the best performing solution. The
            best performing solution object can be accessed via the
            `current_max_genome` instance attribute.
        """
        tbar = trange(int(totalsteps))
        max_fitness = -np.inf
        max_genome = None
        if self.save_history:
            self.history = defaultdict(list)

        target_reached = False
        for n_steps in tbar:
            if n_steps < initsteps or self.genomes.empty:
                # Initialise by generating initsteps random solutions.
                # If map is still empty: force to do generation instead of
                # mutation.
                new_individuals: list[Genotype] = self.env.random()
            else:
                # Randomly select a batch of elites from the map and mutate
                # them.
                batch: list[Genotype] = [
                    self.genomes[self.random_selection()]
                    for _ in range(self.env.batch_size)
                ]
                new_individuals = self.env.mutate(batch)

            # `new_individuals` is a list of generation/mutation. We put them
            # into the behavior space one-by-one.
            # TODO: account for the case where multiple new individuals are
            # placed in the same niche, for saving histories.
            for individual in new_individuals:
                map_ix = self.to_mapindex(individual.to_phenotype())
                # If the return is None, the individual is invalid and is
                # thrown into the recycle bin.
                if map_ix is None:
                    self.recycled[self.recycled_count % len(self.recycled)] = individual
                    self.recycled_count += 1
                    continue

                if self.save_history:
                    # TODO: thresholding
                    self.history[map_ix].append(individual)

                fitness = self.env.fitness(individual)
                # If new fitness greater than old fitness in niche, replace.
                if fitness > self.fitnesses[map_ix]:
                    self.fitnesses[map_ix] = fitness
                    self.genomes[map_ix] = individual
                    # Flag the niche as selectable only once it really holds a
                    # genome. Otherwise a rejected individual (e.g. -inf or NaN
                    # fitness) would let `random_selection` return a niche
                    # whose genome is still the placeholder value.
                    self.nonzero[map_ix] = True
                # If new fitness is the highest so far, update the tracker.
                if fitness > max_fitness:
                    max_fitness = fitness
                    max_genome = individual
                    tbar.set_description(f"{max_fitness=:.4f}")
                # Stop if best fitness is within atol of maximum possible
                # fitness.
                if np.isclose(max_fitness, self.env.max_fitness, atol=atol):
                    target_reached = True
                    break

            # Leave the outer loop as well, so the whole search ends early.
            if target_reached:
                break

        self.current_max_genome = max_genome
        return str(max_genome)

    def niches_filled(self):
        """Get the number of niches that have been explored in the map."""
        return self.fitnesses.niches_filled

    def maximum_fitness(self):
        """Get the maximum fitness value in the map."""
        return self.fitnesses.maximum

    def qd_score(self):
        """
        Get the quality-diversity score of the map.

        The quality-diversity score is the sum of the performance of all
        solutions in the map.
        """
        return self.fitnesses.qd_score