import math
import string
from abc import ABC, abstractmethod
from dataclasses import is_dataclass
from typing import Generic, Optional, TypeVar, Union
import numpy as np
from omegaconf import DictConfig, OmegaConf
from openelm.configs import BaseConfig, ImageELMConfig, SodaraceELMConfig
from openelm.diff_model import PromptMutationForImgTask, PromptMutationForSodarace
from openelm.environments.sodaracer import SodaraceSimulator
# A phenotype is a NumPy array, or None; the `to_phenotype` methods in this
# module return None for individuals that are not valid.
Phenotype = Optional[np.ndarray]
def ackley(x: np.ndarray) -> np.ndarray:
    """
    Evaluate a negated Ackley-style test function along the last axis of `x`.

    The sign is flipped so that the optimum is a maximum: the value is 0 at
    the origin (up to floating-point rounding) and never positive.

    Args:
        x: array of shape (..., d); every slice along the last axis is one
            d-dimensional point. A single point of shape (d,) is accepted too.
    Returns:
        An array of shape `x.shape[:-1]` holding the value for each point.
    """
    d = x.shape[-1]
    a = 5
    b = 0.1
    # Reduce over the same (last) axis that `d` is read from, so the function
    # works for any number of leading batch axes, including none.
    o1 = -a * np.exp(-b * np.sqrt(np.sum(x**2, axis=-1) / d))
    o2 = -np.exp(np.sum(np.cos(math.tau * x) / d, axis=-1))
    return -(a + math.exp(1) + o1 + o2)
class Genotype(ABC):
    """Abstract base class for the individuals an environment works with."""

    def __str__(self) -> str:
        """Return a textual form of the genotype; this base version raises."""
        raise NotImplementedError()

    @abstractmethod
    def to_phenotype(self) -> Optional[Phenotype]:
        """Return the phenotype of this genotype (possibly None)."""
        raise NotImplementedError()
# Type variable, bounded by Genotype, used to parameterize BaseEnvironment.
GenoType = TypeVar("GenoType", bound=Genotype)
class BaseEnvironment(ABC, Generic[GenoType]):
    """
    Abstract interface shared by all environments.

    An environment creates random individuals, mutates existing ones and
    assigns a fitness value to a single individual.
    """

    def __init__(self) -> None:
        # Annotation-only declarations: nothing is assigned here, so concrete
        # environments have to set both attributes themselves.
        self.genotype_space: np.ndarray
        self.batch_size: int

    @abstractmethod
    def random(self) -> list[GenoType]:
        """Create a batch of new individuals."""
        raise NotImplementedError()

    @abstractmethod
    def mutate(self, x: list[GenoType]) -> list[GenoType]:
        """Return mutated individuals derived from the batch `x`."""
        raise NotImplementedError()

    @abstractmethod
    def fitness(self, x: GenoType) -> float:
        """Return the fitness of the single individual `x`."""
        raise NotImplementedError()

    @property
    def max_fitness(self) -> int:
        """Maximal fitness value; this base implementation returns 0."""
        return 0

    @property
    def behavior_space(self) -> np.ndarray:
        """[starts, endings) of search intervals; by default the genotype space."""
        return self.genotype_space

    @property
    def behavior_ndim(self) -> int:
        """Size of the second axis of `behavior_space`."""
        return self.behavior_space.shape[1]

    @staticmethod
    def _load_config(config):
        """
        Turn `config` into an OmegaConf object.

        Args:
            config: a path string (loaded with `OmegaConf.load`), a dict or
                DictConfig (wrapped in `DictConfig`), or a dataclass (passed
                to `OmegaConf.structured`).
        Raises:
            ValueError: if `config` is none of the supported kinds.
        """
        # TODO: convert all to dataclass
        if isinstance(config, str):
            return OmegaConf.load(config)
        if isinstance(config, (dict, DictConfig)):
            return DictConfig(config)
        if is_dataclass(config):
            return OmegaConf.structured(config)
        raise ValueError
