# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from enum import Enum
from typing import List, Tuple, Dict, Any
from towhee.dataframe.array.array import Array
from towhee.dataframe._schema import _Schema
from towhee.types._frame import _Frame, FRAME
# from towhee.types import equivalents
class DataFrame:
"""
    A `DataFrame` is a collection of immutable, potentially heterogeneous blobs of data.
    Args:
        name (`str`):
            Name of the dataframe; a `DataFrame`'s name should match that of its
            representation.
        columns (`List[Tuple[str, str]]`):
            The list of column names and their corresponding data types.
"""
    def __init__(
self,
name: str,
columns: List[Tuple[str, str]],
):
self._name = name
self._sealed = False
self._insert_cache_lock = threading.RLock()
self._insert_cache = {}
self._next_id = 0
self._iterator_lock = threading.RLock()
self._it_id = 0
self._iterators = {}
self._min_offset = 0
self._block_lock = threading.RLock()
self._map_blocked = {}
self._window_start_blocked = {}
self._window_end_blocked = {}
self._data_lock = threading.RLock()
self._len = 0
self._data_as_list = []
self._schema = _Schema()
self._initialize_storage(columns)
def _initialize_storage(self, columns):
"""Set the columns in the schema."""
for name, col_type in columns:
            self._schema.add_col(name=name, col_type=col_type)
frame = _Frame()
self._schema.add_col(FRAME, self._class_type(frame))
self._data_as_list = [Array(name=name) for name, _ in self._schema.cols]
    def __getitem__(self, key):
"""Get data at the passed in offset."""
with self._data_lock:
# access a row
if isinstance(key, int):
return tuple(self._data_as_list[i][key] for i in range(len(self._data_as_list)))
# access a column
elif isinstance(key, str):
index = self._schema.col_index(key)
return self._data_as_list[index]
    def __str__(self):
        """
        Simple string representation for printing and debugging DataFrames. Assumes
        that every stored value can be converted with str().
        """
        ret = ''
        formatter = ''
        columns = []
        with self._data_lock:
            for x in range(len(self._data_as_list)):
                columns.append(self._data_as_list[x].name)
                formatter += '{' + str(x) + ':30}'
            ret += formatter.format(*columns) + '\n'
            if self._min_offset != float('inf'):
                for x in range(self._min_offset, self._min_offset + self.__len__()):
                    values = []
                    for i in range(len(self._data_as_list)):
                        val = self._data_as_list[i][x]
                        values.append(str(val))
                    ret += formatter.format(*values) + '\n'
            return ret
def __len__(self):
with self._data_lock:
return self._len - self._min_offset
@property
def current_size(self):
return self.__len__()
@property
def size(self):
return self._len
@property
def name(self) -> str:
return self._name
@property
    def iterators(self) -> Dict[int, int]:
return self._iterators
@property
def data(self) -> List[Array]:
with self._data_lock:
return self._data_as_list
@property
def columns(self) -> List[str]:
return [x for x, _ in self._schema.cols]
@property
def types(self) -> List[Any]:
return [y for _, y in self._schema.cols]
@property
def sealed(self) -> bool:
return self._sealed
    def get_window(self, start: int, end: int, step: int, comparator: str, iter_id: int = False):
"""
Window based data retrieval. Windows include everything up to but not including
the cutoff.
Args:
start (`int`):
The starting index to read data from.
end (`int`):
The current upper window limit.
            step (`int`):
                How far the window slides for the next call.
            comparator ('timestamp' | 'row_id'):
                Which value is used for window calculations.
iter_id (`int`):
Which iterator is reading the data.
Returns:
Responses:
Different states of response codes depending on the dataframe state.
Situations:
Unsealed:
[0, 1, 2, 3] window = (5, 10)
1. Window start is greater than the latest value in df:
a. block for value greater than window start
[0, 1, 2, 3] window = (0, 10)
2. Window end is greater than the latest value in df:
a. block for value that is greater than window end
[5, 6, 7, 8] window = (1, 4)
3. Window end is smaller than all values in df
a. return None and continue to next window
[0, 1, 3, 4] window = (0, 3)
4. Values exist between window start and window end:
a. return values, clear data up to first value
Sealed:
[0, 1, 2, 3] window = (5, 10)
1. Window start is greater than the latest value in df:
a. Stop iteration
[0, 1, 2, 3] window = (0, 10)
2. Window end is greater than the latest value in df:
a. return remaining window values and stop iteration
[5, 6, 7, 8] window = (1, 4)
3. Window end is smaller than all values in df
a. return None and continue to next window
[0, 1, 3, 4] window = (0, 3)
4. Values exist between window start and window end:
a. return values, clear data up to first value
"""
        # When the iterator has been forcefully unblocked using unblock_iter() or unblock_all().
with self._iterator_lock:
if iter_id is not False and self._iterators.get(iter_id) is None:
return Responses.KILLED, None, None
base_offset = self._iterators[iter_id]
rets = []
min_offset = None
max_offset = base_offset
next_offset = None
next_window = start + step
with self._data_lock:
            # If the df is empty, we don't want to do anything but wait for the next value (if unsealed).
if self.__len__() == 0:
if self.sealed:
return self._ret(Responses.EMPTY_SEALED, None, None, iter_id)
else:
return self._ret(Responses.EMPTY, None, None, iter_id)
            # If the first unread element is already at or past the window end, move to the next viable window.
if getattr(self._data_as_list[-1][base_offset], comparator) >= end:
goal = getattr(self._data_as_list[-1][base_offset], comparator)
new_start = start + step * ((goal - start)//step)
new_end = (new_start - start) + end
if new_end <= goal:
new_start += step
new_end += step
return self._ret(Responses.OLD_WINDOW, None, (new_start, (new_start - start) + end), iter_id)
            # If the last element of the df is before the window start, the window is still in the future.
elif getattr(self._data_as_list[-1][-1], comparator) < start:
if self.sealed:
return self._ret(Responses.FUTURE_WINDOW_SEALED, None, None, iter_id)
else:
return self._ret(Responses.FUTURE_WINDOW, None, self._len, iter_id)
            # Iterate through the values to see which fall in the window; min_offset = first match, max_offset = last match.
for i in range(base_offset, self._len):
if getattr(self._data_as_list[-1][i], comparator) >= start and getattr(self._data_as_list[-1][i], comparator) < end:
if min_offset is None:
min_offset = i
if next_offset is None:
if getattr(self._data_as_list[-1][i], comparator) >= next_window:
next_offset = i
max_offset = i
rets.append(self.__getitem__(max_offset))
elif getattr(self._data_as_list[-1][i], comparator) >= end:
if next_offset is None:
next_offset = i
break
            # If the max offset is the last row of a sealed df:
if max_offset == self._len - 1 and self.sealed:
# Need to check if there is a next window that fits, if so, continue
if next_offset:
return self._ret(Responses.APPROVED_CONTINUE, rets, next_offset, iter_id)
                # If not, return what's left and have the iterator close.
else:
return self._ret(Responses.APPROVED_DONE, rets, max_offset, iter_id)
            # If the last value that fits in the window is also the last value of the df, we can't close the
            # window yet since we don't know whether the next value falls in the same window.
elif max_offset == self._len - 1 and not self.sealed:
return self._ret(Responses.WINDOW_NOT_DONE, None, None, iter_id)
else:
return self._ret(Responses.APPROVED_CONTINUE, rets, next_offset, iter_id)
    def get(self, offset: int, count: int = 1, iter_id: int = False):
"""
        Get data from the dataframe based on an offset and the number of values requested.
Args:
offset (`int`):
The starting index to read data from.
count (`int`):
How many values to retrieve.
iter_id (`int`):
Which iterator is reading the data.
Returns:
Responses:
Different states of response codes depending on the dataframe state.
"""
        # When the iterator has been forcefully unblocked using unblock_iter() or unblock_all().
with self._iterator_lock:
if iter_id is not False and self._iterators.get(iter_id) is None:
return self._ret(Responses.KILLED, None, None, iter_id)
elif offset < self._min_offset:
# Up to iterators to decide what to do if the value they want is GCed.
return self._ret(Responses.INDEX_GC, None, None, iter_id)
with self._data_lock:
# The batch of data is available and there is more available.
if offset + count <= self._len:
return self._ret(Responses.APPROVED_CONTINUE, [self.__getitem__(x) for x in range(offset, offset + count)], None, iter_id)
elif self._sealed:
# If no more data is left and trying to get more, iterator will stop and no data returned.
if offset >= self._len:
return self._ret(Responses.INDEX_OOB_SEALED, None, None, iter_id)
# If dataframe is sealed, remaining values will be returned and allow for next step
else:
return self._ret(Responses.APPROVED_CONTINUE, [self.__getitem__(x) for x in range(offset, self._len)], None, iter_id)
# If the requested offset is out of bounds but dataframe is still writing data.
elif offset + count > self._len:
return self._ret(Responses.INDEX_OOB_UNSEALED, None, None, iter_id)
# Something broke.
else:
return self._ret(Responses.UNKOWN_ERROR, None, None, iter_id)
def _ret(self, code, ret, offset, provide_code):
if provide_code is not False:
return code, ret, offset
else:
return ret
    def put(self, item) -> None:
"""
Append a new value to the dataframe.
Args:
item (dict | list | Array | tuple):
The row to insert.
"""
assert not self._sealed, f'DataFrame {self._name} is already sealed, can not put data'
assert isinstance(item, (tuple, dict, list)), f'Dataframe input must be of type (tuple, dict, list), not {type(item)}'
with self._data_lock:
start_len = self._len
if not isinstance(item, list):
item = [item]
for x in item:
self._put(x)
# Clear cache after inserting.
self._put_cached()
frame = None
# Only check blocked iterators if new value added.
if (self._len - start_len) > 0 and self.current_size > 0:
frame = self._data_as_list[-1][-1]
cur_len = self._len
if frame is not None:
self._check_blocked(frame, cur_len)
def _check_blocked(self, frame, cur_len):
"""Unblock iterators if their criteria has been met."""
with self._block_lock:
if len(self._map_blocked) > 0:
rem = []
for i, id_events in self._map_blocked.items():
if i <= cur_len:
for _, event in id_events:
event.set()
rem.append(i)
for key in rem:
del self._map_blocked[key]
if len(self._window_start_blocked) > 0:
rem = []
for cutoff, id_events in self._window_start_blocked.items():
if cutoff[1] <= getattr(frame, cutoff[0]):
for _, event in id_events:
event.set()
rem.append(cutoff)
for key in rem:
del self._window_start_blocked[key]
if len(self._window_end_blocked) > 0:
rem = []
for cutoff, id_events in self._window_end_blocked.items():
if cutoff[1] < getattr(frame, cutoff[0]):
for _, event in id_events:
event.set()
rem.append(cutoff)
for key in rem:
del self._window_end_blocked[key]
def _put(self, item):
"""Method that decides which format the data is in and calls put."""
if isinstance(item, dict):
return self._put_dict(item)
elif isinstance(item, tuple):
return self._put_tuple(item)
else:
raise ValueError('Input data is of wrong format.')
def _put_cached(self):
"""Empty cache values into the data list when the index is reached."""
if len(self._insert_cache) != 0:
with self._insert_cache_lock:
item = self._insert_cache.pop(self._next_id, None)
while item is not None:
                    # A cached value of -1 marks an empty row to skip; just advance next_id.
if item == -1:
self._next_id += 1
else:
self._put(item)
item = self._insert_cache.pop(self._next_id, None)
def _put_tuple(self, item: tuple):
"""Appending a tuple to the dataframe."""
if not isinstance(item[-1], _Frame):
item = item + (_Frame(row_id=self._len), )
else:
# If row is empty, append to the insert cache for skipping.
if item[-1].empty:
with self._insert_cache_lock:
self._insert_cache[item[-1].prev_id] = -1
return -1
            # If prev_id exists and it's not equal to next_id, cache the row.
elif item[-1].prev_id != -1 and item[-1].prev_id != self._next_id:
with self._insert_cache_lock:
self._insert_cache[item[-1].prev_id] = item
return -2
item[-1].row_id = self._len
# TODO: Figure out the situation on type checking
# for i, x in enumerate(item):
# print(equivalents.get(self._class_type(x), self._class_type(x)) , self._schema.col_type(i), flush=True)
# assert equivalents.get(self._class_type(x), self._class_type(x)) == self._schema.col_type(i)
for i, x in enumerate(item):
self._data_as_list[i].put(x)
self._len += 1
self._next_id += 1
def _put_dict(self, item: dict):
"""Appending a dict to the dataframe."""
if item.get(FRAME, None) is None:
item[FRAME] = _Frame(self._len)
else:
# If row is empty, append to the insert cache for skipping.
if item[FRAME].empty:
with self._insert_cache_lock:
                    self._insert_cache[item[FRAME].prev_id] = -1
return -1
            # If prev_id exists and it's not equal to next_id, cache the row.
elif item[FRAME].prev_id != -1 and item[FRAME].prev_id != self._next_id:
with self._insert_cache_lock:
self._insert_cache[item[FRAME].prev_id] = item
return -2
item[FRAME].row_id = self._len
        # I believe it's faster to loop through and check than to use a list comp.
# TODO: Add type checking
# for key, val in item.items():
# assert equivalents.get(self._class_type(val), self._class_type(val)) == self._schema.col_type(self._schema.col_index(key))
for key, val in item.items():
self._data_as_list[self._schema.col_index(key)].put(val)
self._len += 1
self._next_id += 1
    def seal(self):
"""
        Seal the dataframe. A sealed dataframe will not accept any more data, and
        the act of sealing it triggers blocked iterators to pull the remaining data.
"""
with self._data_lock:
self._sealed = True
# Release all blocked iters.
with self._block_lock:
for _, id_event in self._map_blocked.items():
for _, event in id_event:
event.set()
self._map_blocked.clear()
for _, id_event in self._window_start_blocked.items():
for _, event in id_event:
event.set()
self._window_start_blocked.clear()
for _, id_event in self._window_end_blocked.items():
for _, event in id_event:
event.set()
self._window_end_blocked.clear()
def gc(self):
self.gc_data()
self.gc_blocked()
    def gc_data(self):
"""Garbage collection function to trigger towhee.Array GC."""
with self._iterator_lock:
vals = [value for _, value in self._iterators.items()]
self._min_offset = min(vals + [self._len])
with self._data_lock:
for x in self._data_as_list:
x.gc(self._min_offset)
    def gc_blocked(self):
"""Garbage collection for blocked iterator list."""
with self._block_lock:
for key, val in list(self._map_blocked.items()):
if len(val) == 0:
del self._map_blocked[key]
for key, val in list(self._window_start_blocked.items()):
if len(val) == 0:
del self._window_start_blocked[key]
for key, val in list(self._window_end_blocked.items()):
if len(val) == 0:
del self._window_end_blocked[key]
    def register_iter(self):
"""
Registering an iter allows the df to keep track of where each iterator is in order
to coordinate garbage collection and other processes.
Returns:
            An iterator ID that is used for the majority of iterator-dataframe interactions.
"""
with self._iterator_lock:
self._it_id += 1
self._iterators[self._it_id] = 0
return self._it_id
    def remove_iter(self, iter_id: int):
"""
        Remove the iterator from the df, which allows more rows to be garbage collected.
Args:
iter_id (`int`):
The iterator's id.
"""
self.unblock_iter(iter_id)
try:
with self._iterator_lock:
if iter_id in self._iterators:
del self._iterators[iter_id]
except KeyError:
pass
# self.gc_data()
    def ack(self, offset: int, iter_id: int):
"""
        An acknowledgement (ack) notifies the `DataFrame` that the rows already
        iterated over are no longer needed and can be garbage collected, up to but
        not including this offset.
Args:
offset (`int`):
The latest accepted offset.
iter_id (`int`):
The iterator id to set offset for.
"""
with self._iterator_lock:
            if self._iterators[iter_id] <= offset:
self._iterators[iter_id] = offset
# TODO: Figure out better algorithm for gc'ing
# self.gc_data()
    def notify_map_block(self, event: threading.Event, offset: int, count: int, iter_id: int):
"""
        Used by map-based iterators to notify the dataframe that they are waiting on a value.
Args:
event (`threading.Event`):
The event to trigger once the value being waited on is available.
offset (`int`):
The offset that is being waited on.
            count (`int`):
How many values are being waited on.
iter_id (`int`):
The iterator id of the blocked iterator.
"""
with self._block_lock:
index = offset + count
if self._map_blocked.get(index) is None:
self._map_blocked[index] = []
self._map_blocked[index].append((iter_id, event))
    def notify_window_block(self, event: threading.Event, edge: str, cutoff: Tuple[str, int], iter_id: int):
"""
        Used by window-based iterators to notify the dataframe that they are waiting on a value.
Args:
iter_id (`int`):
The iterator id of the blocked iterator.
event (`threading.Event`):
The event to trigger once the value being waited on is available.
            edge ('start' | 'end'):
                Whether to wait for a value >= the window start, or for a value
                greater than the window end.
            cutoff ('timestamp' | 'row_id', `int`):
                The window cutoff that is being waited on.
"""
with self._block_lock:
if edge == 'start':
if self._window_start_blocked.get(cutoff) is None:
self._window_start_blocked[cutoff] = []
self._window_start_blocked[cutoff].append((iter_id, event))
elif edge == 'end':
if self._window_end_blocked.get(cutoff) is None:
self._window_end_blocked[cutoff] = []
self._window_end_blocked[cutoff].append((iter_id, event))
    def unblock_all(self):
"""Forceful unblocking/killing of all blocked iter."""
with self._block_lock:
for _, id_event in self._map_blocked.items():
for it_id, event in id_event:
del self._iterators[it_id]
event.set()
self._map_blocked.clear()
for _, id_event in self._window_start_blocked.items():
for it_id, event in id_event:
del self._iterators[it_id]
event.set()
self._window_start_blocked.clear()
for _, id_event in self._window_end_blocked.items():
for it_id, event in id_event:
del self._iterators[it_id]
event.set()
self._window_end_blocked.clear()
    def unblock_iter(self, iter_id: int):
"""
        Forcefully unblock/kill the selected iterator.
Args:
iter_id (`int`):
The id of the iterator being unblocked.
"""
with self._block_lock:
index = None
for _, id_event in self._map_blocked.items():
for i, (ids, event) in enumerate(id_event):
if ids == iter_id:
del self._iterators[iter_id]
event.set()
index = i
break
if index is not None:
del id_event[index]
# self.gc_blocked()
return
for _, id_event in self._window_start_blocked.items():
for i, (ids, event) in enumerate(id_event):
if ids == iter_id:
del self._iterators[iter_id]
event.set()
index = i
break
if index is not None:
del id_event[index]
# self.gc_blocked()
return
for _, id_event in self._window_end_blocked.items():
for i, (ids, event) in enumerate(id_event):
if ids == iter_id:
del self._iterators[iter_id]
event.set()
index = i
break
if index is not None:
del id_event[index]
# self.gc_blocked()
return
def _class_type(self, o):
"""Iternal function to find the full module path of object."""
module = o.__class__.__module__
if module is None or module == str.__class__.__module__:
return o.__class__.__name__
return module + '.' + o.__class__.__name__
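    # For example, _class_type(1) returns 'int' (builtins are reduced to the bare
    # class name), while a _Frame instance would yield something like
    # 'towhee.types._frame._Frame'; the exact module path depends on the install.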
class Responses(Enum):
"""Response Codes between DF and Iterators."""
INDEX_GC = 1
INDEX_OOB_UNSEALED = 2
INDEX_OOB_SEALED = 3
APPROVED_CONTINUE = 4
APPROVED_DONE = 5
KILLED = 6
UNKOWN_ERROR = 7
WINDOW_NOT_DONE = 8
WINDOW_PASSED = 9
FUTURE_WINDOW = 10
OLD_WINDOW = 11
FUTURE_WINDOW_SEALED = 12
EMPTY = 13
EMPTY_SEALED = 14