# Pyctools - a picture processing algorithm development kit.
# http://github.com/jim-easterbrook/pyctools
# Copyright (C) 2018-20 Pyctools contributors
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
# <http://www.gnu.org/licenses/>.
__all__ = ['FilterDesign']
__docformat__ = 'restructuredtext en'
from collections import OrderedDict
import math
import numpy
from scipy import interpolate
from pyctools.core.config import ConfigBool, ConfigEnum, ConfigFloat, ConfigInt, ConfigStr
from pyctools.core.base import Component
from pyctools.core.frame import Frame
from pyctools.core.types import pt_float, pt_complex
[docs]
class FilterDesign(Component):
"""Generate a 1-D filter from an ideal response.
The response is specified as a series of normalised frequency values
(in the range 0.0 to 0.5), corresponding gain values, usually in the
range 0.0 to 1.0, and (optionally) corresponding weight values. A
filter is generated that minimises the weighted square of the
deviation from this ideal response.
The weight values are a measure of how much you care about the
response at different frequencies. For example, you may want "flat"
pass bands and stop bands, but not care too much about the
transition band.
The ``response`` output emits the ideal and actual filter responses.
It can be connected to the
:py:class:`~pyctools.components.io.plotdata.PlotData` component.
The ``interp`` option selects the function used to convert the
series of response points to a continuous function, before
calculating the filter coefficients. See
:py:class:`scipy:scipy.interpolate.interp1d` for more detail.
============== ===== ====
Config
============== ===== ====
``frequency`` str List of frequency values, in increasing order.
``gain`` str List of corresponding gain values.
``weight`` str List of corresponding weight values. Default is unity.
``aperture`` int The number of filter coefficients.
``interp`` str Interpolation function. Possible values: {}
``direction`` str Direction of filter. Possible values: horizontal, vertical.
============== ===== ====
"""
inputs = []
outputs = ['filter', 'response'] #:
with_outframe_pool = False
interp_list = ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic')
__doc__ = __doc__.format(', '.join(["``'" + x + "'``" for x in interp_list]))
def initialise(self):
self.config['frequency'] = ConfigStr(value='0.0, 0.5')
self.config['gain'] = ConfigStr(value='1.0, 0.0')
self.config['weight'] = ConfigStr()
self.config['aperture'] = ConfigInt(value=9, min_value=3)
self.config['interp'] = ConfigEnum(choices=(self.interp_list))
self.config['direction'] = ConfigEnum(choices=('horizontal', 'vertical'))
def on_start(self):
# send first filter coefs
self.make_filter()
def on_set_config(self):
# send more coefs if config changes
self.make_filter()
def make_filter(self):
self.update_config()
freq_vals = eval(self.config['frequency'])
gain_vals = eval(self.config['gain'])
if len(freq_vals) != len(gain_vals):
self.logger.warning(
'frequency and gain lists are of different length')
freq_vals = freq_vals[:len(gain_vals)]
gain_vals = gain_vals[:len(freq_vals)]
if self.config['weight']:
wgt_vals = eval(self.config['weight'])
if len(wgt_vals) != len(gain_vals):
self.logger.warning(
'frequency and weight lists are of different length')
wgt_vals = wgt_vals[:len(gain_vals)]
gain_vals = gain_vals[:len(wgt_vals)]
freq_vals = freq_vals[:len(gain_vals)]
wgt_vals = numpy.array(wgt_vals, dtype=numpy.double)
freq_vals = numpy.array(freq_vals, dtype=numpy.double)
gain_vals = numpy.array(gain_vals, dtype=numpy.double)
aperture = self.config['aperture']
# how much oversampling needed
pad_len = 256
while pad_len < aperture * 4:
pad_len *= 2
# interpolate ideal response
int_freq = numpy.linspace(0.0, 0.5, pad_len + 1, dtype=numpy.double)
interp_func = interpolate.interp1d(
freq_vals, gain_vals, kind=self.config['interp'],
bounds_error=False, fill_value='extrapolate')
int_gain = interp_func(int_freq)
# interpolate weight
if self.config['weight']:
interp_func = interpolate.interp1d(
freq_vals, wgt_vals, kind=self.config['interp'],
bounds_error=False, fill_value='extrapolate')
int_wgt = interp_func(int_freq)
else:
int_wgt = numpy.ones((pad_len + 1,), dtype=numpy.double)
# 'DC' and 'half fs' gain are always important
int_wgt_copy = int_wgt.copy()
int_wgt[0] = numpy.amax(int_wgt) * pad_len
int_wgt[pad_len] = int_wgt[0]
# compute response matrices
W2 = numpy.empty((pad_len * 2,), dtype=numpy.double)
W2[:pad_len + 1] = int_wgt
W2[pad_len + 1:] = int_wgt[pad_len - 1:0:-1]
W2 = W2 ** 2
W2R = numpy.empty((pad_len * 2,), dtype=numpy.double)
W2R[:pad_len + 1] = int_gain
W2R[pad_len + 1:] = int_gain[pad_len - 1:0:-1]
W2R *= W2
W2 = numpy.fft.ifft(W2)
W2R = numpy.fft.ifft(W2R)
# compute matrices to solve
MtM = numpy.empty((aperture, aperture), dtype=numpy.double)
MtR = numpy.empty((aperture,), dtype=numpy.double)
offset = aperture // 2
for j in range(aperture):
MtR[j] = numpy.real(W2R[j - offset])
for i in range(aperture):
MtM[j, i] = numpy.real(W2[i - j])
# solve using Cholesky decomposition
L = numpy.linalg.cholesky(MtM)
coefs = numpy.linalg.solve(numpy.transpose(L.conjugate()),
numpy.linalg.solve(L, MtR)).astype(pt_float)
# send filter output
fil_frame = Frame()
if self.config['direction'] == 'horizontal':
fil_frame.data = coefs.reshape((1, -1, 1))
else:
fil_frame.data = coefs.reshape((-1, 1, 1))
fil_frame.type = 'fil'
fil_frame.set_audit(self, 'data = OptimumFilterCoefficients()\n',
with_config=self.config)
self.send('filter', fil_frame)
# compute actual response
padded = numpy.zeros(pad_len * 2, dtype=numpy.double)
for j in range(aperture):
padded[j - offset] = coefs[j]
response = numpy.fft.rfft(padded)
# send response output
resp_frame = Frame()
resp_frame.type = 'resp'
if self.config['weight']:
resp_frame.data = numpy.stack(
(int_freq.astype(pt_float),
int_gain.astype(pt_float),
int_wgt_copy.astype(pt_float),
numpy.real(response).astype(pt_float)))
labels = ('normalised frequency', 'ideal gain',
'weight', 'actual gain')
else:
resp_frame.data = numpy.stack(
(int_freq.astype(pt_float),
int_gain.astype(pt_float),
numpy.real(response).astype(pt_float)))
labels = 'normalised frequency', 'ideal gain', 'actual gain'
resp_frame.metadata.set('labels', repr(labels))
resp_frame.set_audit(self, 'data = OptimumFilterResponse()\n',
with_config=self.config)
self.send('response', resp_frame)