""" Kalman filters. """ # Copyright (C) 2021 OST Ostschweizer Fachhochschule # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # Author: Juan Pablo Carbajal <ajuanpi+dev@gmail.com> import numpy as np from scipy.stats import multivariate_normal as mvn def kf_predict(m, P, A, Q): """ One step prediction of the Kalman filter. Parameters ---------- m: array_like, shape (n,) or (n,1) Mean value of state. P: array_like, shape (n,n) Covariance matrix of state. A: array_like, shape (n,n) State update matrix. Q: array_like, shape (n,n) Covariance matrix of process noise. Returns ------- m: array_like, shape (n,) or (n,1) Mean value of the predicted state. P: array_like, shape (n,n) Covariance matrix of the predicted state. """ return A @ m, A @ P @ A.T + Q def kf_update(m, P, H, R, y): """ Update based on observation of the Kalman filter. Parameters ---------- m: array_like, shape (n,) or (n,1) Mean value of state. P: array_like, shape (n,n) Covariance matrix of state. H: array_like, shapoe(k,n) Measurement matrix. R: array_like, shape (n,n) Covariance matrix of measurement noise. y: scalar Observation Returns ------- m: array_like, shape (n,) or (n,1) Updated mean value of the state. P: array_like, shape (n,n) Updated covariance matrix of the state. """ # predicted measurement and its covariance y_, S = kf_measurement(m, P, H, R) res = y - y_ K = P @ np.linalg.solve(S.T, H).T P_ = P - K @ S @ K.T m_ = m + K @ res return m_, P_ def kf_measurement(m, P, H, R): """ Measurement of the Kalman filter. Parameters ---------- m: array_like, shape (n,) or (n,1) Mean value of state. P: array_like, shape (n,n) Covariance matrix of state. H: array_like, shapoe(k,n) Measurement matrix. R: array_like, shape (n,n) Covariance matrix of measurement noise. Returns ------- z: array_like, shape (k,) or (k,1) Measurement. S: array_like, shape (k,k) Covariance matrix of measurement. """ y = H @ m S = H @ P @ H.T + R return y, S def kf_train(m, P, A, Q, H, R, y): return kf_update(*kf_predict(m, P, A, Q), H, R, y) def kf_loglikelihood(m, P, H, R, y): """ Log likelihood of measurement. 
""" y_, S = kf_measurement(m, P, H, R) return mvn(y_, S).logpdf(y) class KF: def __init__(self, m0, P0, A, Q, H, R): self.A = (lambda t: A) if not callable(A) else A self.Q = (lambda t: Q) if not callable(Q) else Q self.H = (lambda t: H) if not callable(H) else H self.R = (lambda t: R) if not callable(R) else R self.m = m0 self.P = P0 self.ll = mvn(m0, P0).logpdf(m0) self.state_dim = self.A(0).shape[0] self.meas_dim = self.H(0).shape[0] def train(self, y, *, t=None, return_history=False, inplace=False): t = np.arange(y.size) if t is None else t if return_history: nt = (t.size,) m__ = np.zeros(nt + (self.state_dim,)).squeeze() P__ = np.zeros(nt + (self.state_dim,) * 2).squeeze() y__ = np.nan * np.ones(t.size) s2y__ = np.zeros_like(y__) ll__ = np.zeros_like(y__) m_, P_, ll_ = self.m, self.P, self.ll y_ = np.nan * np.ones(self.meas_dim) s2y_ = np.zeros((self.meas_dim,) * 2) for i, ti in enumerate(t): A, Q, H, R = self.A(ti), self.Q(ti), self.H(ti), self.R(ti) y_, s2y_ = kf_measurement(m_, P_, H, R) m_, P_ = kf_predict(m_, P_, A, Q) m_, P_ = kf_update(m_, P_, H, R, y[i]) ll_ = ll_ + kf_loglikelihood(m_, P_, H, R, y[i]) if return_history: m__[i], P__[i], ll__[i] = m_, P_, ll_ y__[i], s2y__[i] = y_.squeeze(), s2y_.squeeze() if inplace: self.m = m_ self.P = P_ self.ll = ll_ if return_history: m_, P_, y_, s2y_, ll_ = m__, P__, y__, s2y__, ll__ return m_, P_, y_, s2y_, ll_ ######## # Extended Kalman filters ######## def ekf_predict(m, P, Q, dyn): """ One step prediction of the extended Kalman filter. Parameters ---------- m: array_like, shape (n,) or (n,1) Mean value of state. P: array_like, shape (n,n) Covariance matrix of state. Q: array_like, shape (n,n) Covariance matrix of process noise. dyn: callable Function defining the state space dynamics. Signature: dyn(m) Must return an array_like with the same shape as m and the Jacobian of the dynamics w.r.t. to the states as an array_like of shape (n,n) Returns ------- m: array_like, shape (n,) or (n,1) Mean value of the predicted state. P: array_like, shape (n,n) Covariance matrix of the predicted state. """ m_, J = dyn(m) P_ = J @ P @ J.T + Q return m_, P_ def ekf_update(m, P, R, meas, y): """ Update based on observation of the extended Kalman filter. Parameters ---------- m: array_like, shape (n,) or (n,1) Mean value of state. P: array_like, shape (n,n) Covariance matrix of state. R: array_like, shape (n,n) Covariance matrix of measurement noise. meas: callable Function defining the measurement function. Signature: meas(m) Must return an array_like of shape (k,) with the measurements and the Jacobian of the measurement w.r.t. the state as an array_like of shape (k,n) Returns ------- m: array_like, shape (n,) or (n,1) Updated mean value of the state. P: array_like, shape (n,n) Updated covariance matrix of the state. """ # predicted measurement and its covariance y_, S, J = ekf_measurement(m, P, R, meas) res = y - y_ K = P @ np.linalg.solve(S.T, J).T P_ = P - K @ S @ K.T m_ = m + K @ res return m_, P_ def ekf_measurement(m, P, R, meas): """ Measurement of the extended Kalman filter. Parameters ---------- m: array_like, shape (n,) or (n,1) Mean value of state. P: array_like, shape (n,n) Covariance matrix of state. R: array_like, shape (n,n) Covariance matrix of measurement noise. meas: callable Function defining the measurement function. Signature: meas(m) Must return an array_like of shape (k,) with the measurements and the Jacobian of the measurement w.r.t. 

########
# Extended Kalman filters
########
def ekf_predict(m, P, Q, dyn):
    """ One step prediction of the extended Kalman filter.

    Parameters
    ----------
    m: array_like, shape (n,) or (n,1)
        Mean value of state.
    P: array_like, shape (n,n)
        Covariance matrix of state.
    Q: array_like, shape (n,n)
        Covariance matrix of process noise.
    dyn: callable
        Function defining the state space dynamics.
        Signature: dyn(m)
        Must return an array_like with the same shape as m and the Jacobian
        of the dynamics w.r.t. the states as an array_like of shape (n,n).

    Returns
    -------
    m: array_like, shape (n,) or (n,1)
        Mean value of the predicted state.
    P: array_like, shape (n,n)
        Covariance matrix of the predicted state.
    """
    m_, J = dyn(m)
    P_ = J @ P @ J.T + Q
    return m_, P_


def ekf_update(m, P, R, meas, y):
    """ Update based on observation of the extended Kalman filter.

    Parameters
    ----------
    m: array_like, shape (n,) or (n,1)
        Mean value of state.
    P: array_like, shape (n,n)
        Covariance matrix of state.
    R: array_like, shape (k,k)
        Covariance matrix of measurement noise.
    meas: callable
        Function defining the measurement function.
        Signature: meas(m)
        Must return an array_like of shape (k,) with the measurements and
        the Jacobian of the measurement w.r.t. the state as an array_like
        of shape (k,n).
    y: scalar or array_like, shape (k,)
        Observation.

    Returns
    -------
    m: array_like, shape (n,) or (n,1)
        Updated mean value of the state.
    P: array_like, shape (n,n)
        Updated covariance matrix of the state.
    """
    # predicted measurement and its covariance
    y_, S, J = ekf_measurement(m, P, R, meas)
    res = y - y_
    # Kalman gain K = P J^T S^-1, computed with a linear solve instead of an
    # explicit inverse of S
    K = P @ np.linalg.solve(S.T, J).T
    P_ = P - K @ S @ K.T
    m_ = m + K @ res
    return m_, P_


def ekf_measurement(m, P, R, meas):
    """ Measurement of the extended Kalman filter.

    Parameters
    ----------
    m: array_like, shape (n,) or (n,1)
        Mean value of state.
    P: array_like, shape (n,n)
        Covariance matrix of state.
    R: array_like, shape (k,k)
        Covariance matrix of measurement noise.
    meas: callable
        Function defining the measurement function.
        Signature: meas(m)
        Must return an array_like of shape (k,) with the measurements and
        the Jacobian of the measurement w.r.t. the states as an array_like
        of shape (k,n).

    Returns
    -------
    z: array_like, shape (k,) or (k,1)
        Measurement.
    S: array_like, shape (k,k)
        Covariance matrix of measurement.
    J: array_like, shape (k,n)
        Jacobian of the measurement w.r.t. the state.
    """
    z, J = meas(m)
    S = J @ P @ J.T + R
    return z, S, J


def ekf_forecast(m, P, Q, R, dyn, meas, t_span, *, dt=1, return_history=False):
    """ Multiple step prediction (no updates) over the interval t_span. """
    Q = (lambda t: Q) if not callable(Q) else Q
    R = (lambda t: R) if not callable(R) else R

    steps = int(np.ceil((t_span[1] - t_span[0]) / dt))
    r0 = R(t_span[0])
    k = r0.shape[0] if not np.isscalar(r0) else 1
    if return_history:
        m__ = np.zeros((steps, m.size)).squeeze()
        P__ = np.zeros((steps,) + P.shape).squeeze()
        y__ = np.zeros((steps, k)).squeeze()
        s2y__ = np.zeros((steps, k)).squeeze()

    m_, P_ = m, P
    t__ = np.arange(*t_span, step=dt)
    for n, t_ in enumerate(t__):
        m_, P_ = ekf_predict(m_, P_, Q(t_), dyn)
        y_, s2y_, _ = ekf_measurement(m_, P_, R(t_), meas)

        if return_history:
            m__[n], P__[n] = m_, P_
            y__[n], s2y__[n] = y_.squeeze(), s2y_.squeeze()

    if return_history:
        m_, P_, y_, s2y_, t_ = m__, P__, y__, s2y__, t__

    return m_, P_, y_, s2y_, t_


def ekf_train(m, P, Q, R, dyn, meas, y):
    """ One full extended Kalman filter step: predict, then update with y. """
    return ekf_update(*ekf_predict(m, P, Q, dyn), R, meas, y)


def ekf_loglikelihood(m, P, R, meas, y):
    """ Log likelihood of measurement. """
    y_, S, J = ekf_measurement(m, P, R, meas)
    return mvn(y_, S).logpdf(y)


class EKF:
    """ Extended Kalman filter driven by a model object.

    The model must provide the callables `dynamics` and `measurement`, the
    measurement dimension `meas_dim`, and the time step `dt` (used for
    forecasting).
    """

    def __init__(self, model, m0, P0, Q, R):
        self.model = model
        self.Q = (lambda t: Q) if not callable(Q) else Q
        self.R = (lambda t: R) if not callable(R) else R

        self.m = m0
        self.P = P0
        self.ll = mvn(m0, P0).logpdf(m0)

    def train(self, y, *, t=None, return_history=False, inplace=False):
        t = np.arange(y.size) if t is None else t

        if return_history:
            nt = (t.size,)
            m__ = np.zeros(nt + (self.m.size,)).squeeze()
            P__ = np.zeros(nt + (self.m.size,) * 2).squeeze()
            y__ = np.nan * np.ones(t.size)
            s2y__ = np.zeros_like(y__)
            ll__ = np.zeros_like(y__)

        m_, P_, ll_ = self.m, self.P, self.ll
        y_ = np.nan * np.ones(self.model.meas_dim)
        s2y_ = np.zeros((self.model.meas_dim,) * 2)
        dyn = self.model.dynamics
        meas = self.model.measurement
        for i, ti in enumerate(t):
            Q, R = self.Q(ti), self.R(ti)

            y_, s2y_, _ = ekf_measurement(m_, P_, R, meas)
            m_, P_ = ekf_predict(m_, P_, Q, dyn)
            m_, P_ = ekf_update(m_, P_, R, meas, y[i])
            ll_ = ll_ + ekf_loglikelihood(m_, P_, R, meas, y[i])

            if return_history:
                m__[i], P__[i], ll__[i] = m_, P_, ll_
                y__[i], s2y__[i] = y_.squeeze(), s2y_.squeeze()

        if inplace:
            self.m = m_
            self.P = P_
            self.ll = ll_

        if return_history:
            m_, P_, y_, s2y_, ll_ = m__, P__, y__, s2y__, ll__

        return m_, P_, y_, s2y_, ll_

    def forecast(self, t_span, *, return_history=False):
        dyn = self.model.dynamics
        meas = self.model.measurement
        m_, P_, y_, s2y_, t_ = ekf_forecast(self.m, self.P, self.Q, self.R,
                                            dyn, meas, t_span,
                                            dt=self.model.dt,
                                            return_history=return_history)
        return m_, P_, y_, s2y_, t_

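
# Illustrative usage of the EKF class (a minimal sketch; the toy model below is
# made up for demonstration and only mimics the attributes the EKF class reads
# from a model: `dynamics`, `measurement`, `meas_dim`, and `dt`).
class _ToyModel:
    """ Hypothetical scalar model: mildly nonlinear drift, squared observation. """
    dt = 1.0
    meas_dim = 1

    @staticmethod
    def dynamics(m):
        # next state mean and its Jacobian w.r.t. the state
        f = m + 0.1 * np.sin(m)
        J = np.array([[1.0 + 0.1 * np.cos(m[0])]])
        return f, J

    @staticmethod
    def measurement(m):
        # predicted measurement and its Jacobian w.r.t. the state
        h = m ** 2
        J = np.array([[2.0 * m[0]]])
        return h, J


def _ekf_example():
    ekf = EKF(_ToyModel(), m0=np.ones(1), P0=np.eye(1),
              Q=np.eye(1) * 1e-3, R=np.eye(1) * 1e-2)
    y = np.ones(5)  # fake observations of the squared state
    ekf.train(y, inplace=True)
    # Forecast a few steps beyond the assimilated data.
    return ekf.forecast((y.size, y.size + 3), return_history=True)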