Source code for pyctools.components.fft.window

#  Pyctools - a picture processing algorithm development kit.
#  http://github.com/jim-easterbrook/pyctools
#  Copyright (C) 2014-19  Pyctools contributors
#
#  This program is free software: you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation, either version 3 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see
#  <http://www.gnu.org/licenses/>.

"""FFT window functions.

Window functions are used with Fourier transforms to give some control
over the trade-off between frequency resolution and "leakage". See
`Wikipedia <http://en.wikipedia.org/wiki/Window_function>`_ for a good
introduction to the subject.

This module contains components to generate several popular window
functions, and one to generate a corresponding "inverse window". This
can be used to reconstruct a signal from its Fourier transform. Note
that overlapping tiles are almost essential to avoid visible tile edge
effects when doing this. The inverse windows compensate for the
attenuation of the original window and "cross fade" from one tile to
the next, overlapping, tile. There is a choice of crossfade function.

For each window function component there is a "core" function. This
generates a single :py:class:`~pyctools.core.frame.Frame` containing
the window data. This is useful if you don't need to change the window
while your network of components is running. For example::

    window = Modulate()
    window.cell(HammingCore(xtile=32, ytile=32))

creates a windowing component that uses a 32x32 Hamming window. Its
``cell`` input does not need to be connected to anything.

.. autosummary::
   :nosignatures:

   Hann
   HannCore
   Hamming
   HammingCore
   Blackman
   BlackmanCore
   Kaiser
   KaiserCore
   InverseWindow

"""

__all__ = ['Window', 'Hann', 'Hamming', 'Blackman', 'Kaiser', 'InverseWindow']
__docformat__ = 'restructuredtext en'

from collections import OrderedDict
import math
import numpy
import sys
if 'sphinx' in sys.modules:
    __all__ += ['HannCore', 'HammingCore', 'BlackmanCore', 'KaiserCore']

from pyctools.core.base import Component
from pyctools.core.config import ConfigBool, ConfigEnum, ConfigFloat, ConfigInt
from pyctools.core.frame import Frame
from pyctools.core.types import pt_float

class WindowBase(Component):
    inputs = []
    with_outframe_pool = False

    def initialise(self):
        self.config['xtile'] = ConfigInt(min_value=1)
        self.config['ytile'] = ConfigInt(min_value=1)
        self.config['sym'] = ConfigBool(True)

    def on_start(self):
        # send first window
        self.make_window()

    def on_set_config(self):
        # send more windows if config changes
        self.make_window()