class ArrayGenotype(Genotype, np.ndarray):
    """Genotype that is itself a NumPy array (an `np.ndarray` subclass)."""

    def __new__(cls, input_array):
        # View-cast the input so the resulting array carries this class.
        return np.asarray(input_array).view(cls)

    def __str__(self) -> str:
        """Format the entries as a comma-separated list in parentheses."""
        entries = ", ".join(str(value) for value in np.asarray(self))
        return f"({entries})"

    def to_phenotype(self) -> Phenotype:
        """Return the values as a plain `np.ndarray`."""
        plain = np.asarray(self)
        return plain
# find all local maxima of a multimodal function
class FunctionOptim(BaseEnvironment[ArrayGenotype]):
    """Environment whose fitness is the multimodal `ackley` function."""

    def __init__(self, ndim=2):
        """
        Args:
            ndim: number of dimensions of the search space.
        """
        self.genotype_ndim = ndim
        # Shape (2, ndim): row 0 holds the lower bounds (-4), row 1 the upper
        # bounds (4) of every dimension.
        self.genotype_space = np.repeat([[-4, 4]], self.genotype_ndim, axis=0).T
        self.batch_size: int = 1

    def random(self) -> list[ArrayGenotype]:
        """Draw `batch_size` points uniformly from the genotype space."""
        return [
            ArrayGenotype(np.random.uniform(*self.genotype_space))
            for _ in range(self.batch_size)
        ]

    def mutate(self, x: list[ArrayGenotype]) -> list[ArrayGenotype]:
        """
        Perturb one random coordinate of every individual by U(-1, 1).

        The individuals in `x` are left untouched: each one is copied before
        being modified, so a caller that still holds references to the parents
        does not see them change.

        Args:
            x: the individuals to mutate.
        Returns:
            A new list with one mutated copy per element of `x`.
        """
        mutated: list[ArrayGenotype] = []
        for parent in x:
            # ndarray.copy() keeps the ArrayGenotype subclass.
            child = parent.copy()
            ix = np.random.randint(self.genotype_ndim)
            child[ix] = child[ix] + np.random.uniform(-1, 1)
            mutated.append(child)
        return mutated

    def fitness(self, x: ArrayGenotype) -> float:
        """Return the `ackley` value of the single point `x`."""
        # ackley expects a batch axis, so add one and unwrap the only result.
        return ackley(x[None])[0]
class StringArrayGenotype(ArrayGenotype):
    """Array genotype whose entries are read as indices into ASCII letters."""

    def __str__(self) -> str:
        """Round each entry, clip it to a valid index and join the letters."""
        letters = string.ascii_letters
        rounded: np.ndarray = np.round(self)
        indices = np.clip(rounded.astype(int), 0, len(letters) - 1)
        return "".join(letters[ix] for ix in indices)

    def to_phenotype(self) -> Phenotype:
        """Return the values as a plain `np.ndarray`."""
        plain = np.asarray(self)
        return plain
class MatchString(BaseEnvironment[StringArrayGenotype]):
    """Find a string by mutating one character at a time."""

    def __init__(self, target: str):
        """
        Args:
            target: the string to find; every character must be an ASCII
                letter.
        Raises:
            ValueError: if `target` contains a character outside the alphabet
                (raised by `str.index`).
        """
        self.alphabet = string.ascii_letters
        self.target = np.array([self.alphabet.index(ch) for ch in target])
        self.genotype_ndim = self.target.shape[0]
        # Shape (2, ndim): row 0 holds the lower bounds, row 1 the upper bounds.
        self.genotype_space = np.repeat(
            [[0, len(self.alphabet)]], self.genotype_ndim, axis=0
        ).T
        # random() reads this attribute and the base class never assigns it;
        # without it random() raises AttributeError. One individual per batch
        # mirrors FunctionOptim.
        self.batch_size: int = 1

    def random(self) -> list[StringArrayGenotype]:
        """Draw `batch_size` genotypes uniformly from the genotype space."""
        return [
            StringArrayGenotype(np.random.uniform(*self.genotype_space))
            for _ in range(self.batch_size)
        ]

    def mutate(self, x: list[StringArrayGenotype]) -> list[StringArrayGenotype]:
        """
        Perturb one random position of every individual by U(-1, 1).

        The individuals in `x` are left untouched: each one is copied before
        being modified, so a caller that still holds references to the parents
        does not see them change.

        Args:
            x: the individuals to mutate.
        Returns:
            A new list with one mutated copy per element of `x`.
        """
        mutated: list[StringArrayGenotype] = []
        for parent in x:
            # ndarray.copy() keeps the StringArrayGenotype subclass.
            child = parent.copy()
            ix = np.random.randint(self.genotype_ndim)
            child[ix] = child[ix] + np.random.uniform(-1, 1)
            mutated.append(child)
        return mutated

    def fitness(self, x: StringArrayGenotype) -> float:
        """Return the negated sum of absolute differences to the target."""
        return -np.abs(x - self.target).sum()
