# Source code for barrista.design (extracted from a Sphinx-rendered page).

# -*- coding: utf-8 -*-
# pylint: disable=F0401, E0611, C0103, R0902, R0913, R0914, E1101
# pylint: disable=wrong-import-order
"""
This module contains classes and functions to design networks with caffe.

For the layer constructing functions, ``None`` is used to mark a parameter
as unset (this is important in protobuf)!
"""

import copy as _copy
import inspect as _inspect
import logging as _logging
import subprocess as _subprocess
import itertools as _itertools
from tempfile import NamedTemporaryFile as _NamedTemporaryFile
import os as _os
from operator import itemgetter as _itemgetter
import google.protobuf.text_format as _gprototext

# CAREFUL! This must be imported before any caffe-related import!
from .initialization import init as _init
from .config import LAYER_TYPES as _LAYER_TYPES
from .config import CAFFE_BIN_FOLDER as _CAFFE_BIN_FOLDER
from .tools import chunks as _chunks, pbufToPyEnum as _pbufToPyEnum

import caffe.proto.caffe_pb2 as _caffe_pb2
try:  # pragma: no cover
    import caffe.draw as _draw
    import cv2 as _cv2
except ImportError:  # pragma: no cover
    _draw = None
    _cv2 = None
from .net import Net as _Net


#: Expose all detailed properties from the caffe prototxt for
#: in-Python usage.
PROTODETAIL = _caffe_pb2

#: Whether this caffe version supports the `propagate_down` layer property.
_HAS_PROPAGATE_DOWN = hasattr(_caffe_pb2.LayerParameter,
                              'propagate_down')
#: Whether this caffe version supports 'BlobShape'.
_HAS_BLOB_SHAPE = hasattr(_caffe_pb2, 'BlobShape')

_init()
_LOGGER = _logging.getLogger(__name__)
if _draw is None:  # pragma: no cover
    # `Logger.warn` is a deprecated alias of `Logger.warning`; use the
    # canonical name.
    _LOGGER.warning('Could not import caffe.draw or cv2! Drawing is not available. ' +
                    'Probably this is due to the package pydot not being ' +
                    'available.')


#: Describes the phase a network is in (TRAIN or TEST). It may only be set
#: when instantiating the network. If phase is TEST, some memory optimizations
#: are done, that prohibit training the network. The other way around, it is
#: no problem to test a network in TRAIN mode, but it will be less efficient.
Phase = _pbufToPyEnum(_caffe_pb2.Phase)


