# Pyctools - a picture processing algorithm development kit.
# http://github.com/jim-easterbrook/pyctools
# Copyright (C) 2020 Pyctools contributors
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
# <http://www.gnu.org/licenses/>.
__all__ = ['VideoFileWriter2']
from contextlib import contextmanager
import os
import subprocess
import numpy
from pyctools.core.config import ConfigEnum, ConfigFloat, ConfigPath
from pyctools.core.frame import Frame, Metadata
from pyctools.core.base import Component
from pyctools.core.types import pt_float
[docs]
class VideoFileWriter2(Component):
"""Write video files, including "raw" format.
This component uses FFmpeg_ to write video to a variety of formats.
Make sure you have installed FFmpeg before attempting to use
:py:class:`VideoFileWriter2`.
There are two configuration settings that control the saved file
format. ``codec`` chooses a codec "family" such as ``'FFV1'`` and
``pix_fmt`` chooses the data layout before compression. The
"container" format, such as ``AVI`` or ``MOV``, is inferred from the
file name extension. Not all codecs will work with all pixel
formats, and FFmpeg supports many more pixel formats (and codecs)
that are available in :py:class:`VideoFileWriter2`. Let me know if
you have any particular requirements that are not already included.
The ``raw`` and ``ffv1`` codecs are lossless, but look out for
unwanted RGB<->YUV conversion or UV resampling. ``H264`` always
converts to YUV, ``H264rgb`` always converts to RGB.
The ``input_Y_RGB`` input accepts images with 1 or 3 components as Y
or RGB. The ``input_UV`` input accepts UV images with 2 components.
The ``input`` config specifies the expected inputs.
=========== === ====
Config
=========== === ====
``path`` str Path name of file to be written.
``input`` str The input video format. Can be ``'RGB'``, ``'YUV'``, or ``'Y'``.
``codec`` str Codec name. Possible values: {}.
``pix_fmt`` str Pixel format. Possible values: {}.
``fps`` int Video frame rate. Only affects how file is replayed.
=========== === ====
.. _FFmpeg: https://www.ffmpeg.org/
.. _fourcc: https://www.fourcc.org/
"""
with_outframe_pool = False
inputs = ['input_Y_RGB', 'input_UV'] #:
outputs = []
fourcc = {
'gray' : 'Y8',
'gray16le': 'Y16',
'rgb24' : 'BGR[24]',
'uyvy422' : 'UYVY',
'yuv422p' : 'YV16',
}
codecs = {
'raw' : ['-c:v', 'rawvideo', '-f', 'rawvideo'],
'FFV1' : ['-c:v', 'ffv1'],
'H264' : ['-c:v', 'libx264', '-qp', '0'],
'H264rgb': ['-c:v', 'libx264rgb', '-qp', '0'],
}
pix_fmts = ('rgb24', 'rgb48le', 'uyvy422', 'yuv422p', 'yuv422p10le',
'gray', 'gray16le')
__doc__ = __doc__.format(
', '.join(["``'" + x + "'``"for x in codecs.keys()]),
', '.join(["``'" + x + "'``"for x in pix_fmts]))
def initialise(self):
self.config['path'] = ConfigPath(exists=False)
self.config['input'] = ConfigEnum(choices=('RGB', 'YUV', 'Y'))
self.config['codec'] = ConfigEnum(choices=(self.codecs.keys()))
self.config['pix_fmt'] = ConfigEnum(choices=self.pix_fmts)
self.config['fps'] = ConfigFloat(value=25, min_value=1, decimals=2)
def on_start(self):
# start generator to write data
self.generator = self.file_writer()
next(self.generator)
def process_frame(self):
Y_frame = self.input_buffer['input_Y_RGB'].get()
UV_frame = self.input_buffer['input_UV'].peek()
if UV_frame.frame_no >= 0:
UV_frame = self.input_buffer['input_UV'].get()
else:
UV_frame = None
# send frame(s) to generator
self.generator.send((Y_frame, UV_frame))
@contextmanager
def subprocess(self, *arg, **kw):
sp = None
try:
sp = subprocess.Popen(*arg, **kw)
yield sp
finally:
if sp:
if sp.stdin:
sp.stdin.flush()
for pipe in sp.stdin, sp.stdout, sp.stderr:
if pipe:
pipe.close()
sp.wait()
if sp.returncode:
self.logger.critical('FFmpeg return code %d', sp.returncode)
def file_writer(self):
self.update_config()
path = self.config['path']
input_ = self.config['input']
codec = self.config['codec']
out_fmt = self.config['pix_fmt']
fps = self.config['fps']
# if no UV input expected, create a dummy "static" frame
if input_ != 'YUV':
self.input_UV(Frame())
# get first frame
Y_frame, UV_frame = yield True
Y_data = Y_frame.as_numpy()
# check number of components
ylen, xlen, comps = Y_data.shape
if UV_frame:
if comps != 1:
self.logger.critical('Y input has %d components', comps)
return
UV_data = UV_frame.as_numpy()
UV_ylen, UV_xlen, UV_comps = UV_data.shape
if UV_comps != 2:
self.logger.critical('UV input has %d components', UV_comps)
return
elif comps not in (1, 3):
self.logger.critical('Y/RGB input has %d components', comps)
return
# choose format to send data to FFmpeg
bit16 = True
if UV_frame:
ss_x = xlen // UV_xlen
ss_y = ylen // UV_xlen
if ss_x == 1 and ss_y == 1:
if out_fmt in ('yuv444p', 'gray'):
in_fmt = 'yuv444p'
bit16 = False
else:
in_fmt = 'yuv444p16le'
elif ss_x == 2 and ss_y == 1:
if out_fmt in ('uyvy422', 'yuv422p', 'gray'):
in_fmt = 'yuv422p'
bit16 = False
else:
in_fmt = 'yuv422p16le'
elif ss_x == 2 and ss_y == 2:
if out_fmt in ('yuv420p', 'gray'):
in_fmt = 'yuv420p'
bit16 = False
else:
in_fmt = 'yuv420p16le'
elif comps == 3:
if out_fmt in ('rgb24', ):
bit16 = False
in_fmt = 'bgr24'
else:
in_fmt = 'bgr48le'
else:
if out_fmt in ('gray', ):
bit16 = False
in_fmt = 'gray'
else:
in_fmt = 'gray16le'
if in_fmt != out_fmt:
self.logger.warning(
'Converting "%s" to "%s" in FFmpeg', in_fmt, out_fmt)
# save metadata
metadata = Metadata().copy(Y_frame.metadata)
if codec == 'raw':
# store what's needed to read file
if out_fmt not in self.fourcc:
self.logger.critical('Cannot store "%s" in a raw file', out_fmt)
return
metadata.set('fourcc', self.fourcc[out_fmt])
metadata.set('xlen', str(xlen))
metadata.set('ylen', str(ylen))
else:
metadata.set('fourcc', None)
metadata.set('xlen', None)
metadata.set('ylen', None)
if UV_frame:
metadata.merge_audit({'Y': Y_frame, 'UV': UV_frame})
in_name = 'multiplex(Y, UV)'
else:
in_name = 'data'
metadata.set_audit(
self, '{} = {}\n FFmpeg: {} -> {}\n'.format(
os.path.basename(path), in_name, in_fmt, out_fmt),
with_date=True, with_config=self.config)
metadata.to_file(path)
# save data
cmd = ['ffmpeg', '-v', 'error', '-y', '-an',
'-f', 'rawvideo', '-s', '{}x{}'.format(xlen, ylen),
'-r', '{}'.format(fps), '-pix_fmt', in_fmt, '-i', '-']
cmd += self.codecs[codec]
cmd += ['-r', '{}'.format(fps), '-pix_fmt', out_fmt, path]
with self.subprocess(cmd, stdin=subprocess.PIPE) as sp:
while True:
if bit16:
Y_data = Y_data.astype(pt_float) * pt_float(256.0)
Y_data = Y_data.clip(
pt_float(0), pt_float(2**16 - 1)).astype('<u2')
else:
Y_data = Y_frame.as_numpy(dtype=numpy.uint8)
sp.stdin.write(Y_data.tobytes())
if in_fmt.startswith('yuv'):
UV_data = UV_frame.as_numpy(dtype=pt_float)
if UV_data.shape != (UV_ylen, UV_xlen, UV_comps):
self.logger.critical('UV dimensions changed')
return
# add offset to make unsigned byte data
UV_data = UV_data + pt_float(128.0)
if bit16:
UV_data = UV_data * pt_float(256.0)
UV_data = UV_data.clip(
pt_float(0), pt_float(2**16 - 1)).astype('<u2')
else:
UV_data = UV_data.clip(
pt_float(0), pt_float(255)).astype(numpy.uint8)
sp.stdin.write(UV_data[:,:,0].tobytes())
sp.stdin.write(UV_data[:,:,1].tobytes())
# get next frame
Y_frame, UV_frame = yield True
Y_data = Y_frame.as_numpy()
if Y_data.shape != (ylen, xlen, comps):
self.logger.critical('Y/RGB dimensions changed')
return