Forked from
Scientific Computing and Engineering Group / Teaching / online_learning
107 commits behind the upstream repository.
-
JuanPi Carbajal authoredc8a70a6a
kalman.py 10.96 KiB
"""
Kalman filters.
"""
# Copyright (C) 2021 OST Ostschweizer Fachhochschule
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Author: Juan Pablo Carbajal <ajuanpi+dev@gmail.com>
import numpy as np
from scipy.stats import multivariate_normal as mvn
def kf_predict(m, P, A, Q):
""" One step prediction of the Kalman filter.
Parameters
----------
m: array_like, shape (n,) or (n,1)
Mean value of state.
P: array_like, shape (n,n)
Covariance matrix of state.
A: array_like, shape (n,n)
State update matrix.
Q: array_like, shape (n,n)
Covariance matrix of process noise.
Returns
-------
m: array_like, shape (n,) or (n,1)
Mean value of the predicted state.
P: array_like, shape (n,n)
Covariance matrix of the predicted state.
"""
return A @ m, A @ P @ A.T + Q
def kf_update(m, P, H, R, y):
""" Update based on observation of the Kalman filter.
Parameters
----------
m: array_like, shape (n,) or (n,1)
Mean value of state.
P: array_like, shape (n,n)
Covariance matrix of state.
H: array_like, shapoe(k,n)
Measurement matrix.
R: array_like, shape (n,n)
Covariance matrix of measurement noise.
y: scalar
Observation
Returns
-------
m: array_like, shape (n,) or (n,1)
Updated mean value of the state.
P: array_like, shape (n,n)
Updated covariance matrix of the state.
"""
# predicted measurement and its covariance
y_, S = kf_measurement(m, P, H, R)
res = y - y_
K = P @ np.linalg.solve(S.T, H).T
P_ = P - K @ S @ K.T
m_ = m + K @ res
return m_, P_
def kf_measurement(m, P, H, R):
""" Measurement of the Kalman filter.
Parameters
----------
m: array_like, shape (n,) or (n,1)
Mean value of state.
P: array_like, shape (n,n)
Covariance matrix of state.
H: array_like, shapoe(k,n)
Measurement matrix.
R: array_like, shape (n,n)
Covariance matrix of measurement noise.
Returns
-------
z: array_like, shape (k,) or (k,1)
Measurement.
S: array_like, shape (k,k)
Covariance matrix of measurement.
"""
y = H @ m
S = H @ P @ H.T + R
return y, S
def kf_train(m, P, A, Q, H, R, y):
return kf_update(*kf_predict(m, P, A, Q), H, R, y)
def kf_loglikelihood(m, P, H, R, y):
""" Log likelihood of measurement. """
y_, S = kf_measurement(m, P, H, R)
return mvn(y_, S).logpdf(y)
class KF:
def __init__(self, m0, P0, A, Q, H, R):
self.A = (lambda t: A) if not callable(A) else A
self.Q = (lambda t: Q) if not callable(Q) else Q
self.H = (lambda t: H) if not callable(H) else H
self.R = (lambda t: R) if not callable(R) else R
self.m = m0
self.P = P0
self.ll = mvn(m0, P0).logpdf(m0)
self.state_dim = self.A(0).shape[0]
self.meas_dim = self.H(0).shape[0]
def train(self, y, *, t=None, return_history=False, inplace=False):
t = np.arange(y.size) if t is None else t
if return_history:
nt = (t.size,)
m__ = np.zeros(nt + (self.state_dim,)).squeeze()
P__ = np.zeros(nt + (self.state_dim,) * 2).squeeze()
y__ = np.nan * np.ones(t.size)
s2y__ = np.zeros_like(y__)
ll__ = np.zeros_like(y__)
m_, P_, ll_ = self.m, self.P, self.ll
y_ = np.nan * np.ones(self.meas_dim)
s2y_ = np.zeros((self.meas_dim,) * 2)
for i, ti in enumerate(t):
A, Q, H, R = self.A(ti), self.Q(ti), self.H(ti), self.R(ti)
y_, s2y_ = kf_measurement(m_, P_, H, R)
m_, P_ = kf_predict(m_, P_, A, Q)
m_, P_ = kf_update(m_, P_, H, R, y[i])
ll_ = ll_ + kf_loglikelihood(m_, P_, H, R, y[i])
if return_history:
m__[i], P__[i], ll__[i] = m_, P_, ll_
y__[i], s2y__[i] = y_.squeeze(), s2y_.squeeze()
if inplace:
self.m = m_
self.P = P_
self.ll = ll_
if return_history:
m_, P_, y_, s2y_, ll_ = m__, P__, y__, s2y__, ll__
return m_, P_, y_, s2y_, ll_
########
# Extended Kalman filters
########
def ekf_predict(m, P, Q, dyn):
""" One step prediction of the extended Kalman filter.
Parameters
----------
m: array_like, shape (n,) or (n,1)
Mean value of state.
P: array_like, shape (n,n)
Covariance matrix of state.
Q: array_like, shape (n,n)
Covariance matrix of process noise.
dyn: callable
Function defining the state space dynamics.
Signature: dyn(m)
Must return an array_like with the same shape as m and the Jacobian
of the dynamics w.r.t. to the states as an array_like of shape (n,n)
Returns
-------
m: array_like, shape (n,) or (n,1)
Mean value of the predicted state.
P: array_like, shape (n,n)
Covariance matrix of the predicted state.
"""
m_, J = dyn(m)
P_ = J @ P @ J.T + Q
return m_, P_
def ekf_update(m, P, R, meas, y):
""" Update based on observation of the extended Kalman filter.
Parameters
----------
m: array_like, shape (n,) or (n,1)
Mean value of state.
P: array_like, shape (n,n)
Covariance matrix of state.
R: array_like, shape (n,n)
Covariance matrix of measurement noise.
meas: callable
Function defining the measurement function.
Signature: meas(m)
Must return an array_like of shape (k,) with the measurements and
the Jacobian of the measurement w.r.t. the state as an
array_like of shape (k,n)
Returns
-------
m: array_like, shape (n,) or (n,1)
Updated mean value of the state.
P: array_like, shape (n,n)
Updated covariance matrix of the state.
"""
# predicted measurement and its covariance
y_, S, J = ekf_measurement(m, P, R, meas)
res = y - y_
K = P @ np.linalg.solve(S.T, J).T
P_ = P - K @ S @ K.T
m_ = m + K @ res
return m_, P_
def ekf_measurement(m, P, R, meas):
""" Measurement of the extended Kalman filter.
Parameters
----------
m: array_like, shape (n,) or (n,1)
Mean value of state.
P: array_like, shape (n,n)
Covariance matrix of state.
R: array_like, shape (n,n)
Covariance matrix of measurement noise.
meas: callable
Function defining the measurement function.
Signature: meas(m)
Must return an array_like of shape (k,) with the measurements and
the Jacobian of the measurement w.r.t. to the states as an
array_like of shape (k,n)
Returns
-------
z: array_like, shape (k,) or (k,1)
Measurement.
S: array_like, shape (k,k)
Covariance matrix of measurement.
J: array_like, shape (k, n)
Jacobian of the measurement w.r.t. the state
"""
z, J = meas(m)
S = J @ P @ J.T + R
return z, S, J
def ekf_forecast(m, P, Q, R, dyn, meas, t_span, *, dt=1, return_history=False):
""" Multiple step prediction. """
Q = (lambda t: Q) if not callable(Q) else Q
R = (lambda t: R) if not callable(R) else R
steps = int(np.ceil((t_span[1]-t_span[0]) / dt))
r0 = R(t_span[0])
k = r0.shape[0] if not np.isscalar(r0) else 1
if return_history:
m__ = np.zeros((steps, m.size)).squeeze()
P__ = np.zeros((steps,) + P.shape).squeeze()
y__ = np.zeros((steps, k)).squeeze()
s2y__ = np.zeros((steps, k)).squeeze()
m_, P_ = m, P
t__ = np.arange(*t_span, step=dt)
for n, t_ in enumerate(t__):
m_, P_ = ekf_predict(m_, P_, Q(t_), dyn)
y_, s2y_, _ = ekf_measurement(m_, P_, R(t_), meas)
if return_history:
m__[n], P__[n] = m_, P_
y__[n], s2y__[n] = y_.squeeze(), s2y_.squeeze()
if return_history:
m_, P_, y_, s2y_, t_ = m__, P__, y__, s2y__, t__
return m_, P_, y_, s2y_, t_
def ekf_train(m, P, Q, R, dyn, meas, y):
return ekf_update(*ekf_predict(m, P, Q, dyn), R, meas, y)
def ekf_loglikelihood(m, P, R, meas, y):
""" Log likelihood of measurement. """
y_, S, J = ekf_measurement(m, P, R, meas)
return mvn(y_, S).logpdf(y)
class EKF:
def __init__(self, model, m0, P0, Q, R):
self.model = model
self.Q = (lambda t: Q) if not callable(Q) else Q
self.R = (lambda t: R) if not callable(R) else R
self.m = m0
self.P = P0
self.ll = mvn(m0, P0).logpdf(m0)
def train(self, y, *, t=None, return_history=False, inplace=False):
t = np.arange(y.size) if t is None else t
if return_history:
nt = (t.size,)
m__ = np.zeros(nt + (self.m.size,)).squeeze()
P__ = np.zeros(nt + (self.m.size,) * 2).squeeze()
y__ = np.nan * np.ones(t.size)
s2y__ = np.zeros_like(y__)
ll__ = np.zeros_like(y__)
m_, P_, ll_ = self.m, self.P, self.ll
y_ = np.nan * np.ones(self.model.meas_dim)
s2y_ = np.zeros((self.model.meas_dim,) * 2)
dyn = self.model.dynamics
meas = self.model.measurement
for i, ti in enumerate(t):
Q, R = self.Q(ti), self.R(ti)
y_, s2y_, _ = ekf_measurement(m_, P_, R, meas)
m_, P_ = ekf_predict(m_, P_, Q, dyn)
m_, P_ = ekf_update(m_, P_, R, meas, y[i])
ll_ = ll_ + ekf_loglikelihood(m_, P_, R, meas, y[i])
if return_history:
m__[i], P__[i], ll__[i] = m_, P_, ll_
y__[i], s2y__[i] = y_.squeeze(), s2y_.squeeze()
if inplace:
self.m = m_
self.P = P_
self.ll = ll_
if return_history:
m_, P_, y_, s2y_, ll_ = m__, P__, y__, s2y__, ll__
return m_, P_, y_, s2y_, ll_
def forecast(self, t_span, *, return_history=False):
dyn = self.model.dynamics
meas = self.model.measurement
m_, P_, y_, s2y_, t_ = ekf_forecast(self.m, self.P, self.Q, self.R,
dyn, meas, t_span, dt=self.model.dt,
return_history=return_history)
return m_, P_, y_, s2y_, t_