class NetSpecification(object):

    r"""
    Represents a network specification.

    Initializes the fields with similar semantics as the prototxt files.
    An important difference is, that if ``predict_inputs`` and
    ``predict_input_shapes`` are specified, this corresponds to two virtual
    network specifications. One, where the stage is set as specified by the
    ``stages`` parameter, and one where the stage is set to 'predict'. Both
    networks share the same weights. When instantiated, the 'predict'
    network will automatically be used by the
    :py:func:`barrista.net.Net.predict` method.

    .. graph:: network_stages_and_phases

      subgraph cluster_fit {
        phase_train_fit [label="phase: train"];
        phase_test_fit [label="phase: test"];
        label="stage: fit";
        graph[style=dotted];
      }
      subgraph cluster_predict {
        phase_test_predict [label="phase: test"];
        label="stage: predict";
        graph[style=dotted];
      }

    To get the plain specification of the network in 'predict' mode, use the
    method
    :py:func:`barrista.design.NetSpecification.get_predict_net_specification`.

    :param input_shape: list(list(int)).
      The input shape specification of the network. The length of each of
      the sub-lists must be > 0.

    :param inputs: list(string) or None.
      The names of the network inputs. The length must match the length of
      the ``input_shape`` list. If ``None`` is used, this is initialized as
      ``['data']``.

    :param layers: list(:py:class:`barrista.design.LayerSpecification`) or None.
      The layer specifications. If None is used, this is initialized as
      ``[]``.

    :param force_backward: bool.
      Whether to force a backward pass for all layers only useful during
      training.

    :param phase: :py:data:`barrista.design.Phase` or None.
      The phase to have the network in. If it is None, it is set to either
      ``TRAIN`` if ``predict_inputs`` and ``predict_inputs_shapes`` is
      given, or ``TEST`` otherwise. Default: None.

    :param level: int.
      The level of the network. Can be used to in- or exclude layers
      depending on the level.

    :param stages: list(string) or None.
      The stages the network is in. Can be used to in- or exclude layers
      depending on the stages. By default, 'fit' and 'predict' stages are
      used, as mentioned before. If ``None`` is specified, this is
      initialized as ``['fit']``.

    :param debug_info: bool.
      If set to True, give additional debug output on the console.

    :param name: string.
      The name of the network.

    :param predict_inputs: list(string) or None.
      If set, will be used as ``inputs`` for a network with the same
      specification, except ``stages=['predict']``. This will then
      automatically used after instantiation for predicting inputs.

    :param predict_input_shapes: list(list(int)) or None.
      If set, will be used together with ``predict_inputs`` as stated
      above.
    """

    def __init__(self,
                 input_shape,
                 inputs=None,
                 layers=None,
                 force_backward=False,
                 phase=None,
                 level=0,
                 stages=None,
                 debug_info=False,
                 name='DCNN',
                 predict_inputs=None,
                 predict_input_shapes=None):
        """See class documentation."""
        # Fill in the documented defaults for the mutable parameters
        # (mutable default arguments would be shared across instances).
        if inputs is None:
            inputs = ['data']
        if layers is None:
            layers = []
        if phase is None:
            # With prediction inputs present, the main network is intended
            # for fitting (TRAIN); otherwise it is a pure TEST network.
            if predict_inputs is not None and predict_input_shapes is not None:
                phase = Phase.TRAIN
            else:
                phase = Phase.TEST
        if stages is None:
            stages = ['fit']
        # One shape per declared input; every shape needs >= 1 dimension.
        assert len(inputs) == len(input_shape)
        for shapelist in input_shape:
            assert len(shapelist) > 0
        # `predict_inputs` and `predict_input_shapes` must be given
        # together; prediction shapes are limited to at most 4 dimensions.
        if predict_inputs is not None or predict_input_shapes is not None:
            assert predict_inputs is not None
            assert predict_input_shapes is not None
            assert len(predict_inputs) == len(predict_input_shapes)
            for shape in predict_input_shapes:
                assert len(shape) > 0 and len(shape) < 5
        self.input_shape = input_shape
        self.inputs = inputs
        self.layers = layers
        self.force_backward = force_backward
        self.phase = phase
        self.level = level
        self.stages = stages
        self.debug_info = debug_info
        self.name = name
        self.predict_inputs = predict_inputs
        self.predict_input_shapes = predict_input_shapes