class ImageGeneration(Genotype):
    """Genotype for generated images."""

    def __init__(self, program_str: str, result_obj: np.ndarray):
        """
        Args:
            program_str: the code string of the program.
            result_obj: the object the program produced. It counts as a valid
                image only if it has a 3-dimensional shape whose last axis
                has size 3; anything else just marks the genotype as invalid.
        """
        self.program_str = program_str
        self.result_obj = result_obj
        self.valid = self.validate()

    def __str__(self) -> str:
        """Return the integer per-channel mean as text, or "" when invalid."""
        if not self.valid:
            return ""
        channel_means = self.result_obj.reshape((-1, 3)).mean(axis=0)
        return str(channel_means.astype(int))

    def validate(self) -> bool:
        """
        Check that `result_obj` has shape (height, width, 3).

        Objects without a `shape` attribute (for example None) are reported
        as invalid instead of raising AttributeError.
        """
        shape = getattr(self.result_obj, "shape", None)
        return shape is not None and len(shape) == 3 and shape[2] == 3

    def to_phenotype(self, mode: str = "3-channel-avg") -> Optional[Phenotype]:
        """
        Map the image into behavior space.

        Args:
            mode: the mapping to use; only "3-channel-avg" is implemented.
        Returns:
            The per-channel average of shape (3,) for "3-channel-avg", or None
            if the genotype is invalid or `mode` is not recognised.
        """
        if not self.valid:
            return None
        if mode == "3-channel-avg":
            # Average RGB channels.
            # Assume the input is of shape (height, width, channel), and we
            # average each channel to get (channel,)
            return np.average(self.result_obj.reshape((-1, 3)), axis=0)
        return None
