Source code for towhee.engine.thread_pool_task_executor

# Copyright 2021 Zilliz. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import signal
import queue
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor

from towhee.engine.operator_pool import OperatorPool
from towhee.engine.operator_runner.runner_base import RunnerBase
from towhee.utils.log import engine_log

[docs]class ThreadPoolTaskExecutor(threading.Thread): """ A thread pool executor. Each device has one Executor. Args: name: (`str`) Name of the device for which the executor will run. Example device name: dev_name_0 = 'cpu:0' dev_name_1 = 'gpu:0' cache_path: (`str`) Local path for which operators are stored. Defaults to `$HOME/.towhee/operators`. """
[docs] def __init__(self, name: str, cache_path: str = None): super().__init__() self._name = name self._task_queue = queue.Queue() self._op_pool = OperatorPool(cache_path=cache_path) self._thread_pool = ThreadPoolExecutor() self._is_run = True self.setDaemon(True)
@property def name(self): return self._name @property def num_tasks(self): return self._task_queue.qsize()
[docs] def push_task(self, task: RunnerBase) -> bool: """Push a task to the end of the task queue. Args: task: Pre-initialized `Task` object to push onto the queue. """ return self._task_queue.put(task)
def execute(self, runner: RunnerBase): try: op = self._op_pool.acquire_op(runner.hub_op_id, runner.op_args, runner.tag) runner.set_op(op) runner.process() if runner.is_end() and runner.op is not None: runner.unset_op() self._op_pool.release_op(op) except Exception as e: # pylint: disable=broad-except engine_log.error(traceback.format_exc()) engine_log.error(e) os.kill(os.getpid(), signal.SIGINT)
[docs] def run(self): """ Runs the execution loop. """ while self._is_run: # If there are no tasks in the queue, will blocking until sth put into the queue. # When the executor ends, self.stop() function will put None into the queue, # so if get None, break the loop. runner = self._task_queue.get() if self._is_run and runner is not None: self._thread_pool.submit(self.execute, runner) else: break
[docs] def stop(self): """ Sets a flag, which stops the execution loop after a period of time. """ self._is_run = False self._task_queue.put(None)