[docs] def get_predict_net_specification(self): """Get the plain network specification with ``stages=['predict']``.""" assert (self.predict_inputs is not None and self.predict_input_shapes is not None) return NetSpecification(self.predict_input_shapes, self.predict_inputs, self.layers, False, Phase.TEST, self.level, ['predict'], self.debug_info, self.name)
[docs] def to_pbuf_message(self, drop_phase=False): r""" Create a plain protobuf message from this object. Since this object is not derived from the protobuf objects, it is necessary to have this converter. Deriving is discouraged by the protobuf documentation and this object offers a lot more functionality. :param drop_phase: Bool. If set to ``True``, phase, level and stage are not serialized. Default: False. """ pblayers = [] for idx, layer in enumerate(self.layers): if idx == 0: pblayers.append(layer.to_pbuf_message(idx, None, self.inputs)) else: pblayers.append(layer.to_pbuf_message(idx, self.layers[idx-1], self.inputs)) if drop_phase: pbstate = _caffe_pb2.NetState() else: pbstate = _caffe_pb2.NetState(phase=self.phase, level=self.level, stage=self.stages) if _HAS_BLOB_SHAPE: pbinput_shape = [_caffe_pb2.BlobShape(dim=dims) for dims in self.input_shape] netmessage = _caffe_pb2.NetParameter(name=self.name, input=self.inputs, input_shape=pbinput_shape, layer=pblayers, force_backward=self.force_backward, # noqa state=pbstate, debug_info=self.debug_info) else: # pragma: no cover shapes_to_use = [] for shape in self.input_shape: if len(shape) != 4: shape += [1] * (4 - len(shape)) shapes_to_use.append(shape) netmessage = _caffe_pb2.NetParameter(name=self.name, input=self.inputs, input_dim=_itertools.chain(*self.input_shape), layer=pblayers, force_backward=self.force_backward, # noqa state=pbstate, debug_info=self.debug_info) assert netmessage.IsInitialized() return netmessage
[docs] def to_prototxt(self, output_filename=None, drop_phase=False): r""" Create a plain, human readable, prototxt representation. If ``output_filename`` is set, the resulting text is written into that file, as well as returned. Otherwise, only the text is returned. Layers will be automatically wired together, if their ``bottoms`` or ``tops`` are not set. If the have multiple inputs or outputs, you will have to take care of that (there is no way of inferring the semantics then). :param output_filename: String or None. A file to write the prototxt information to. :param drop_phase: Bool. If set to ``True``, phase, level and stage will not be serialized. Default: False. """ messagestr = _gprototext.MessageToString(self.to_pbuf_message(drop_phase=drop_phase)) if output_filename is not None: with open(output_filename, 'w') as outf: outf.write(messagestr) return messagestr
@staticmethod
[docs] def from_prototxt(text=None, filename=None): r""" Create an :py:class:`NetSpecification` object from a text spec. Either ``text`` or ``filename`` may be set, and is accordingly used. Files may be of any caffe prototxt version. """ # Check if the user erroneously specified a filename as text. if text is not None: if _os.linesep not in text: if _os.path.exists(text): # pragma: no cover _LOGGER.warn('You probably mistakenly specified a filename ' 'as text: "%s"! Trying to recover...', text) filename = text text = None if filename is not None: assert text is None # Do a conversion if necessary. with _NamedTemporaryFile(mode='r', suffix='.prototxt') as tmpfile: net_upgrader_exec = _os.path.join(_CAFFE_BIN_FOLDER, 'upgrade_net_proto_text') assert _os.path.exists(net_upgrader_exec),\ ("The executable 'upgrade_net_proto_text' was not found " "in your _CAFFE_BIN_FOLDER! Please set it from the " "module `barrista.config`. The current folder is set " "to: " + _CAFFE_BIN_FOLDER + ".") _subprocess.check_call([net_upgrader_exec, filename, tmpfile.name]) text = tmpfile.read() message = _caffe_pb2.NetParameter() _gprototext.Merge(text, message) # Check for completeness of the parsing process. fields = message.ListFields() for fielddesc in map(_itemgetter(0), fields): # pylint: disable=W0141 if fielddesc.name not in ['name', 'input_shape', 'debug_info', 'input', 'input_dim', 'layer', 'force_backward', 'state']: _LOGGER.warn('Parsed net prototxt contained unknown field ' + fielddesc.name + '. Ignored.') if len(message.input_dim) > 0: _LOGGER.warn('The loaded prototxt contains `input_dim` fields. ' 'They are deprecated! 
Use `input_shape` instead.') if _HAS_BLOB_SHAPE: assert len(message.input_shape) == 0 assert len(message.input_dim) % 4 == 0 input_shape = _copy.deepcopy(list(_chunks(message.input_dim, 4))) else: # pragma: no cover input_shape = _copy.deepcopy([bshape.dim for bshape in message.input_shape]) inputs = _copy.deepcopy(message.input) layerspecs = [LayerSpecification.from_pbuf_message(layer) for layer in message.layer] pbforcebw = message.force_backward phase = message.state.phase level = message.state.level stages = _copy.deepcopy(message.state.stage) debug_info = message.debug_info name = message.name spec = NetSpecification(input_shape, inputs, layerspecs, pbforcebw, phase, level, stages, debug_info, name) return spec
[docs] def copy(self): r""" Create a deep copy of this object. All layers are deep copied, so modifying layer objects of the old object does not influence the copied one! """ return _copy.deepcopy(self)
[docs] def instantiate(self): """Create an instantiated net with the current object configuration.""" # Write spec to a temporary file. with _NamedTemporaryFile(mode='w', suffix='.prototxt') as tmpfile: tmpfile.write(self.to_prototxt()) tmpfile.flush() _specification = self.copy() net = _Net(tmpfile.name, mode=self.phase, specification=_specification) return net
[docs] def visualize(self, layout_dir='LR', display=False): """ Create and optionally display an image of the net structure. :param layout_dir: string in ['LR', 'TB', 'BT']. Short string for graph layout direction. :param display: bool. If set to ``True``, displays the graphic in a window. Press enter to close it. :returns: 3D numpy array. Graphic of the visualization as (H, W, C) image in BGR format. """ if _draw is None or _cv2 is None: # pragma: no cover raise Exception('Drawing is not available!') else: # pragma: no cover with _NamedTemporaryFile(mode='w+b', suffix='.png') as tmpfile: _draw.draw_net_to_file(self.to_pbuf_message(), tmpfile.name, rankdir=layout_dir) result_image = _cv2.imread(tmpfile.name) assert result_image is not None if display: # pragma: no cover _cv2.imshow(self.name, result_image) _cv2.waitKey(0) _cv2.destroyWindow(self.name) return result_image
class LayerSpecification(object):

    r"""
    Describes one caffe layer.

    :param bottoms: list(string) or None.
      If set, specifies the inputs. If unset, will be automatically wired
      to the preceeding layer ``top[0]``.

    :param tops: list(string) or None.
      If set, specifies the top names of this layer. If unset, will be set
      to the layer name.

    :param name: string or None.
      If set, gives the name of the layer. Otherwise, use ``_layer_{idx}``,
      where ``idx`` specifies the layer index.

    :param phase: int or None.
      If set, specifies the layer phase.

    :param include_phase: :py:data:`barrista.design.Phase` or None.
      Only include this layer in the given phase.

    :param include_stages: string or None.
      Only include this layer if *all* stages are present.

    :param include_min_level: int or None.
      Only include this layer, if the network level is >= this value.

    :param include_max_level: int or None.
      Only include this layer, if the network level is <= this value.

    :param params: list(:py:data:`barrista.design.ParamSpec`) or None.
      Multipliers for learning rate and weight decay for the layer
      parameters.

    :param propagate_down: list(bool) or None.
      Specifies on which bottoms the backpropagation should be skipped.
      Must be either 0 or equal to the number of bottoms. If ``None`` is
      specified, this is initialized as ``[]``. Not available in all caffe
      versions!

    :param loss_param: :py:data:`barrista.design.LossParameter` or None.
      Specifies optional ignore labels and normalization for the loss.

    :param loss_weights: list(float) or None.
      The amount of weight to assign each top blob in the objective. If
      ``None`` is specified, this is initialized as ``[]``.
    """

    def __init__(self,
                 bottoms=None,
                 tops=None,
                 name=None,
                 phase=None,
                 include_phase=None,
                 include_stages=None,
                 include_min_level=None,
                 include_max_level=None,
                 params=None,
                 propagate_down=None,
                 loss_param=None,
                 loss_weights=None):
        """See class documentation."""
        self.bottoms = bottoms
        self.tops = tops
        self.name = name
        self.phase = phase
        self.include_phase = include_phase
        self.include_stages = include_stages
        self.include_min_level = include_min_level
        self.include_max_level = include_max_level
        self.params = params
        if propagate_down is None:
            propagate_down = []
        else:
            # Only reject an explicit value; the default is always safe.
            if not _HAS_PROPAGATE_DOWN:  # pragma: no cover
                raise Exception("This caffe version does not support the "
                                "`propagate_down` layer property!")
        self.propagate_down = propagate_down
        self.loss_param = loss_param
        if loss_weights is None:
            loss_weights = []
        self.loss_weights = loss_weights
        # `type` and `_additional_parameters` are filled in by the
        # generated layer constructor functions / `from_pbuf_message`.
        self.type = None
        self._additional_parameters = []

    def __eq__(self, other):
        """Deep equality comparison for all properties."""
        # Comparing against an unrelated object must not raise an
        # AttributeError (as `getattr(other, ...)` would); signal
        # "not comparable" so Python can fall back to identity.
        if not isinstance(other, LayerSpecification):
            return NotImplemented
        for attrname in ['bottoms', 'tops', 'name', 'phase',
                         'include_phase', 'include_stages',
                         'include_min_level', 'include_max_level',
                         'params', 'propagate_down', 'loss_param',
                         'loss_weights', 'type',
                         '_additional_parameters']:
            if not getattr(self, attrname) == getattr(other, attrname):
                return False
        return True
[docs] def from_pbuf_message(message): r"""Create a LayerSpecification object from a protobuf message.""" bottoms = _copy.deepcopy(message.bottom) tops = _copy.deepcopy(message.top) name = _copy.deepcopy(message.name) include_phase = None include_stages = None include_min_level = None include_max_level = None if len(message.include) > 0: msg0fields = message.include[0].ListFields() fieldnames = [fld[0].name for fld in msg0fields] include_phase = None if 'phase' in fieldnames: include_phase = _copy.deepcopy(message.include[0].phase) if 'stage' in fieldnames: include_stages = _copy.deepcopy(message.include[0].stage) if 'min_level' in fieldnames: include_min_level = _copy.deepcopy(message.include[0].min_level) if 'max_level' in fieldnames: include_max_level = _copy.deepcopy(message.include[0].max_level) if len(message.include) > 1: _LOGGER.warn('Layer %s has include specifications that can ' 'not be modeled in this tool! Ignoring.', name) fields = message.ListFields() fieldnames = [fld[0].name for fld in fields] phase = None if 'phase' not in fieldnames else _copy.deepcopy( message.phase) params = None if 'param' not in fieldnames else _copy.deepcopy( message.param) if _HAS_PROPAGATE_DOWN: propagate_down = _copy.deepcopy(message.propagate_down) else: # pragma: no cover propagate_down = None loss_param = (None if 'loss_param' not in fieldnames else _copy.deepcopy(message.loss_param)) loss_weights = _copy.deepcopy(message.loss_weight) spec = LayerSpecification(bottoms, tops, name, phase, include_phase, include_stages, include_min_level, include_max_level, params, propagate_down, loss_param, loss_weights) # Get type and type-dependent parameters. 
spec.type = message.type fields = message.ListFields() for fielddesc, fieldval in fields: if fielddesc.name not in ['name', 'bottom', 'type', 'top', 'phase', 'include', 'param', 'propagate_down', 'loss_param', 'loss_weight']: setattr(spec, fielddesc.name, _copy.deepcopy(fieldval)) # pylint: disable=W0212 spec._additional_parameters.append(fielddesc.name) return spec
    def to_pbuf_message(self,  # pylint: disable=R0912, R0915
                        layerindex,
                        preceeding_layer,
                        net_input):
        r"""
        Create a protobuf specification of this layer.

        It automatically wires together preceeding and following layers,
        if ``tops`` or ``bottoms`` are not set. This does not work with
        multiple in- or outputs.

        :param layerindex: int >= 0.
          The index of this layer. Is used to generate the layer name.

        :param preceeding_layer: :class:`barrista.design.LayerSpecification`.
          The preceeding layer to create the wiring with.

        :param net_input: string.
          The name of the network input (used for the first layer input).
        """
        assert layerindex >= 0
        assert self.type is not None
        if layerindex != 0:
            assert preceeding_layer is not None
        # Prepare the arguments.
        kwargs = dict()
        # Bottom: infer the inputs if `self.bottoms` is unset — use the
        # net input for the first layer, otherwise the predecessor's
        # first top (or its name/index-based fallback name).
        pbbottom = []
        omit_bottom = False
        if self.bottoms is None:
            if preceeding_layer is None:
                if net_input is not None and len(net_input) > 0:
                    pbbottom = [net_input[0]]
                else:
                    # No net input at all: serialize the layer without a
                    # `bottom` field.
                    omit_bottom = True
            else:
                if preceeding_layer.tops is None:
                    if preceeding_layer.name is None:
                        pbbottom = ['_layer_{0}'.format(layerindex - 1)]
                    else:
                        pbbottom = [preceeding_layer.name]
                else:
                    pbbottom = [preceeding_layer.tops[0]]
        else:
            # Explicit bottoms: `None` entries are placeholders that get
            # filled with the predecessor's tops, in order.
            preplist = self.bottoms[:]
            mapidx = 0
            for btidx, btname in enumerate(preplist):
                if btname is None:
                    if preceeding_layer.tops is not None:
                        preplist[btidx] = preceeding_layer.tops[mapidx]
                    else:
                        preplist[btidx] = '_layer_{0}'.format(layerindex - 1)
                    # NOTE(review): source formatting was lost; assumes the
                    # counter advances once per replaced placeholder —
                    # TODO confirm against the original repository.
                    mapidx += 1
            pbbottom = preplist
        if not omit_bottom:
            kwargs['bottom'] = pbbottom
        # Top: defaults to the layer name (or the generated index name).
        pbtop = []
        if self.tops is None:
            if self.name is None:
                pbtop = ['_layer_{0}'.format(layerindex)]
            else:
                pbtop = [self.name]
        else:
            pbtop = self.tops
        kwargs['top'] = pbtop
        # Name.
        pbname = self.name
        if pbname is None:
            pbname = '_layer_{0}'.format(layerindex)
        kwargs['name'] = pbname
        if self.phase is not None:
            kwargs['phase'] = self.phase
        # include: collect only the rule fields that are actually set.
        include_kwargs = dict()
        if self.include_phase is not None:
            include_kwargs['phase'] = self.include_phase
        if self.include_stages is not None and len(self.include_stages) > 0:
            include_kwargs['stage'] = self.include_stages
        if self.include_min_level is not None:
            include_kwargs['min_level'] = self.include_min_level
        if self.include_max_level is not None:
            include_kwargs['max_level'] = self.include_max_level
        if len(include_kwargs) > 0:
            kwargs['include'] = [_caffe_pb2.NetStateRule(**include_kwargs)]
        kwargs['type'] = self.type
        if self.params is not None:
            kwargs['param'] = self.params
        if _HAS_PROPAGATE_DOWN:
            kwargs['propagate_down'] = self.propagate_down
        if self.loss_param is not None:
            kwargs['loss_param'] = self.loss_param
        kwargs['loss_weight'] = self.loss_weights
        # Re-attach the type-specific parameter objects collected by
        # `from_pbuf_message` / the generated constructors.
        for add_pname in self._additional_parameters:
            kwargs[add_pname] = getattr(self, add_pname)
        layerMessage = _caffe_pb2.LayerParameter(**kwargs)
        assert layerMessage.IsInitialized()
        return layerMessage
# Generate the layers: for every entry of `barrista.config.LAYER_TYPES`,
# synthesize a module-level `<Name>Layer(...)` constructor function via
# `exec`, exposing both the common `LayerSpecification` arguments and the
# fields of the layer's protobuf parameter objects.
for _layerkey in list(_LAYER_TYPES.keys()):
    # Construct the layer constructor.
    _parameters = []
    # Get the parameters for the standard layer function.
    # NOTE(review): `inspect.getargspec` was removed in Python 3.11;
    # `inspect.getfullargspec` is the compatible replacement — confirm the
    # supported Python versions before changing.
    _layerArgSpec = _inspect.getargspec(LayerSpecification.__init__)
    _without_defaults = len(_layerArgSpec.args) - len(_layerArgSpec.defaults)
    for _idx in range(1, len(_layerArgSpec.args)):
        # Tuple layout: (exposed argument name, has-default flag,
        # owning parameter object name, protobuf field name).
        _parameters.append((_layerArgSpec.args[_idx],
                            _idx >= _without_defaults,
                            'LayerSpecification',
                            _layerArgSpec.args[_idx]))
    # Get the layer specific parameters by inspecting the protobuf objects.
    _layer_error = False
    for _param_obj in _LAYER_TYPES[_layerkey]:
        assert _param_obj.endswith('Parameter'),\
            ('Only add the ...Parameter '
             'objects as layer parameters. Their name must end with '
             '"Parameter". All other parameters concerning the layer '
             'must be explicitely coded.')
        _obj_type = None
        try:
            # Resolve the parameter class by name; at module level `exec`
            # writes into the module namespace.
            exec('_obj_type = _caffe_pb2.' + _param_obj)  # pylint: disable=W0122
        except AttributeError as nfe:  # pragma: no cover
            print(("[WARNING] Parameter {} not found in caffe proto "
                   "configuration of {}! Adjust your barrista.config! "
                   "Skipping layer!").format(_param_obj, _layerkey))
            _layer_error = True
            break
        # This is guaranteed to work, because the additional parameter objects
        # for caffe MUST only have optional parameters.
        exec('_obj_instance = _obj_type()')  # pylint: disable=W0122
        # Expose every protobuf field of the parameter object as a
        # '<ParamName>_<field>' keyword argument (the 'Parameter' suffix,
        # 9 characters, is stripped).
        for _fieldname in list(_obj_type.__dict__.keys()):
            if isinstance(_obj_type.__dict__[_fieldname], property):
                _parameters.append((_param_obj[:-9] + '_' + _fieldname,
                                    True,
                                    _param_obj,
                                    _fieldname))
    if _layer_error:  # pragma: no cover
        continue
    # Analyzed the layers. Generating layer objects...
    # NOTE(review): the indentation inside the generated source strings
    # below was lost in extraction and has been normalized to 4 spaces
    # (required for the generated code to be valid Python) — TODO confirm
    # against the original repository.
    _layer_prefix = _layerkey
    _func_spec = 'def {_layer_prefix}Layer('.format(**locals())
    for _ptpl in _parameters:
        _func_spec += _ptpl[0]
        if _ptpl[1]:
            _func_spec += '=None'  # '={0}().{1}'.format(_ptpl[2], _ptpl[3])
        _func_spec += ','
    _func_spec = _func_spec[:-1]
    _func_spec += '):' + _os.linesep
    _func_spec += '    _ret_obj = LayerSpecification({0})'.format(
        ', '.join(_layerArgSpec.args[1:])) + _os.linesep
    for _param_obj in _LAYER_TYPES[_layerkey]:
        _obj_type = None
        exec('_obj_type = _caffe_pb2.' + _param_obj)  # pylint: disable=W0122
        # identify the property name for this parameter.
        _detected = False
        _tmp_layer_param = _caffe_pb2.LayerParameter()
        for _propname in list(_caffe_pb2.LayerParameter.__dict__.keys()):
            if isinstance(getattr(_tmp_layer_param, _propname), _obj_type):
                _detected = True
                break
        assert _detected, ('The parameter name of the layer property {0} ' +
                           'could not be found!').format(_param_obj)
        _func_spec += r'''
    _ret_obj._additional_parameters.append("{_propname}")
    {_propname}_dummy = _caffe_pb2.{_param_obj}()
    {_propname}_kwargs = dict()'''.format(**locals()) + _os.linesep
        for _ptpl in _parameters:
            if _ptpl[2] == _param_obj:
                # Stay compatible with older caffe version where some fields
                # were scalars that are now repeated fields.
                _func_spec += (r'''
    if {_ptpl[0]} is not None:
        try:
            _ = {_ptpl[0]} + 1  # Python version independent integer check
            if (hasattr(getattr({_propname}_dummy, '{_ptpl[3]}'),
                        'append')):
                # The attribute is list typed now, so convert.
                {_ptpl[0]} = [{_ptpl[0]}]
        except:
            # Nothing to do here.
            pass
        {_propname}_kwargs["{_ptpl[3]}"] = {_ptpl[0]}{_os.linesep}''')\
            .format(**locals())
        _func_spec += r'''
    _ret_obj.{_propname} = _caffe_pb2.{_param_obj}(**{_propname}_kwargs)'''\
            .format(**locals()) + _os.linesep
    _func_spec += '    _ret_obj.type = "{}"'.format(_layerkey) + _os.linesep
    _func_spec += '    return _ret_obj'
    _LOGGER.debug(_func_spec)
    # Define the generated constructor in the module namespace.
    exec(_func_spec)  # pylint: disable=W0122