From: Erik Andresen Date: Fri, 27 Jul 2018 07:01:37 +0000 (+0200) Subject: Added simple kalman X-Git-Url: https://defiant.homedns.org/gitweb/?a=commitdiff_plain;h=a122ce94f6adfb634d2c3ec74f2b12c782115cc7;p=pyshared.git Added simple kalman --- diff --git a/simple_kalman.py b/simple_kalman.py new file mode 100755 index 0000000..d3d36ef --- /dev/null +++ b/simple_kalman.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# -*- coding: iso-8859-15 -*- + +# Implementation of simple Kalman in python based on +# Xilinx DSP Magazine #1 (2005) - http://www.xilinx.com/publications/ + +import numpy as np + +class simple_kalman: + def __init__(self, x_est, P_est, Q, R): + # Convert arguments to vectors/matrices if not given + if type(x_est) != type(np.array([0])): x_est = np.array(x_est) + if type(P_est) != type(np.matrix([0])): P_est = np.matrix(P_est) + if type(Q) != type(np.matrix([0])): Q = np.matrix(Q) + if type(R) != type(np.matrix([0])): R = np.matrix(R) + + self.x_est = x_est # Systemzustand + self.P_est = P_est # Fehlerkovarianz + self.Q = Q # Systemrauschen + self.R = R # Varianz des Messfehlers + self.I = np.eye(P_est.shape[0]) + + def run(self, y): + # Korrektur mit der Messung + # (1) Berechnung der Kalman Verstärkung + K = self.P_est * (self.R + self.P_est).I + # (2) Korrektur der Schätzung mit der Messung y + x = self.x_est + K*(y - self.x_est) + # (3) Korrektur der Fehlerkovarianzmatrix + P = (self.I-K)*self.P_est + # + # Prädiktion + # (1) Prädiziere den Systemzustand + self.x_est = x + # (2) Präzidiere die Fehlerkovarianzmatrix + self.P_est = P + self.Q + + if x.shape == (1,1): return x.item(0,0) + return x + + +if __name__ == '__main__': + import random + from matplotlib.pyplot import * + + # + # 1d Test + # + orig = [0.5 for i in range(100)] + y = [orig[i] + (random.random()-0.5)/5 for i in range(0, len(orig))] + x = [] + + x_est = 0.0 # Systemzustand + P_est = 0.1 # Fehlerkovarianz + Q = 10e-6 # Systemrauschen + R = 0.0035 # Varianz des Messfehlers + p = simple_kalman(x_est, P_est, Q, R) + + for i in range(0, 100): + # Messwert + x.append(p.run(y[i])) + + plot(orig, label="Orig") + plot(range(100), y, 'o', label="Noise") + plot(x, label="Filtered") + ylim(0, 2) + legend() + show() + + # + # 2d Test + # + orig1 = [0.5 for i in range(100)] + orig2 = [1.5 for i in range(100)] + y1 = [orig1[i] + (random.random()-0.5)/5 for i in range(0, len(orig1))] + y2 = [orig2[i] + (random.random()-0.5)/5 for i in range(0, len(orig2))] + x1 = [] + x2 = [] + + x_est = np.array([[0.0, 0.0]]).T # Systemzustand + P_est = 0.1 * np.eye(2) # Fehlerkovarianz + Q = 10e-6 * np.eye(2) # Systemrauschen + R = np.matrix([[0.0035, 0.0], [0.0, 0.0035]]) # Varianz des Messfehlers + p = simple_kalman(x_est, P_est, Q, R) + + for i in range(0, 100): + # Messwert + a = p.run(np.array([[y1[i], y2[i]]]).T) + x1.append(a.item((0, 0))) + x2.append(a.item((1, 0))) + + plot(orig1, label="Orig") + plot(orig2, label="Orig") + plot(range(100), y1, 'o', label="Noise") + plot(range(100), y2, 'o', label="Noise") + plot(x1, label="Filtered") + plot(x2, label="Filtered") + ylim(0, 2) + legend() + show()