Templates#

Custom Systems#

Main Points#

Custom dynamical systems should inherit from DynamicalSystem, the main interface for dynamical system environments. It differs from gym.Env mainly in two ways: it adds a state_space attribute, and it defines a custom step() function that solves an initial value problem to compute the next state of the system, calling generate_disturbance() and generate_observation() internally.

Typically, the main methods to be overridden in custom systems are dynamics() and generate_disturbance().

Important

Remember that dynamics() is defined using continuous-time dynamics:

\[\dot{x} = f(x, u, w)\]

This simplifies the code, since in many cases the system dynamics are given in continuous time but must be simulated in discrete time.
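As a rough illustration of what simulating continuous-time dynamics in discrete time involves, the sketch below advances \(\dot{x} = f(x, u, w)\) over one sampling interval with a fixed-step fourth-order Runge-Kutta update, holding the action and disturbance constant. It is a stand-in written in plain Python, not the solver SOCKS uses; the names `rk4_step` and `double_integrator` and the argument order `f(x, u, w)` are assumptions made for this example.

```python
def rk4_step(f, x, u, w, dt):
    """Advance x by one interval of length dt under x' = f(x, u, w).

    The action u and disturbance w are held constant over the interval.
    """

    def axpy(a, xs, ys):
        # Element-wise a * xs + ys for plain Python lists.
        return [a * xi + yi for xi, yi in zip(xs, ys)]

    k1 = f(x, u, w)
    k2 = f(axpy(dt / 2, k1, x), u, w)
    k3 = f(axpy(dt / 2, k2, x), u, w)
    k4 = f(axpy(dt, k3, x), u, w)
    return [
        xi + dt / 6 * (a + 2 * b + 2 * c + d)
        for xi, a, b, c, d in zip(x, k1, k2, k3, k4)
    ]


def double_integrator(x, u, w):
    # Hypothetical continuous-time dynamics:
    # position' = velocity, velocity' = u + w.
    _, velocity = x
    return [velocity, u[0] + w[0]]


# One discrete step of length 0.1 from rest with unit input and no disturbance.
x_next = rk4_step(double_integrator, [0.0, 0.0], [1.0], [0.0], dt=0.1)
```

Writing only the continuous-time right-hand side keeps the model close to its textbook form, and the choice of discretization is left to the integration step.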

System Templates#

It is recommended to use the following templates when defining new systems (environments) to use in algorithms.

import gym
from gym_socks.envs.dynamical_system import DynamicalSystem

import numpy as np


class CustomDynamicalSystemEnv(DynamicalSystem):

    def __init__(self, seed=0, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32
        )
        self.state_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32
        )

        self.state = None

        self.seed(seed=seed)

    def generate_disturbance(self, time, state, action):
        w = self.np_random.standard_normal(size=self.state_space.shape)
        return 1e-2 * np.array(w)

    def dynamics(self, time, state, action, disturbance):
        ...

The default template defines a stochastic dynamical system with dynamics given by:

\[\dot{x} = f(x, u, w)\]

import gym
from gym_socks.envs.dynamical_system import DynamicalSystem

import numpy as np


class CustomDynamicalSystemEnv(DynamicalSystem):

    def __init__(self, seed=0, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32
        )
        self.state_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32
        )

        self.state = None

        self.A = np.zeros(shape=(2, 2))  # <-- change this
        self.B = np.zeros(shape=(2, 1))  # <-- change this

        self.seed(seed=seed)

    def step(self, time, action):

        disturbance = self.generate_disturbance(time, self.state, action)
        self.state = self.dynamics(time, self.state, action, disturbance)
        obs = self.generate_observation(time, self.state, action)

        return obs, 0, False, {}

    def generate_disturbance(self, time, state, action):
        w = self.np_random.standard_normal(size=self.state_space.shape)
        return 1e-2 * np.array(w)

    def dynamics(self, time, state, action, disturbance):
        return self.A @ state + self.B @ action + disturbance

A discrete-time linear system has dynamics given by:

\[x_{t+1} = A x_{t} + B u_{t} + w_{t}\]

import gym
from gym_socks.envs.dynamical_system import DynamicalSystem

import numpy as np


class CustomDynamicalSystemEnv(DynamicalSystem):

    def __init__(self, seed=0, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32
        )
        self.state_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32
        )

        self.state = None

        self.seed(seed=seed)

    def generate_disturbance(self, time, state, action):
        w = self.np_random.standard_normal(size=self.state_space.shape)
        return 1e-2 * np.array(w)

    def generate_observation(self, time, state, action):
        v = self.np_random.standard_normal(
            size=self.observation_space.shape
        )
        return np.array(state, dtype=np.float32) + np.asarray(v)

    def dynamics(self, time, state, action, disturbance):
        ...

A partially observable system includes an observation function \(h\). This is typically used when observations of the state are corrupted by a noise process.

\[\begin{split}\dot{x} &= f(x, u, w) \\ y &= h(x, u, v)\end{split}\]
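To make the role of \(h\) concrete, here is a minimal sketch in plain Python of an additive-noise observation function, \(y = x + v\). It does not use SOCKS; the function signature, the `noise_scale` parameter, and the use of `random.Random` in place of the environment's `np_random` are assumptions chosen for illustration.

```python
import random


def generate_observation(state, rng, noise_scale=0.1):
    """Return y = h(x, v) = x + v, with v drawn from a zero-mean Gaussian."""
    return [xi + rng.gauss(0.0, noise_scale) for xi in state]


rng = random.Random(0)  # seeded so that the run is repeatable
state = [1.0, -1.0]
observation = generate_observation(state, rng)
```

The true state is left untouched; only the value returned to the caller is corrupted.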

The system can then be “registered” using the OpenAI gym register function, so that it can be created with gym.make via a string identifier. This is useful for configuring experiments using sacred, and for ensuring correct versioning of environments for repeatability.

from gym.envs.registration import register

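register(
    id="CustomDynamicalSystemEnv-v0",
    # gym resolves string entry points as "module.path:ClassName"; prefix the
    # class name with the module in which the class is defined.
    entry_point="CustomDynamicalSystemEnv",
    order_enforce=False,
)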

Note

The step, reset, render, and close methods are inherited from gym.Env and should be overridden if custom behavior is needed, for instance to use the linear discrete-time dynamics \(x_{t+1} = A x_{t} + B u_{t} + w_{t}\) explicitly.
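The linear update mentioned in the note can be sketched without any dependencies. The example below applies \(x_{t+1} = A x_{t} + B u_{t} + w_{t}\) to a sampled double integrator; the helper `linear_step`, the sampling time `T`, and the matrices are hypothetical choices for illustration and are not taken from SOCKS.

```python
def linear_step(A, B, x, u, w):
    """Return x_next = A x + B u + w for matrices given as nested lists."""
    n = len(x)
    return [
        sum(A[i][j] * x[j] for j in range(n))
        + sum(B[i][k] * u[k] for k in range(len(u)))
        + w[i]
        for i in range(n)
    ]


T = 0.25  # sampling time (assumed)
A = [[1.0, T], [0.0, 1.0]]
B = [[T**2 / 2], [T]]

# Apply a unit input for four steps with zero disturbance.
x = [0.0, 0.0]
for _ in range(4):
    x = linear_step(A, B, x, [1.0], [0.0, 0.0])
```

Because the update is already in discrete time, no initial value problem needs to be solved, which is why overriding step() is appropriate in this case.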

Custom Policies#

By definition, a policy is a function which returns a control action. In SOCKS, a policy is defined as a callable class that inherits from BasePolicy, which is also used as a parent class for algorithms which compute control policies.

Thus, a policy simply inherits from BasePolicy and implements a custom __call__() function.

Important

The main thing to know when defining custom policies is that the SOCKS functions which sample from systems pass all relevant information to the policy so that it can compute a control action. A policy may be time-varying or time-invariant, and open- or closed-loop, so it may need the time and state of the system. Custom policies should therefore name these variables explicitly as parameters when they are required, and use *args, **kwargs to capture any additional, unneeded arguments.

Policy Templates#

It is recommended to use the following template when defining new policies to use in algorithms.

from gym.spaces import Space
from gym_socks.envs.policy import BasePolicy

class CustomPolicy(BasePolicy):

    def __init__(self, action_space: Space = None):
        self.action_space = action_space

    def __call__(self, *args, **kwargs):
        return self.action_space.sample()
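As a sketch of the argument-handling convention described above, the following closed-loop policy names `state` explicitly and lets everything else fall into *args, **kwargs. It is a plain Python class standing in for a BasePolicy subclass so that it runs without SOCKS installed. The class name, the `gain` parameter, and the assumption that the caller passes `time` and `state` as keyword arguments are illustrative only.

```python
class ProportionalPolicy:
    """Closed-loop, time-invariant policy u = -gain * x[0]."""

    def __init__(self, gain=1.0):
        self.gain = gain

    def __call__(self, *args, state=None, **kwargs):
        # `state` is named because the policy needs it; `time` and any other
        # keyword arguments land in **kwargs and are ignored.
        return [-self.gain * state[0]]


policy = ProportionalPolicy(gain=2.0)

# The caller passes everything it knows; the policy uses only what it needs.
action = policy(time=3, state=[0.5, 0.0])
```

A time-varying, open-loop policy would instead name `time` and ignore `state` in the same way.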

Custom Sampling Functions#

Very simply, a sampling function returns a random sample. In Python, this can either be a regular function that returns a value or a generator that yields a value. SOCKS implements a wrapper function, sample_fn(), which converts a regular function into an infinite generator.

In order to create a custom sampling function, we need a function that returns or yields a single observation, and then we can use the sample_fn() decorator to convert it into a sample function.
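The idea of turning a regular function into an infinite generator can be sketched in a few lines of plain Python. The decorator below, `repeat_forever`, is a hypothetical stand-in that only illustrates the concept. It is not the implementation of sample_fn(), and it handles only functions that return a value.

```python
import itertools
import random


def repeat_forever(fn):
    """Turn a function that returns one sample into an infinite generator."""

    def generator(*args, **kwargs):
        while True:
            yield fn(*args, **kwargs)

    return generator


@repeat_forever
def uniform_sampler(rng):
    # Generate a single observation.
    return rng.uniform(-1.0, 1.0)


sampler = uniform_sampler(random.Random(0))

# Draw a finite sample of size 5 from the infinite generator.
samples = list(itertools.islice(sampler, 5))
```

Since the generator never terminates on its own, a finite sample has to be cut out explicitly, here with itertools.islice.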

Sampling Function Templates#

It is recommended to use the following templates when defining a new sampling function.

from gym_socks.sampling import sample_fn

@sample_fn
def custom_sampler():

    # Generate observation here.

    return obs

from gym_socks.sampling import sample_fn

@sample_fn
def custom_sampler():

    # Generate observation here.

    yield obs

from gym_socks.sampling import sample_fn
from gym_socks.sampling import space_sampler

from gym_socks.envs.integrator import NDIntegratorEnv
from gym_socks.policies import RandomizedPolicy

import numpy as np

# Partially observable ND integrator system.
# Replace this with your own system.
class PartiallyObservableNDIntegrator(NDIntegratorEnv):
    def generate_observation(self, time, state, action):
        # Corrupt the state with additive Gaussian noise.
        v = self.np_random.standard_normal(
            size=self.observation_space.shape
        )
        return np.array(state, dtype=np.float32) + np.asarray(v)

env = PartiallyObservableNDIntegrator(dim=2)

state_sampler = space_sampler(space=env.state_space)
action_sampler = space_sampler(space=env.action_space)

@sample_fn
def custom_sampler():
    # Draw a random initial state and a random action.
    state = next(state_sampler)
    action = next(action_sampler)

    # Start the system at the sampled state and advance it one step.
    env.reset(state)
    observation, *_ = env.step(action=action)
    next_state = env.state

    return state, action, next_state, observation

Note

The main reason to use a generator instead of a regular function is if you have internal variables or arguments that are passed to the generator whose value should be preserved or “remembered” between calls.
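A small sketch of what “remembered” means here: the generator below keeps its `position` variable alive between draws, so each sample depends on the previous one. The name `random_walk` and its parameters are hypothetical, and the example is plain Python that does not go through sample_fn().

```python
import random


def random_walk(rng, start=0.0, step_scale=0.1):
    """Yield successive positions of a Gaussian random walk."""
    position = start  # preserved between calls to next()
    while True:
        position += rng.gauss(0.0, step_scale)
        yield position


walk = random_walk(random.Random(0))
first = next(walk)
second = next(walk)  # continues from `first`, not from `start`
```

A regular function would have to receive and return `position` explicitly to achieve the same effect.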

Note

Behind the scenes, the sample_fn() decorator wraps the sample function in a helper class that provides several useful functions that allow us to generate a finite sample and manipulate the generator, for instance to repeat outputs multiple times.