[docs] class Hann(WindowBase): """Hann window. =========== ==== ==== Config =========== ==== ==== ``xtile`` int Horizontal tile size. ``ytile`` int Vertical tile size. ``sym`` bool When True (default), generates a symmetric window. =========== ==== ==== """ def make_window(self): self.update_config() x_tile = self.config['xtile'] y_tile = self.config['ytile'] sym = self.config['sym'] self.send('output', HannCore(x_tile=x_tile, y_tile=y_tile, sym=sym))
[docs] class Hamming(WindowBase): """Hamming window. =========== ==== ==== Config =========== ==== ==== ``xtile`` int Horizontal tile size. ``ytile`` int Vertical tile size. ``sym`` bool When True (default), generates a symmetric window. =========== ==== ==== """ def make_window(self): self.update_config() x_tile = self.config['xtile'] y_tile = self.config['ytile'] sym = self.config['sym'] self.send('output', HammingCore(x_tile=x_tile, y_tile=y_tile, sym=sym))
[docs] class Blackman(WindowBase): """Blackman window. =========== ===== ==== Config =========== ===== ==== ``xtile`` int Horizontal tile size. ``ytile`` int Vertical tile size. ``sym`` bool When True (default), generates a symmetric window. ``alpha`` float Window control parameter. =========== ===== ==== """ def initialise(self): super(Blackman, self).initialise() self.config['alpha'] = ConfigFloat(value=0.16) def make_window(self): self.update_config() x_tile = self.config['xtile'] y_tile = self.config['ytile'] sym = self.config['sym'] alpha = self.config['alpha'] self.send('output', BlackmanCore( x_tile=x_tile, y_tile=y_tile, sym=sym, alpha=alpha))
[docs] class Kaiser(WindowBase): """Kaiser-Bessel window. See :py:func:`numpy:numpy.kaiser` for more detail. Note that NumPy and SciPy use a control parameter called ``beta``. This is ``alpha * pi``. =========== ===== ==== Config =========== ===== ==== ``xtile`` int Horizontal tile size. ``ytile`` int Vertical tile size. ``sym`` bool When True (default), generates a symmetric window. ``alpha`` float Window control parameter. =========== ===== ==== """ def initialise(self): super(Kaiser, self).initialise() self.config['alpha'] = ConfigFloat(value=0.9) def make_window(self): self.update_config() x_tile = self.config['xtile'] y_tile = self.config['ytile'] sym = self.config['sym'] alpha = self.config['alpha'] self.send('output', KaiserCore( x_tile=x_tile, y_tile=y_tile, sym=sym, alpha=alpha))
def Window2D(name, x_tile, y_tile, sym, function_1D, x_params={}, y_params={}): if x_tile == 1: x_win = numpy.array([1.0], dtype=numpy.float32) elif sym: x_win = function_1D(x_tile, **x_params) else: x_win = function_1D(x_tile + 1, **x_params)[:-1] if y_tile == 1: y_win = numpy.array([1.0], dtype=numpy.float32) elif sym: y_win = function_1D(y_tile, **y_params) else: y_win = function_1D(y_tile + 1, **y_params)[:-1] x_win = x_win.reshape((1, 1, -1, 1)) y_win = y_win.reshape((1, -1, 1, 1)) out_frame = Frame() out_frame.data = x_win * y_win out_frame.type = 'win' audit = out_frame.metadata.get('audit') audit += 'data = %sWindow()\n' % name audit += ' size: %d x %d\n' % (y_tile, x_tile) audit += ' symmetric: %s\n' % (str(sym)) extras = [] for key, value in x_params.items(): extras.append('%s: %s' % (key, str(value))) if extras: audit += ' horiz params: %s\n' % (', '.join(extras)) extras = [] for key, value in y_params.items(): extras.append('%s: %s' % (key, str(value))) if extras: audit += ' vert params: %s\n' % (', '.join(extras)) out_frame.metadata.set('audit', audit) return out_frame def HannCore(x_tile=1, y_tile=1, sym=True): def Hann_1D(tile): result = numpy.ndarray([tile], dtype=numpy.float32) for i in range(tile): result[i] = 0.5 + (0.5 * math.cos( math.pi * float((i * 2) + tile - 1) / float(tile - 1))) return result return Window2D('Hann', x_tile, y_tile, sym, Hann_1D) def HammingCore(x_tile=1, y_tile=1, sym=True): def Hamming_1D(tile): result = numpy.ndarray([tile], dtype=numpy.float32) for i in range(tile): result[i] = 0.53836 + (0.46164 * math.cos( math.pi * float((i * 2) + tile - 1) / float(tile - 1))) return result return Window2D('Hamming', x_tile, y_tile, sym, Hamming_1D) def BlackmanCore(x_tile=1, y_tile=1, sym=True, alpha=0.16): def Blackman_1D(tile, alpha): result = numpy.ndarray([tile], dtype=numpy.float32) a0 = (1.0 - alpha) / 2.0 a1 = -1.0 / 2.0 a2 = alpha / 2.0 for i in range(tile): f = math.pi * float(i * 2) / float(tile - 1) result[i] = a0 + (a1 * math.cos(f)) + (a2 * math.cos(2.0 * f)) return result return Window2D('Blackman', x_tile, y_tile, sym, Blackman_1D, x_params={'alpha' : alpha}, y_params={'alpha' : alpha}) def KaiserCore(x_tile=1, y_tile=1, sym=True, alpha=0.9): def Kaiser_1D(tile, alpha=0.9): return numpy.kaiser(tile, alpha * math.pi) return Window2D('Kaiser', x_tile, y_tile, sym, Kaiser_1D, x_params={'alpha' : alpha}, y_params={'alpha' : alpha}) def _Hann_1D(tile, alpha): return numpy.hanning(tile) def _Hamming_1D(tile, alpha): return numpy.hamming(tile) def _Blackman_1D(tile, alpha): return numpy.blackman(tile) def _Kaiser_1D(tile, alpha): return numpy.kaiser(tile, alpha * math.pi)
[docs] class Window(Component): """General 2-D window. The ``function`` parameter selects one of several widely used 1-D window functions. The ``sym`` parameter makes the window symmetrical or not. Note that a symmetrical window with an even size does not have a central value of unity. ``alpha`` is the control parameter for the ``Kaiser`` window. See :py:func:`numpy:numpy.kaiser` for more detail. Note that NumPy and SciPy use a control parameter called ``beta``, which is ``alpha * pi``. The ``combine2D`` parameter selects how the horizontal and vertical windows are combined to make a 2-D window. (If either dimension has size one it has no effect.) The option names refer to the shape of the contours if you plotted the window. ``square`` means the two windows are simply multiplied together. ``round`` means the window value depends on the normalised distance from the centre of the window, with points further than 0.5 set to zero. ``round2`` is an alternative that is normalised to the diagonal, so no points are further than 0.5. ============= ===== ==== Config ============= ===== ==== ``xtile`` int Horizontal tile size. ``ytile`` int Vertical tile size. ``sym`` bool When True (default), generates a symmetric window. ``combine2D`` str How to combine 1-D windows to make 2-D window. Can be ``square``, ``round`` or ``round2``. ``function`` str Choose the window function. Possible values: {} ``alpha`` float Window control parameter. ============= ===== ==== """ inputs = [] with_outframe_pool = False functions = OrderedDict(( # name function has alpha ('Hann', (_Hann_1D, False)), ('Hamming', (_Hamming_1D, False)), ('Blackman', (_Blackman_1D, True)), ('Kaiser', (_Kaiser_1D, True)), )) __doc__ = __doc__.format(', '.join(["``'" + x + "'``" for x in functions])) def initialise(self): self.config['xtile'] = ConfigInt(min_value=1) self.config['ytile'] = ConfigInt(min_value=1) self.config['sym'] = ConfigBool(True) self.config['combine2D'] = ConfigEnum(choices=('square', 'round', 'round2')) self.config['function'] = ConfigEnum(choices=self.functions.keys()) self.config['alpha'] = ConfigFloat() def on_start(self): # send first window self.make_window() def on_set_config(self): # send more windows if config changes self.make_window() def make_window(self): self.update_config() x_tile = self.config['xtile'] y_tile = self.config['ytile'] sym = self.config['sym'] combine2D = self.config['combine2D'] function = self.config['function'] alpha = self.config['alpha'] function_1D, has_alpha = self.functions[function] if combine2D == 'square' or x_tile == 1 or y_tile == 1: if x_tile == 1: x_win = numpy.array([1.0]) elif sym: x_win = function_1D(x_tile, alpha) else: x_win = function_1D(x_tile + 1, alpha)[:-1] if y_tile == 1: y_win = numpy.array([1.0]) elif sym: y_win = function_1D(y_tile, alpha) else: y_win = function_1D(y_tile + 1, alpha)[:-1] x_win = x_win.reshape((1, 1, -1, 1)) y_win = y_win.reshape((1, -1, 1, 1)) window = x_win * y_win else: xc, yc = x_tile // 2, y_tile // 2 if sym: xc, yc = xc - 0.5, yc - 0.5 func_win = numpy.zeros((2049,), dtype=pt_float) func_win[0:1025] = function_1D(2049, alpha)[1024:] window = numpy.empty((1, y_tile, x_tile, 1), dtype=pt_float) for y in range(y_tile): for x in range(x_tile): window[0, y, x, 0] = math.sqrt((((x - xc) / x_tile) ** 2) + (((y - yc) / y_tile) ** 2)) if combine2D == 'round2': window /= math.sqrt(2.0) window = numpy.interp( window, numpy.linspace(0, 1.0, func_win.shape[0]), func_win) out_frame = Frame() out_frame.data = window.astype(pt_float) out_frame.type = 'win' audit = out_frame.metadata.get('audit') audit += 'data = %sWindow()\n' % function audit += ' size: %d x %d\n' % (y_tile, x_tile) audit += ' symmetric: %s\n' % str(sym) if has_alpha: audit += ' alpha: %g\n' % alpha out_frame.metadata.set('audit', audit) self.send('output', out_frame)
[docs] class InverseWindow(Component): """Generate "inverse" window function with inter-tile cross-fade. The ``fade`` config value determines the transition from one tile to the next, within their area of overlap. ``'switch'`` abruptly cuts from one tile to the next, ``'linear'`` does a cross-fade and ``'minsnr'`` does a weighted cross-fade to minimise signal to noise ratio. ========= === ==== Config ========= === ==== ``xtile`` int Horizontal tile size. ``ytile`` int Vertical tile size. ``xoff`` int Horizontal tile offset. Typically set to xtile / 2. ``yoff`` int Vertical tile offset. Typically set to ytile / 2. ``fade`` str Can be ``'switch'``, ``'linear'`` or ``'minsnr'``. ========= === ==== """ outputs = ['window', 'inv_window'] #: with_outframe_pool = False def initialise(self): self.config['xtile'] = ConfigInt(min_value=1) self.config['ytile'] = ConfigInt(min_value=1) self.config['xoff'] = ConfigInt(min_value=1) self.config['yoff'] = ConfigInt(min_value=1) self.config['fade'] = ConfigEnum(choices=('switch', 'linear', 'minsnr')) self.in_frame = None def on_set_config(self): # send more windows if config changes if self.in_frame: self.make_window() def process_frame(self): self.in_frame = self.input_buffer['input'].get() self.send('window', self.in_frame) self.make_window() def make_window(self): self.update_config() x_tile = self.config['xtile'] y_tile = self.config['ytile'] x_off = self.config['xoff'] y_off = self.config['yoff'] fade = self.config['fade'] # adjust config to suit actual window in_data = self.in_frame.as_numpy(dtype=numpy.float32) if in_data.shape[1] != y_tile: y_off = y_off * in_data.shape[1] // y_tile y_tile = in_data.shape[1] if in_data.shape[2] != x_tile: x_off = x_off * in_data.shape[2] // x_tile x_tile = in_data.shape[2] out_frame = Frame() out_frame.initialise(self.in_frame) audit = out_frame.metadata.get('audit') audit += 'data = InverseWindow(data)\n' audit += ' size: %d x %d, offset: %d x %d\n' % ( y_tile, x_tile, y_off, x_off) audit += ' fade: %s\n' % fade out_frame.metadata.set('audit', audit) result = numpy.empty(in_data.shape, dtype=numpy.float32) x_overlap = x_tile - x_off y_overlap = y_tile - y_off for y in range(y_tile): y0 = y while y0 >= y_off: y0 -= y_off for x in range(x_tile): x0 = x while x0 >= x_off: x0 -= x_off # get window value of this and neighbouring tiles centre = in_data[0, y, x, 0] neighbours = [] for j in range(y0, y_tile, y_off): for i in range(x0, x_tile, x_off): if j == y and i == x: continue neighbours.append(in_data[0, j, i, 0]) if not neighbours: result[0, y, x, 0] = 1.0 / max(centre, 0.000001) elif fade == 'minsnr': result[0, y, x, 0] = centre / ( (centre ** 2) + sum(map(lambda x: x ** 2, neighbours))) elif fade == 'linear': result[0, y, x, 0] = 1.0 / (centre + sum(neighbours)) else: biggest = max(neighbours) if centre > biggest: result[0, y, x, 0] = 1.0 / max(centre, 0.000001) elif centre < biggest: result[0, y, x, 0] = 0.0 else: result[0, y, x, 0] = 0.5 / max(centre, 0.000001) out_frame.data = result self.send('inv_window', out_frame)