Source code for towhee.functional.mixins.safe

# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Callable

from towhee.functional.option import Empty
from towhee.functional.mixins.dag import register_dag
from towhee.functional.option import Option, Some

[docs]class SafeMixin: """ Mixin for exception safety. """
[docs] @register_dag def exception_safe(self): """ Making the data collection exception-safe by warp elements with `Option`. Examples: 1. Exception breaks pipeline execution: >>> from towhee import DataCollection >>> dc = DataCollection.range(5) >>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list() Traceback (most recent call last): ZeroDivisionError: division by zero 2. Exception-safe execution >>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list() [Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)] >>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list() [Some(0.0), Some(0.5), Some(1.0), Empty()] >>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list() [Some(0.0), Some(0.5), Some(1.0)] """ result = map(lambda x: Some(x) if not isinstance(x, Option) else x, self._iterable) return self._factory(result)
[docs] def safe(self): """ Shortcut for `exception_safe`. """ return self.exception_safe()
[docs] @register_dag def fill_empty(self, default: Any = None) -> 'DataCollection': """ Unbox `Option` values and fill `Empty` with default values. Args: default (Any): default value to replace empty values; Returns: DataCollection: data collection with empty values filled with `default`; Examples: >>> from towhee import DataCollection >>> dc = DataCollection.range(5) >>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list() [0.0, 0.5, 1.0, -1.0, 2.0] """ result = map(lambda x: x.get() if isinstance(x, Some) else default, self._iterable) return self._factory(result)
[docs] @register_dag def drop_empty(self, callback: Callable = None) -> 'DataCollection': """ Unbox `Option` values and drop `Empty`. Args: callback (Callable): handler for empty values; Returns: DataCollection: A DataCollection that drops empty values; Examples: >>> from towhee import DataCollection >>> dc = DataCollection.range(5) >>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list() [0.0, 0.5, 1.0, 2.0] Get inputs that case exceptions: >>> exception_inputs = [] >>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value)) >>> exception_inputs [3] """ if callback is not None: def inner(data): for x in data: if isinstance(x, Empty): callback(x) if isinstance(x, Some): yield x.get() else: yield x result = inner(self._iterable) else: def inner(data): for x in data: if isinstance(x, Some): yield x.get() elif not isinstance(x, Empty): yield x result = inner(self._iterable) return self._factory(result)