Skip to content
Snippets Groups Projects
kalman.py 10.96 KiB
"""
Kalman filters.
"""

# Copyright (C) 2021 OST Ostschweizer Fachhochschule
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Author: Juan Pablo Carbajal <ajuanpi+dev@gmail.com>

import numpy as np
from scipy.stats import multivariate_normal as mvn


def kf_predict(m, P, A, Q):
    """ One step prediction of the Kalman filter.

    Parameters
    ----------
    m: array_like, shape (n,) or (n,1)
        Mean value of state.
    P: array_like, shape (n,n)
        Covariance matrix of state.
    A: array_like, shape (n,n)
        State update matrix.
    Q: array_like, shape (n,n)
        Covariance matrix of process noise.

    Returns
    -------
    m: array_like, shape (n,) or (n,1)
        Mean value of the predicted state.
    P: array_like, shape (n,n)
        Covariance matrix of the predicted state.
    """
    return A @ m, A @ P @ A.T + Q


def kf_update(m, P, H, R, y):
    """ Update based on observation of the Kalman filter.

        Parameters
        ----------
        m: array_like, shape (n,) or (n,1)
            Mean value of state.
        P: array_like, shape (n,n)
            Covariance matrix of state.
        H: array_like, shapoe(k,n)
            Measurement matrix.
        R: array_like, shape (n,n)
            Covariance matrix of measurement noise.
        y: scalar
            Observation

        Returns
        -------
        m: array_like, shape (n,) or (n,1)
            Updated mean value of the state.
        P: array_like, shape (n,n)
            Updated covariance matrix of the state.
    """
    # predicted measurement and its covariance
    y_, S = kf_measurement(m, P, H, R)
    res = y - y_

    K = P @ np.linalg.solve(S.T, H).T
    P_ = P - K @ S @ K.T
    m_ = m + K @ res

    return m_, P_


def kf_measurement(m, P, H, R):
    """ Measurement of the Kalman filter.

        Parameters
        ----------
        m: array_like, shape (n,) or (n,1)
            Mean value of state.
        P: array_like, shape (n,n)
            Covariance matrix of state.
        H: array_like, shapoe(k,n)
            Measurement matrix.
        R: array_like, shape (n,n)
            Covariance matrix of measurement noise.

        Returns
        -------
        z: array_like, shape (k,) or (k,1)
            Measurement.
        S: array_like, shape (k,k)
            Covariance matrix of measurement.
    """
    y = H @ m
    S = H @ P @ H.T + R
    return y, S


def kf_train(m, P, A, Q, H, R, y):
    return kf_update(*kf_predict(m, P, A, Q), H, R, y)


def kf_loglikelihood(m, P, H, R, y):
    """ Log likelihood of measurement. """
    y_, S = kf_measurement(m, P, H, R)
    return mvn(y_, S).logpdf(y)


class KF:

    def __init__(self, m0, P0, A, Q, H, R):

        self.A = (lambda t: A) if not callable(A) else A
        self.Q = (lambda t: Q) if not callable(Q) else Q
        self.H = (lambda t: H) if not callable(H) else H
        self.R = (lambda t: R) if not callable(R) else R
        self.m = m0
        self.P = P0
        self.ll = mvn(m0, P0).logpdf(m0)
        self.state_dim = self.A(0).shape[0]
        self.meas_dim = self.H(0).shape[0]

    def train(self, y, *, t=None, return_history=False, inplace=False):
        t = np.arange(y.size) if t is None else t
        if return_history:
            nt = (t.size,)
            m__ = np.zeros(nt + (self.state_dim,)).squeeze()
            P__ = np.zeros(nt + (self.state_dim,) * 2).squeeze()

            y__ = np.nan * np.ones(t.size)
            s2y__ = np.zeros_like(y__)
            ll__ = np.zeros_like(y__)

        m_, P_, ll_ = self.m, self.P, self.ll
        y_ = np.nan * np.ones(self.meas_dim)
        s2y_ = np.zeros((self.meas_dim,) * 2)

        for i, ti in enumerate(t):
            A, Q, H, R = self.A(ti), self.Q(ti), self.H(ti), self.R(ti)
            y_, s2y_ = kf_measurement(m_, P_, H, R)
            m_, P_ = kf_predict(m_, P_, A, Q)
            m_, P_ = kf_update(m_, P_, H, R, y[i])
            ll_ = ll_ + kf_loglikelihood(m_, P_, H, R, y[i])
            if return_history:
                m__[i], P__[i], ll__[i] = m_, P_, ll_
                y__[i], s2y__[i] = y_.squeeze(), s2y_.squeeze()

        if inplace:
            self.m = m_
            self.P = P_
            self.ll = ll_

        if return_history:
            m_, P_, y_, s2y_, ll_ = m__, P__, y__, s2y__, ll__

        return m_, P_, y_, s2y_, ll_