class ImageOptim(BaseEnvironment[ImageGeneration]):
    """
    Mutate programs that return images.

    Fitness is the negated sum of absolute differences between the returned
    image and the target image. Individuals are mapped into the behavior
    space according to `behavior_mode`; the only mode currently listed in
    `behavior_mode_spec` is "3-channel-avg" (3 dimensions).
    """

    default_diff_model_cls = PromptMutationForImgTask
    # Record different definitions of behavior spaces in a dict. Feel free to add.
    behavior_mode_spec = {"3-channel-avg": {"genotype_ndim": 3}}

    def __init__(
        self,
        seed: dict,
        config: Union[str, dict, DictConfig],
        target_img: np.ndarray,
        diff_model,
        behavior_mode: str = "3-channel-avg",
        run_name: Optional[str] = None,
    ):
        """
        Set up the image environment.

        Args:
            seed: the seed dict; it is unpacked into `ImageGeneration`.
            config: the config file path or dict.
            target_img: the target image.
            diff_model: the diff model (or alternatives); if None, an instance
                of `default_diff_model_cls` is built from the config.
            behavior_mode: (Optional) a string indicating the way an individual
                is mapped into behavior space; must be a key of
                `behavior_mode_spec`.
            run_name: (Optional) override the run_name in config.
        Raises:
            TypeError: if `seed` is not a dict.
            KeyError: if `behavior_mode` is not a key of `behavior_mode_spec`.
        """
        if not isinstance(seed, dict):
            raise TypeError(f"seed must be a dict, got {type(seed).__name__}")
        # Checked before the seed is built so a bad mode fails fast. KeyError
        # is what the plain dict lookup used to raise for an unknown mode.
        if behavior_mode not in self.behavior_mode_spec:
            raise KeyError(
                f"unknown behavior_mode {behavior_mode!r}; "
                f"expected one of {sorted(self.behavior_mode_spec)}"
            )
        self.seed = ImageGeneration(**seed)
        self.config: ImageELMConfig = self._load_config(config)
        if run_name is not None:
            self.config.run_name = run_name
        # random() reads batch_size and nothing else sets it.
        # NOTE(review): assumes the image config may carry `batch_size` like
        # the Sodarace config does; falls back to 1 when it is absent.
        self.batch_size: int = self.config.get("batch_size", 1)
        self.target_img = target_img
        self.shape = target_img.shape
        if diff_model is None:
            self.diff_model = self.default_diff_model_cls(self.config)
        else:
            self.diff_model = diff_model
        self.behavior_mode = behavior_mode
        self.genotype_ndim: int = self.behavior_mode_spec[self.behavior_mode][
            "genotype_ndim"
        ]
        # Shape (2, ndim): row 0 holds the lower bounds, row 1 the upper bounds.
        self.genotype_space = np.repeat([[0, 255]], self.genotype_ndim, axis=0).T

    def generate_program(self, code_batch: list[str]) -> list[ImageGeneration]:
        """
        Ask the diff model for new programs based on `code_batch`.

        Args:
            code_batch: the code strings handed to the diff model.
        Returns:
            One ImageGeneration per dict returned by the diff model; each dict
            is unpacked as the keyword arguments of ImageGeneration.
        """
        generated_programs = self.diff_model.generate_program(code_batch)
        return [ImageGeneration(**p) for p in generated_programs]

    def random(self) -> list[ImageGeneration]:
        """
        Generate a batch of individuals from `batch_size` copies of the seed.

        Returns:
            The list of ImageGeneration objects produced from the seed program.
        """
        program_str_list = [self.seed.program_str] * self.batch_size
        return self.generate_program(program_str_list)

    def mutate(self, images_list: list[ImageGeneration]) -> list[ImageGeneration]:
        """
        Generate new individuals from the programs of existing ones.

        Args:
            images_list: the individuals to be mutated.
        Returns:
            The list of ImageGeneration objects produced from their programs.
        """
        program_str_list = [img.program_str for img in images_list]
        return self.generate_program(program_str_list)

    def fitness(self, x: ImageGeneration) -> float:
        """
        Return the negated sum of absolute differences to the target image.

        Invalid individuals and images whose shape differs from the target
        get a fitness of -inf.
        """
        if not x.valid or x.result_obj.shape != self.shape:
            return -np.inf
        # Subtract in floating point: with unsigned integer images (e.g. uint8)
        # a direct subtraction would wrap around instead of going negative.
        produced = np.asarray(x.result_obj, dtype=np.float64)
        target = np.asarray(self.target_img, dtype=np.float64)
        return -np.abs(produced - target).sum()
