Source code for pyctools.components.hardware.sanescanner

# Pyctools - a picture processing algorithm development kit.
# http://github.com/jim-easterbrook/pyctools
# Copyright (C) 2025  Pyctools contributors
#
# This file is part of Pyctools.
#
# Pyctools is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# Pyctools is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pyctools.  If not, see <http://www.gnu.org/licenses/>.

__all__ = []
__docformat__ = 'restructuredtext en'

import os
import sys

if 'sphinx' in sys.modules:
    __all__ += ['SaneScanner']

try:
    import sane
    sane.init()
    sane_types = dict((sane.TYPE_STR[x], x) for x in sane.TYPE_STR)
except ImportError:
    if 'sphinx' in sys.modules:
        sane = None
    else:
        raise

import numpy

from pyctools.core.config import (
    ConfigBool, ConfigEnum, ConfigInt, ConfigIntEnum, ConfigStr)
from pyctools.core.base import Component, InputBuffer
from pyctools.core.frame import Frame


class ScannerFactory(object):
    """Ensure scanner is only connected once.

    """

    scanners = {}

    def get_dev(self, name):
        if name not in self.scanners:
            self.scanners[name] = sane.open(name)
        return self.scanners[name]

scanner_factory = ScannerFactory()


[docs] class SaneScanner(Component): """Base class for scanners using the Linux SANE_ scanner library. This component requires the `python-sane`_ package. Different scanners have different configuration options, so the config documentation is generated dynamically when a scanner class derived from :py:class:`SaneScanner` is instantiated. A derived class for each connected scanner make and model is defined when the :py:mod:`~.sanescanner` module is imported. The component has two outputs. ``preview`` is used when the scanner's ``preview`` config setting is selected. The scanned image is streamed continually, allowing downstream processes such as gamma correction to be adjusted for best image quality. The main ``output`` is used to send a single scanned image when the ``preview`` config setting is not selected. This would typically be connected to a similar processing pipeline but ending in an image file writer. .. _python-sane: https://python-sane.readthedocs.io/ .. _SANE: http://www.sane-project.org/ """ inputs = [] outputs = ['output', 'preview'] #: def initialise(self): # create a dummy input to enable process_frame when scan is complete self.input_buffer['scan_done'] = InputBuffer(self.new_frame) # connect to scanner self.dev = scanner_factory.get_dev(self._name) # add config to docstring self.__doc__ += self.config_doc() # initialise config for name in self.dev.optlist: if not name: continue option = self.dev[name] if option.type in ( sane_types['TYPE_FIXED'], sane_types['TYPE_BUTTON'], sane_types['TYPE_GROUP']) or not option.is_settable(): continue kw = {'has_default': False, 'enabled': option.is_active()} if option.type == sane_types['TYPE_BOOL']: config_type = ConfigBool elif option.type == sane_types['TYPE_INT']: config_type = ConfigInt else: config_type = ConfigStr if isinstance(option.constraint, list): if config_type == ConfigStr: config_type = ConfigEnum else: config_type = ConfigIntEnum kw['choices'] = option.constraint elif option.constraint: kw['min_value'] = option.constraint[0] kw['max_value'] = option.constraint[1] if kw['enabled']: kw['value'] = getattr(self.dev, name) self.config[name] = config_type(**kw) def config_doc(self): # convert scanner options to config doc string result = '.. list-table:: Config\n\n' for name in self.dev.optlist: if not name: continue option = self.dev[name] if option.type in ( sane_types['TYPE_FIXED'], sane_types['TYPE_BUTTON'], sane_types['TYPE_GROUP']) or not option.is_settable(): continue result += f' * - ``{name}``\n' if option.type == sane_types['TYPE_BOOL']: result += f' - bool\n' elif option.type == sane_types['TYPE_INT']: result += f' - int\n' else: result += f' - str\n' result += f' - {option.desc}\n' result += '\n' return result def on_set_config(self): self.update_config() for name, value in self.config.items(): if name not in self.dev.optlist: continue option = self.dev[name] value.enabled = option.is_active() if not value.enabled: continue try: setattr(self.dev, name, value) except Exception as ex: print('Cannot set "{}" to "{}": {}'.format( name, value, str(ex))) self._shadow_config = None def on_start(self): self.on_set_config() try: self.dev.start() image = self.dev.arr_snap(progress=self.progress) except Exception as ex: # probably scan was cancelled print(str(ex)) self.stop() return # scale data if image.dtype == numpy.uint8: pass elif image.dtype == numpy.uint16: image = image.astype(numpy.float32) / numpy.float32(2 ** 8) else: self.logger.error('Cannot handle %s data type', str(image.dtype)) self.stop() return # create a Pyctools Frame frame = Frame() if image.shape[2] == 3: frame.type = 'RGB' elif image.shape[2] == 1: frame.type = 'Y' else: frame.type = '???' # send frame to dummy input frame.data = image frame.frame_no = 0 frame.set_audit(self, 'data = scan\n', with_config=self.config) self.input_buffer['scan_done'].input(frame) def progress(self, line_no, max_lines): # hacky way to see if user has requested stop if None in self._event_loop.incoming: self.dev.cancel() def process_frame(self): preview = self.config['preview'] output_name = self.outputs[preview] in_frame = self.input_buffer['scan_done'].get() out_frame = self.outframe_pool[output_name].get() out_frame.initialise(in_frame) self.send(output_name, out_frame) if preview: # stream frame to preview output in_frame.frame_no += 1 self.input_buffer['scan_done'].input(in_frame) else: # shut down pipeline after sending one main frame self.stop()
# create class for each connected scanner if sane: for (_name, _vendor, _model, _type) in sane.get_devices(localOnly=True): _class_name = 'Sane_{vendor}_{model}'.format( vendor = _vendor.title().replace(' ', ''), model = _model.title().replace(' ', '') ) _class = type(_class_name, (SaneScanner,), { '__doc__': f'{_vendor} {_model} scanner.\n\n', '_name': _name}) setattr(sys.modules[__name__], _class_name, _class) __all__.append(_class_name)