Source code for towhee.dataframe.iterators

import threading
from typing import List, Tuple

from towhee.dataframe.dataframe import DataFrame, Responses

[docs]class DataFrameIterator: """ Base iterator implementation. All iterators should be subclasses of `DataFrameIterator`. Args: df (`Dataframe`): The dataframe to iterate over. block (`bool`): Whether to block on call. """
[docs] def __init__(self, df: DataFrame, block: bool = True): self._df = df self._df_name = df.name self._offset = 0 self._block = block self._id = df.register_iter() self._done = False self._event = threading.Event()
def __iter__(self): return self def __next__(self) -> List[Tuple]: # The base iterator is purposely defined to have exatly 0 elements. raise StopIteration @property def df_name(self) -> str: """ Returns the df name. """ return self._df_name @property def current_index(self) -> int: """ Returns the current index. """ return self._offset @property def accessible_size(self) -> int: """ Returns current accessible data size. """ return self._df.size - self._offset @property def id(self) -> int: return self._id
[docs] def notify(self): """ Remove unblock and remove iterator. """ if self._done is not True: df = self._df df.remove_iter(self._id) self._done = True
[docs]class MapIterator(DataFrameIterator): """ A row-based map `DataFrame` iterator. Args: df (`towhee.Dataframe`): The dataframe that is being iterated. block (`bool`): Whether to block when data not present. """
[docs] def __init__(self, df: DataFrame, block: bool = True): super().__init__(df, block = block) self._batch_size = 1 self._step = 1
def __iter__(self): return self def __next__(self): """ Returns: (`list(tuple([Any, ...])`) In the normal case, the iterator will return a list of tuples at each call. (`None`) In the case that the `DataFrame` is not sealed and the new rows are not ready yet, the iterator will return `None`. Raises: (`StopIteration`) The iteration end if the `DataFrame` is sealed and the last row is reached. """ df = self._df if self._done: raise StopIteration code, row, _ = df.get(self._offset, count = self._batch_size, iter_id = self._id) if code == Responses.INDEX_GC: raise IndexError elif code == Responses.INDEX_OOB_UNSEALED: if self._block: df.notify_map_block(self._event, self._offset, self._batch_size, self._id) self._event.wait() self._event.clear() return self.__next__() return None elif code == Responses.APPROVED_CONTINUE: df.ack(self._offset + self._step, self._id) self._offset += self._step return row elif code == Responses.INDEX_OOB_SEALED: df.remove_iter(self._id) self._offset = 0 self._done = True raise StopIteration elif code == Responses.KILLED: raise StopIteration else: # 'unkown_error' raise Exception
[docs]class BatchIterator(MapIterator): """ A row-based batched map `DataFrame` iterator. Args: df (`towhee.Dataframe`): The dataframe that is being iterated. batch_size (`int`): How many values to read. step (`int`): How many steps to take after each read. block (`bool`): Whether to block when data not present. """
[docs] def __init__(self, df: DataFrame, batch_size: int = 1, step: int = 1, block: bool = True): super().__init__(df, block = block) self._batch_size = batch_size self._step = step
[docs]class WindowIterator(DataFrameIterator): """ A row-based window `DataFrame` iterator. Args: df (`towhee.Dataframe`): The dataframe that is being iterated. start (`int`): Where to start the window from. window_size (`int`): How large of a window. step (`int`): How far to iterate window per read. use_timestamp (`bool`): Whether to use timestamp instead of row_id. block (`bool`): Whether to block when data not present. """
[docs] def __init__(self, df: DataFrame, start: int = 0, window_size: int = 1, step: int = None, use_timestamp: bool = False, block: bool = True): super().__init__(df, block = block) if use_timestamp: start *= 1000 window_size *= 1000 if step is not None: step *= 1000 self._window_size = window_size self._current_window = (start, (start + window_size)) self._step = step if step is None: self._step = window_size if use_timestamp: self._comparator = 'timestamp' else: self._comparator = 'row_id'
def __next__(self): """ Returns: (`list(Tuple[Any, ...])`) In the normal case, the iterator will return a list of `tuple` at each call. (`None`) In the case that the `DataFrame` is not sealed and the new rows are not ready yet, the iterator will return `None`. The caller should determine whether to block the iteration or exit the loop. Raises: (`StopIteration`) The iteration end iff the `DataFrame` is sealed and the last row is reached. """ df = self._df if self._done: df.remove_iter(self._id) raise StopIteration code, rows, offset = df.get_window(self._current_window[0], self._current_window[1], self._step, self._comparator, self._id) if code in (Responses.EMPTY_SEALED, Responses.FUTURE_WINDOW_SEALED): self._done = True df.remove_iter(self._id) raise StopIteration elif code == Responses.EMPTY: df.notify_window_block(self._event, 'start', (self._comparator, self._current_window[0]), self._id) self._event.wait() self._event.clear() return self.__next__() elif code == Responses.WINDOW_NOT_DONE: df.notify_window_block(self._event, 'end', (self._comparator, self._current_window[1]), self._id) self._event.wait() self._event.clear() return self.__next__() elif code == Responses.FUTURE_WINDOW: df.ack(offset, self._id) #TODO: Garbage collection logic if waiting on window that is far. df.notify_window_block(self._event, 'start', (self._comparator, self._current_window[0]), self._id) self._event.wait() self._event.clear() return self.__next__() elif code == Responses.OLD_WINDOW: self._current_window = offset return self.__next__() elif code == Responses.APPROVED_DONE: self._done = True return rows elif code == Responses.APPROVED_CONTINUE: self._current_window = (self._current_window[0] + self._step, self._current_window[1] + self._step) df.ack(offset, self._id) return rows elif code == Responses.KILLED: self._done = True raise StopIteration else: # 'unkown_error' raise Exception