########
# Extended Kalman filters
########

def ekf_predict(m, P, Q, dyn):
    """ One step prediction of the extended Kalman filter.

    Parameters
    ----------
    m: array_like, shape (n,) or (n,1)
        Mean value of state.
    P: array_like, shape (n,n)
        Covariance matrix of state.
    Q: array_like, shape (n,n)
        Covariance matrix of process noise.
    dyn: callable
        Function defining the state space dynamics.
        Signature: dyn(m)
        Must return an array_like with the same shape as m and the Jacobian
        of the dynamics w.r.t. to the states as an array_like of shape (n,n)

    Returns
    -------
    m: array_like, shape (n,) or (n,1)
        Mean value of the predicted state.
    P: array_like, shape (n,n)
        Covariance matrix of the predicted state.
    """
    m_, J = dyn(m)
    P_ = J @ P @ J.T + Q
    return m_, P_


def ekf_update(m, P, R, meas, y):
    """ Update based on observation of the extended Kalman filter.

        Parameters
        ----------
        m: array_like, shape (n,) or (n,1)
            Mean value of state.
        P: array_like, shape (n,n)
            Covariance matrix of state.
        R: array_like, shape (n,n)
            Covariance matrix of measurement noise.
        meas: callable
            Function defining the measurement function.
            Signature: meas(m)
            Must return an array_like of shape (k,) with the measurements and
            the Jacobian of the measurement w.r.t. the state as an
            array_like of shape (k,n)

        Returns
        -------
        m: array_like, shape (n,) or (n,1)
            Updated mean value of the state.
        P: array_like, shape (n,n)
            Updated covariance matrix of the state.
    """
    # predicted measurement and its covariance
    y_, S, J = ekf_measurement(m, P, R, meas)
    res = y - y_

    K = P @ np.linalg.solve(S.T, J).T
    P_ = P - K @ S @ K.T
    m_ = m + K @ res

    return m_, P_


def ekf_measurement(m, P, R, meas):
    """ Measurement of the extended Kalman filter.

        Parameters
        ----------
        m: array_like, shape (n,) or (n,1)
            Mean value of state.
        P: array_like, shape (n,n)
            Covariance matrix of state.
        R: array_like, shape (n,n)
            Covariance matrix of measurement noise.
        meas: callable
            Function defining the measurement function.
            Signature: meas(m)
            Must return an array_like of shape (k,) with the measurements and
            the Jacobian of the measurement w.r.t. to the states as an
            array_like of shape (k,n)

        Returns
        -------
        z: array_like, shape (k,) or (k,1)
            Measurement.
        S: array_like, shape (k,k)
            Covariance matrix of measurement.
        J: array_like, shape (k, n)
            Jacobian of the measurement w.r.t. the state
    """
    z, J = meas(m)
    S = J @ P @ J.T + R
    return z, S, J