class Sodaracer(Genotype):
    """Genotype wrapping a Sodaracer program and the body dict it produced."""

    def __init__(self, program_str: str, result_obj: dict):
        """
        The Sodaracer genotype.

        The genotype is marked valid only if a simulator can be built from
        `result_obj` and a 0 ms evaluation runs without raising.

        Args:
            program_str: the string for the original code.
            result_obj: the dict of sodaracer.
        """
        self.program_str: str = program_str
        self.result_obj: dict = result_obj
        # Defined up front so the `fitness` property returns None, rather than
        # raising AttributeError, when the simulator cannot be built.
        self._fitness: Optional[float] = None
        # Check whether the Sodaracer is valid.
        try:
            # Test the Sodaracer by instantiating a simulation.
            self.simulator = SodaraceSimulator(body=self.result_obj)
            self.morphology = self.simulator.morphology
            self.evaluate(0)
            self.valid = True
        except Exception:
            # Deliberately broad: any failure while building or running the
            # simulation simply marks this individual as invalid.
            self.valid = False

    def evaluate(self, eval_ms: int) -> float:
        """
        Run the simulator for `eval_ms` and cache the result.

        Args:
            eval_ms: the evaluation time passed to the simulator.
        Returns:
            The value returned by the simulator, also stored for `fitness`.
        """
        self._fitness = self.simulator.evaluate(eval_ms)
        return self._fitness

    def __str__(self) -> str:
        return self.program_str

    def to_phenotype(self) -> Optional[Phenotype]:
        """Return integer (height, width, mass) of the morphology, or None if invalid."""
        if not self.valid:
            return None
        return np.array(
            [
                self.morphology["height"],
                self.morphology["width"],
                self.morphology["mass"],
            ]
        ).astype(int)

    @property
    def fitness(self) -> Optional[float]:
        """The result of the most recent `evaluate` call, or None if there is none."""
        return self._fitness
class Sodarace(BaseEnvironment[Sodaracer]):
    """Environment that generates and scores Sodaracer programs."""

    default_diff_model_cls = PromptMutationForSodarace

    def __init__(
        self,
        seed: dict,
        config: Union[str, dict, DictConfig, BaseConfig],
        diff_model,
        eval_ms: int,
        max_height: int = 1000,
        max_width: int = 1000,
        max_mass: int = 2000,
        ndim: int = 3,
        run_name: Optional[str] = None,
    ) -> None:
        """
        Sodarace environment.

        Args:
            seed: the seed dict; it is unpacked into `Sodaracer`.
            config: the config file path or dict.
            diff_model: the diff model (or alternatives); if None, an instance
                of `default_diff_model_cls` is built from the config.
            eval_ms: The time in ms for sodaracer evaluation.
            max_height: (Optional) the maximal height.
            max_width: (Optional) the maximal width.
            max_mass: (Optional) the maximal mass.
            ndim: (Optional) the dimension of behavior space. It is only
                stored as `genotype_ndim`; `genotype_space` always has the
                three height/width/mass columns.
            run_name: (Optional) override the run_name in config.
        Raises:
            TypeError: if `seed` is not a dict.
        """
        if not isinstance(seed, dict):
            raise TypeError
        self.seed = Sodaracer(**seed)

        # TODO: rewrite config code to make everything an instance of a dataclass
        self.config: SodaraceELMConfig = self._load_config(config)
        if run_name is not None:
            self.config.run_name = run_name

        self.diff_model = (
            self.default_diff_model_cls(self.config)
            if diff_model is None
            else diff_model
        )

        self.batch_size = self.config.batch_size
        self.eval_ms = eval_ms
        self.genotype_ndim = ndim
        # Shape (2, 3): row 0 holds the lower bounds, row 1 the upper bounds.
        bounds = [[0, max_height], [0, max_width], [0, max_mass]]
        self.genotype_space = np.array(bounds).T

    def generate_program(self, code_batch: list[str]) -> list[Sodaracer]:
        """
        Ask the diff model for new programs based on `code_batch`.

        Returns:
            One Sodaracer per dict returned by the diff model; each dict is
            unpacked as the keyword arguments of Sodaracer.
        """
        results = self.diff_model.generate_program(code_batch)
        return [Sodaracer(**result) for result in results]

    def random(self) -> list[Sodaracer]:
        """Generate a batch from `batch_size` copies of the seed program."""
        seed_programs = [self.seed.program_str] * self.batch_size
        return self.generate_program(seed_programs)

    def mutate(self, sodaracer_list: list[Sodaracer]) -> list[Sodaracer]:
        """Generate new Sodaracers from the programs of `sodaracer_list`."""
        parent_programs = [racer.program_str for racer in sodaracer_list]
        return self.generate_program(parent_programs)

    def fitness(self, x: Sodaracer) -> float:
        """Evaluate `x` for `eval_ms`; invalid Sodaracers score -inf."""
        if not x.valid:
            return -np.inf
        return x.evaluate(self.eval_ms)