piecewise linear policy, refactor classification-based policies

antoine-galataud committed Mar 28, 2024
1 parent 48f9263 commit 0d574b4
Showing 5 changed files with 263 additions and 79 deletions.
hopes/fun_utils.py (31 additions, 0 deletions)
@@ -0,0 +1,31 @@
import numpy as np


def piecewise_linear(x, left_cp, right_cp, slope, y0, y1) -> np.ndarray:
    r"""Define a piecewise linear function with 3 segments, shaped like:

        y0 ----\ left_cp
                \
                 \ slope
                  \
         right_cp \---- y1

    Note: the slope is not necessarily negative; the middle segment can be increasing
    or decreasing.

    :param x: the input variable.
    :param left_cp: the left change point.
    :param right_cp: the right change point.
    :param slope: the slope of the middle linear segment.
    :param y0: the constant value of the left segment.
    :param y1: the constant value of the right segment.
    """
    # define the conditions for each segment
    conditions = [x < left_cp, (x >= left_cp) & (x <= right_cp), x > right_cp]
    # first segment is flat until left_cp, second segment is linear between
    # left_cp and right_cp, third segment is flat after right_cp
    funcs = [
        lambda _: y0,
        lambda v: slope * (v - left_cp) + y0,
        lambda _: y1,
    ]
    return np.piecewise(x, conditions, funcs)
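As a quick sanity check of the shape sketched in the docstring, here is a minimal evaluation sketch; the change points, slope, and plateau values below are illustrative assumptions, not values from the repository:

import numpy as np

from hopes.fun_utils import piecewise_linear

# flat at 60 below x=10, sloping down to 40 between 10 and 20, flat at 40 above 20
x = np.linspace(0.0, 30.0, 7)
y = piecewise_linear(x, left_cp=10.0, right_cp=20.0, slope=-2.0, y0=60.0, y1=40.0)
print(y)  # [60. 60. 60. 50. 40. 40. 40.]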
hopes/policy/policies.py (157 additions, 13 deletions)

@@ -2,9 +2,12 @@

import numpy as np
import requests
import torch
from scipy import optimize
from sklearn.linear_model import LogisticRegression

from hopes.dev_utils import override
from hopes.fun_utils import piecewise_linear


class Policy(ABC):
@@ -31,6 +34,16 @@ def compute_action_probs(self, obs: np.ndarray) -> np.ndarray:
        action_probs = np.exp(log_likelihoods)
        return action_probs

    def select_action(self, obs: np.ndarray) -> np.ndarray:
        """Select actions under the policy for given observations.

        :param obs: the observation(s) for which to select an action, shape (batch_size, obs_dim).
        :return: the selected action(s), shape (batch_size,).
        """
        action_probs = self.compute_action_probs(obs)
        return np.array([np.random.choice(len(probs), p=probs) for probs in action_probs])
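A brief usage sketch of select_action, using the RandomPolicy defined just below; the num_actions constructor argument is an assumption, since RandomPolicy.__init__ is not shown in this diff:

import numpy as np

from hopes.policy.policies import RandomPolicy

policy = RandomPolicy(num_actions=4)  # assumed constructor argument, not shown in this hunk
obs = np.random.randn(8, 2)           # 8 observations, 2 features
actions = policy.select_action(obs)   # one sampled action per observation, shape (8,)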


class RandomPolicy(Policy):
    """A random policy that selects actions uniformly at random."""
@@ -46,32 +59,163 @@ def log_likelihoods(self, obs: np.ndarray) -> np.ndarray:
        return np.log(action_probs)


-class RegressionBasedPolicy(Policy):
-    """A policy that uses a regression model to predict the log-likelihoods of actions given
-    observations."""
+class ClassificationBasedPolicy(Policy):
+    """A policy that uses a classification model to predict the log-likelihoods of actions given
+    observations.
+
+    In the absence of an actual control policy, this can be used to train a policy on a dataset
+    of (obs, act) pairs that would have been collected offline.
+    """

    def __init__(
-        self, obs: np.ndarray, act: np.ndarray, regression_model: str = "logistic"
+        self,
+        obs: np.ndarray,
+        act: np.ndarray,
+        classification_model: str = "logistic",
+        model_params: dict | None = None,
    ) -> None:
"""
:param obs: the observations for training the regression model, shape: (batch_size, obs_dim).
:param act: the actions for training the regression model, shape: (batch_size,).
:param regression_model: the type of regression model to use. For now, only logistic is supported.
:param obs: the observations for training the classification model, shape: (batch_size, obs_dim).
:param act: the actions for training the classification model, shape: (batch_size,).
:param classification_model: the type of classification model to use. For now, only logistic and mlp are supported.
:param model_params: optional parameters for the classification model.
"""
-        assert regression_model in ["logistic"], "Only logistic regression is supported for now."
+        supported_models = ["logistic", "mlp"]
+        assert (
+            classification_model in supported_models
+        ), f"Only {supported_models} supported for now."
        assert obs.ndim == 2, "Observations must have shape (batch_size, obs_dim)."
        assert obs.shape[0] == act.shape[0], "Number of observations and actions must match."

-        self.model_x = obs
-        self.model_y = act
-        self.model = LogisticRegression()
+        self.model_obs = obs
+        self.model_act = act
+        self.num_actions = len(np.unique(act))
+        self.classification_model = classification_model
+        self.model_params = model_params or {}

        if self.classification_model == "logistic":
            self.model = LogisticRegression()

        elif self.classification_model == "mlp":
            hidden_size = self.model_params.get("hidden_size", 64)
            activation = self.model_params.get("activation", "relu")
            act_cls = torch.nn.ReLU if activation == "relu" else torch.nn.Tanh
            self.model = torch.nn.Sequential(
                torch.nn.Linear(self.model_obs.shape[1], hidden_size),
                act_cls(),
                torch.nn.Linear(hidden_size, hidden_size),
                act_cls(),
                torch.nn.Linear(hidden_size, self.num_actions),
            )

    def fit(self):
-        self.model.fit(self.model_x, self.model_y)
+        if self.classification_model == "mlp":
+            criterion = torch.nn.CrossEntropyLoss()
+            optimizer = torch.optim.Adam(
+                self.model.parameters(), lr=self.model_params.get("lr", 0.01)
+            )
+
+            for _ in range(self.model_params.get("num_epochs", 1000)):
+                optimizer.zero_grad()
+                output = self.model(torch.tensor(self.model_obs, dtype=torch.float32))
+                # CrossEntropyLoss expects integer class targets of shape (batch_size,)
+                loss = criterion(output, torch.tensor(self.model_act, dtype=torch.long).view(-1))
+                loss.backward()
+                optimizer.step()
+
+        else:
+            self.model.fit(self.model_obs, self.model_act)

    @override(Policy)
    def log_likelihoods(self, obs: np.ndarray) -> np.ndarray:
        if self.classification_model == "mlp":
            with torch.no_grad():
                output = self.model(torch.tensor(obs, dtype=torch.float32))
                return torch.log_softmax(output, dim=1).numpy()
        else:
            return self.model.predict_log_proba(obs)
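For orientation, a hedged end-to-end sketch of the refactored class on synthetic (obs, act) pairs; the data shapes and hyperparameters are assumptions for illustration, not values from the repository:

import numpy as np

from hopes.policy.policies import ClassificationBasedPolicy

rng = np.random.default_rng(0)
obs = rng.normal(size=(500, 3))    # 500 observations, 3 features
act = (obs[:, 0] > 0).astype(int)  # 2 actions, derived from the first feature

policy = ClassificationBasedPolicy(
    obs=obs,
    act=act,
    classification_model="mlp",
    model_params={"hidden_size": 32, "num_epochs": 200},
)
policy.fit()

probs = policy.compute_action_probs(obs[:5])  # shape (5, 2), rows sum to 1
actions = policy.select_action(obs[:5])       # sampled actions, shape (5,)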


class PiecewiseLinearPolicy(Policy):
    """A piecewise linear policy that selects actions based on a set of linear segments defined by
    thresholds and slopes.

    This can be used to estimate a probability distribution over actions drawn from a BMS
    reset rule, for instance an outdoor air reset that is a function of outdoor air
    temperature and is bounded by a minimum and maximum on both axes. It can also be
    helpful for modeling a simple schedule, where the action is a function of time.
    """

    def __init__(
        self,
        obs: np.ndarray,
        act: np.ndarray,
        actions_bins: list[float | int] | None = None,
    ):
        """
        :param obs: the observations for training the piecewise linear model, shape: (batch_size, obs_dim).
        :param act: the actions for training the piecewise linear model, shape: (batch_size,).
        :param actions_bins: the bins for discretizing the action space. If not provided, we assume the
            action space is already discretized.
        """
        assert (
            obs.ndim == 1 or obs.shape[1] == 1
        ), "Piecewise linear policy only supports 1D observations."
        assert obs.shape[0] == act.shape[0], "Number of observations and actions must match."

        self.model_obs = obs.squeeze() if obs.ndim == 2 else obs
        self.model_act = act.squeeze() if act.ndim == 2 else act
        self.model_params = None

        # discretize the action space
        self.actions_bins = actions_bins if actions_bins else np.unique(self.model_act)
        # count the resolved bins, so this also works when actions_bins is None
        self.num_actions = len(self.actions_bins)

    def fit(self):
        # estimate bounds for the change points from the input data; the change points
        # live on the observation axis, so the percentiles are taken over the observations
        left_cp_bound_percentile = 30
        right_cp_bound_percentile = 70
        left_cp, right_cp = np.percentile(
            self.model_obs, (left_cp_bound_percentile, right_cp_bound_percentile)
        )
        left_cp_min, left_cp_max = left_cp, right_cp
        right_cp_min, right_cp_max = left_cp, right_cp
        y0_min = y1_min = np.min(self.model_act)
        y0_max = y1_max = np.max(self.model_act)
        slope_min, slope_max = -np.inf, np.inf

        # curve_fit returns the optimal parameters and their estimated covariance
        self.model_params, _covariance = optimize.curve_fit(
            piecewise_linear,
            self.model_obs,
            self.model_act,
            bounds=(
                [left_cp_min, right_cp_min, slope_min, y0_min, y1_min],
                [left_cp_max, right_cp_max, slope_max, y0_max, y1_max],
            ),
        )

    @override(Policy)
    def log_likelihoods(self, obs: np.ndarray) -> np.ndarray:
-        return self.model.predict_log_proba(obs)
        raw_actions = piecewise_linear(obs, *self.model_params)
        # bin each raw action to the nearest action in the discretized action space
        actions = [min(self.actions_bins, key=lambda x: abs(x - ra)) for ra in raw_actions]
        # give the matched bin a probability close to 1 and the others a small epsilon,
        # then normalize each row into a valid distribution before taking the log
        probs = np.array(
            [
                [1.0 if a == action else 1e-6 for a in self.actions_bins]
                for action in actions
            ]
        )
        probs /= probs.sum(axis=1, keepdims=True)
        return np.log(probs)
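A hedged sketch of fitting the new policy to a noisy outdoor-air-reset curve; the synthetic data and action bins are assumptions for illustration:

import numpy as np

from hopes.policy.policies import PiecewiseLinearPolicy

rng = np.random.default_rng(7)
oat = rng.uniform(0.0, 30.0, size=500)  # outdoor air temperature, the 1D observation
# a reset curve: 60 below 10C, linear down to 40 between 10C and 20C, plus noise
setpoint = np.clip(60.0 - 2.0 * (oat - 10.0), 40.0, 60.0) + rng.normal(0.0, 0.5, size=500)

policy = PiecewiseLinearPolicy(obs=oat, act=setpoint, actions_bins=[40, 45, 50, 55, 60])
policy.fit()

log_probs = policy.log_likelihoods(np.array([5.0, 15.0, 25.0]))  # shape (3, 5)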


class HttpPolicy(Policy):
hopes/rew/rewards.py (19 additions, 24 deletions)

@@ -43,22 +43,20 @@ def __init__(
        obs: np.ndarray,
        act: np.ndarray,
        rew: np.ndarray,
-        reward_model: str = "linear",
+        regression_model: str = "linear",
        model_params: dict | None = None,
    ) -> None:
        """
        :param obs: the observations for training the reward model, shape: (batch_size, obs_dim).
        :param act: the actions for training the reward model, shape: (batch_size,).
        :param rew: the rewards for training the reward model, shape: (batch_size,).
-        :param reward_model: the type of reward model to use. For now, only linear, polynomial and mlp are supported.
+        :param regression_model: the type of reward model to use. For now, only linear, polynomial and mlp are supported.
        :param model_params: optional parameters for the reward model.
        """
-        if model_params is None:
-            model_params = {}
        supported_reward_models = ["linear", "polynomial", "mlp"]

        assert (
-            reward_model in supported_reward_models
+            regression_model in supported_reward_models
        ), f"Only {supported_reward_models} supported for now."
        assert obs.ndim == 2, "Observations must have shape (batch_size, obs_dim)."
        assert (
@@ -68,18 +66,18 @@
        self.obs = obs
        self.act = act.reshape(-1, 1) if act.ndim == 1 else act
        self.rew = rew.reshape(-1, 1) if rew.ndim == 1 else rew
-        self.model_params = model_params
-        self.reward_model = reward_model
+        self.model_params = model_params or {}
+        self.regression_model = regression_model
        self.poly_features = None

        # both linear and polynomial models are implemented using sklearn LinearRegression;
        # for the polynomial model, we use PolynomialFeatures to generate polynomial features, then fit the linear model
-        if self.reward_model == "linear" or self.reward_model == "polynomial":
+        if self.regression_model == "linear" or self.regression_model == "polynomial":
            self.model = LinearRegression()

        # the mlp model is implemented using torch. We use a simple feedforward neural network and MSE loss.
        # configuration is basic for now, but can be extended in the future
-        elif self.reward_model == "mlp":
+        elif self.regression_model == "mlp":
            # read from self.model_params, which is never None, now that the
            # `if model_params is None` guard has been removed
            hidden_size = self.model_params.get("hidden_size", 64)
            activation = self.model_params.get("activation", "relu")
            act_cls = torch.nn.ReLU if activation == "relu" else torch.nn.Tanh
@@ -93,8 +91,10 @@ def fit(self) -> None:
"""Fit the reward model to the training data."""
model_in = np.concatenate((self.obs, self.act), axis=1)

if self.reward_model == "mlp":
optimizer = torch.optim.Adam(self.model.parameters())
if self.regression_model == "mlp":
optimizer = torch.optim.Adam(
self.model.parameters(), lr=self.model_params.get("lr", 0.01)
)
criterion = torch.nn.MSELoss()
for _ in range(self.model_params.get("num_epochs", 1000)):
optimizer.zero_grad()
@@ -103,12 +103,12 @@
                loss.backward()
                optimizer.step()

-        elif self.reward_model == "polynomial":
+        elif self.regression_model == "polynomial":
            self.poly_features = PolynomialFeatures(degree=self.model_params.get("degree", 2))
            self.model.fit(self.poly_features.fit_transform(model_in), self.rew)

-        elif isinstance(self.model, LinearRegression):
-            self.model.fit(np.concatenate((self.obs, self.act), axis=1), self.rew)
+        elif self.regression_model == "linear":
+            self.model.fit(model_in, self.rew)

    def estimate(self, obs: np.ndarray, act: np.ndarray) -> np.ndarray:
        """Estimate the rewards for a given set of observations and actions.
@@ -121,17 +121,12 @@ def estimate(self, obs: np.ndarray, act: np.ndarray) -> np.ndarray:
        if act.ndim == 1:
            act = act.reshape(-1, 1)

-        if isinstance(self.model, torch.nn.Module):
+        inputs = np.concatenate((obs, act), axis=1)
+
+        if self.regression_model == "mlp":
            with torch.no_grad():
-                return (
-                    self.model(
-                        torch.tensor(np.concatenate((obs, act), axis=1), dtype=torch.float32)
-                    )
-                    .numpy()
-                    .flatten()
-                )
+                return self.model(torch.tensor(inputs, dtype=torch.float32)).numpy().flatten()
        else:
-            inputs = np.concatenate((obs, act), axis=1)
-            if self.reward_model == "polynomial":
+            if self.regression_model == "polynomial":
                inputs = self.poly_features.transform(inputs)
            return np.squeeze(self.model.predict(inputs))
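Finally, a hedged usage sketch of the renamed regression_model parameter; RegressionBasedRewardModel is assumed as the enclosing class name, since the class declaration sits outside the hunks shown above:

import numpy as np

# the class name is an assumption; only its methods appear in the hunks above
from hopes.rew.rewards import RegressionBasedRewardModel

rng = np.random.default_rng(3)
obs = rng.normal(size=(200, 4))
act = rng.integers(0, 3, size=200)
rew = obs.sum(axis=1) + 0.5 * act + rng.normal(0.0, 0.1, size=200)

model = RegressionBasedRewardModel(
    obs=obs, act=act, rew=rew, regression_model="polynomial", model_params={"degree": 2}
)
model.fit()
rew_hat = model.estimate(obs[:10], act[:10])  # estimated rewards, shape (10,)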