def ekf_forecast(m, P, Q, R, dyn, meas, t_span, *, dt=1, return_history=False):
    """ Multiple step prediction. """
    Q = (lambda t: Q) if not callable(Q) else Q
    R = (lambda t: R) if not callable(R) else R
    steps = int(np.ceil((t_span[1]-t_span[0]) / dt))
    r0 = R(t_span[0])
    k = r0.shape[0] if not np.isscalar(r0) else 1
    if return_history:
        m__ = np.zeros((steps, m.size)).squeeze()
        P__ = np.zeros((steps,) + P.shape).squeeze()
        y__ = np.zeros((steps, k)).squeeze()
        s2y__ = np.zeros((steps, k)).squeeze()
    m_, P_ = m, P
    t__ = np.arange(*t_span, step=dt)
    for n, t_ in enumerate(t__):
        m_, P_ = ekf_predict(m_, P_, Q(t_), dyn)
        y_, s2y_, _ = ekf_measurement(m_, P_, R(t_), meas)
        if return_history:
            m__[n], P__[n] = m_, P_
            y__[n], s2y__[n] = y_.squeeze(), s2y_.squeeze()
    if return_history:
        m_, P_, y_, s2y_, t_ = m__, P__, y__, s2y__, t__
    return m_, P_, y_, s2y_, t_


def ekf_train(m, P, Q, R, dyn, meas, y):
    return ekf_update(*ekf_predict(m, P, Q, dyn), R, meas, y)


def ekf_loglikelihood(m, P, R, meas, y):
    """ Log likelihood of measurement. """
    y_, S, J = ekf_measurement(m, P, R, meas)
    return mvn(y_, S).logpdf(y)


class EKF:

    def __init__(self, model, m0, P0, Q, R):

        self.model = model
        self.Q = (lambda t: Q) if not callable(Q) else Q
        self.R = (lambda t: R) if not callable(R) else R
        self.m = m0
        self.P = P0
        self.ll = mvn(m0, P0).logpdf(m0)

    def train(self, y, *, t=None, return_history=False, inplace=False):
        t = np.arange(y.size) if t is None else t
        if return_history:
            nt = (t.size,)
            m__ = np.zeros(nt + (self.m.size,)).squeeze()
            P__ = np.zeros(nt + (self.m.size,) * 2).squeeze()

            y__ = np.nan * np.ones(t.size)
            s2y__ = np.zeros_like(y__)
            ll__ = np.zeros_like(y__)

        m_, P_, ll_ = self.m, self.P, self.ll
        y_ = np.nan * np.ones(self.model.meas_dim)
        s2y_ = np.zeros((self.model.meas_dim,) * 2)

        dyn = self.model.dynamics
        meas = self.model.measurement
        for i, ti in enumerate(t):
            Q, R = self.Q(ti), self.R(ti)
            y_, s2y_, _ = ekf_measurement(m_, P_, R, meas)
            m_, P_ = ekf_predict(m_, P_, Q, dyn)
            m_, P_ = ekf_update(m_, P_, R, meas, y[i])
            ll_ = ll_ + ekf_loglikelihood(m_, P_, R, meas, y[i])
            if return_history:
                m__[i], P__[i], ll__[i] = m_, P_, ll_
                y__[i], s2y__[i] = y_.squeeze(), s2y_.squeeze()

        if inplace:
            self.m = m_
            self.P = P_
            self.ll = ll_

        if return_history:
            m_, P_, y_, s2y_, ll_ = m__, P__, y__, s2y__, ll__

        return m_, P_, y_, s2y_, ll_

    def forecast(self, t_span, *, return_history=False):
        dyn = self.model.dynamics
        meas = self.model.measurement
        m_, P_, y_, s2y_, t_ = ekf_forecast(self.m, self.P, self.Q, self.R,
                                            dyn, meas, t_span, dt=self.model.dt,
                                            return_history=return_history)
        return m_, P_, y_, s2y_, t_