Skip to the content.

DataTable

Auto-generated documentation for dynamo_query.data_table module.

DataTable

[find in source code]

class DataTable(Generic[_RecordType], dict):
    @overload
    def __init__(
        base_dict: Optional[Dict[(str, List[Any])]] = ...,
        record_class: None = ...,
    ) -> None:

    @overload
    def __init__(
        base_dict: Optional[Dict[(str, List[Any])]] = ...,
        record_class: Type[_RecordType] = ...,
    ) -> None:

    def __init__(
        base_dict: Optional[Dict[(str, List[Any])]] = None,
        record_class: Optional[Type[_RecordType]] = None,
    ) -> None:

Dictionary that has lists as values

Examples

data_table = DataTable({'a': [1, 2, 3], 'b': [1]})
data_table.max_length # 3
data_table.min_length # 1
data_table.get_lengths() # [3, 1]
data_table.is_normalized() # False

data_table.append('b', [3, 4])
data_table # {'a': [1, 2, 3], 'b': [1, 3, 4]}
data_table.is_normalized() # True

data_table.extend({'c': [5, 6]})
data_table # {'a': [1, 2, 3], 'b': [1, 3, 4], 'c': [5, 6]}

data_table.normalize()
data_table.is_normalized() # True
data_table # {'a': [1, 2, 3], 'b': [1, 3, 4], 'c': [5, 6, NOT_SET]}

from copy import copy
copy(data_table)  # {'a': [1, 2, 3], 'b': [1, 3, 4], 'c': [5, 6, NOT_SET]}
data_table.filter_keys(['a'])  # {'a': [1, 2, 3]}
data_table.filter_keys(['a']).extend({'b': [4]}).normalize()
data_table  # {'a': [1, 2, 3], 'b': [4, NOT_SET, NOT_SET]}


class MyRecord(TypedDict):
    key: str

typed_data_table = DataTable[MyRecord]()
typed_data_table.add_record({"key": "value"})

Arguments

Attributes

DataTable().add_record

[find in source code]

def add_record(*records: Union[(Dict, _RecordType)]) -> _R:

Add a new record to existing data and normalizes it after each record add.

data_table = DataTable({'a': [1], 'b': [3]})
data_table.add_record({'a': 5, 'c': 4}, {'c': 5})
data_table # DataTable({'a': [1, 5], 'b': [3], 'c': [4, 5]})

Arguments

Returns

Itself, so this method can be chained to another.

DataTable().add_table

[find in source code]

def add_table(*data_tables: _R) -> _R:

Add all records from another DataTable to existing one. All tables have to be normalized.

data_table = DataTable({'a': [1], 'b': [2]})
data_table2 = DataTable({'a': [3], 'b': [4]})
data_table.add_table(data_table2)
data_table # DataTable({'a': [1, 3], 'b': [2, 4]})

Arguments

Returns

Itself, so this method can be chained to another.

Raises

DataTable().append

[find in source code]

def append(key: str, values: List) -> _R:

Append DataTable().values to specified key value

base_dict = {'a': [1, 2], 'b': [3]}
DataTable(base_dict).append('a', [5, 6]) # DataTable({'a': [1, 2, 5, 6], 'b': [3]})
DataTable(base_dict).append('c', [5, 6]) # DataTable({'a': [1, 2], 'b': [3], 'c': [5, 6]})

Arguments

Returns

Itself, so this method can be chained to another.

DataTable().as_defaultdict

[find in source code]

def as_defaultdict() -> DefaultDict[(str, List[Any])]:

Return unwrapped defaultdict(list)

data_table = DataTable({'a': [1, 2], 'b': [3, 4]})
data_table.as_defaultdict() # defaultdict(<class 'list'>, {'a': [1, 2], 'b': [3, 4]})

Returns

defaultdict(list) with original DataTable data.

DataTable().copy

[find in source code]

def copy() -> _R:

Equivalent of copy

Returns

A new instance.

DataTable.create

[find in source code]

@classmethod
def create(base_dict: Optional[Dict[(str, List[Any])]] = None) -> _R:

Create a DataTable with untyped dicts as records.

Shorthand to DataTable[Dict[str, Any]]().

Arguments

Returns

A new DataTable instance.

DataTable().drop_duplicates

[find in source code]

def drop_duplicates(subset: Optional[Sequence[str]] = None) -> _R:

Remove duplicate rows from the DataTable (keep first occurrence)

Arguments

Returns

A new instance.

DataTable().extend

[find in source code]

def extend(*extra_dicts: Dict[(str, List[Any])]) -> _R:

Extend values lists with values from extra_dicts If some keys are missing from this dict, they will be created.

base_dict = {'a': [1], 'b': [3]}
DataTable(base_dict).extend({'a':  [5, 6]}) # DataTable({'a': [1, 5, 6], 'b': [3]})
DataTable(base_dict).extend({'c': [5, 6]}) # DataTable({'a': [1], 'b': [3], 'c': [5, 6]})
DataTable(base_dict).extend(
    {'a': [1]}, {'c': [1]}
) # DataTable({'a': [1, 1], 'b': [3], 'c': [1]})

Arguments

Returns

Itself, so this method can be chained to another.

DataTable().filter_keys

[find in source code]

def filter_keys(keys: Iterable[str]) -> _R:

Create a new DataTable instance only with keys listed it DataTable().keys

data_table = DataTable({'a': [1, 2], 'b': [3, 4]})
data_table.filter_keys(['a', 'c']) # DataTable({'a': [1, 2]})
data_table.filter_keys(data_table.keys()) # DataTable({'a': [1, 2], 'b': [3, 4]})
data_table.filter_keys([]) # DataTable({})

Arguments

Returns

A copy of original DataTable with matching keys

DataTable().filter_records

[find in source code]

def filter_records(
    query: Dict[(str, Any)],
    operand: Filter = Filter.EQUALS,
) -> _R:

Create a new DataTable instance with records that match query

data_table = DataTable({'a': [1, 2, 1], 'b': [3, 4, 5], 'c': [1]})
data_table.filter_records({'a': 1}) # DataTable({'a': [1, 1], 'b': [3, 5], 'c': [1, None]})
data_table.filter_records({'a': 2}) # DataTable({'a': [2], 'b': [4], 'c': [None]})
data_table.get_record({'c': 2}) # DataTable({'a': [], 'b': [], 'c': []})
data_table.get_record({'d': 1}) # DataTable({'a': [], 'b': [], 'c': []})

Arguments

Returns

A copy of original DataTable with matching records

See also

DataTable().get_column

[find in source code]

def get_column(column_name: str) -> List[Any]:

Return all column values.

Not set values are resolved to NOT_SET_RESOLVED_VALUE by DataTable().resolve_not_set_value method.

data_table = DataTable({'a': [1, 3], 'b': [2, DataTable.NOT_SET], 'c': []}).normalize()
data_table.get_column('a') # [1, 3]
data_table.get_column('b') # [2, None]
data_table.get_column('c') # [None, None]
data_table.get_column('d') # [None, None]

Arguments

Returns

A list of column values.

Raises

DataTable().get_column_names

[find in source code]

def get_column_names() -> List[str]:

Get all column names.

data_table = DataTable({'a': [1], 'b': [DataTable.NOT_SET], 'c': []})
data_table.get_column_names() # ['a', 'b', 'c']

Returns

A list of column names.

DataTable().get_lengths

[find in source code]

def get_lengths() -> List[int]:

Get lengths of all values as a list

DataTable({'a': [1, 2], 'b': [3, 4]}).get_lengths() # [2, 2]
DataTable({'a': [1, 2], 'b': [3]}).get_lengths() # [2, 1]
DataTable({'a': []}).get_lengths() # [0]
DataTable({}).get_lengths() # []

Returns

List with all rows lenghts.

DataTable().get_record

[find in source code]

def get_record(record_index: int) -> _RecordType:

Get one record of DataTable by record_index as dict of {key: value}. Not set values are resolved to NOT_SET_RESOLVED_VALUE by DataTable().resolve_not_set_value method.

data_table = DataTable({'a': [1, 2], 'b': [3, 4]})
data_table.get_record(0) # {'a': 1, 'b': 3}
data_table.get_record(1) # {'a': 2, 'b': 4}
data_table.get_record(2) # DataTableError

Arguments

Returns

Dict with original DataTable keys and corresponding values.

DataTable().get_records

[find in source code]

def get_records() -> Iterator[_RecordType]:

Generator for all records with keys in DataTable.

data_table = DataTable({'a': [1, 2], 'b': [3, 4]})
for record in data_table.get_records():
    record # {'a': 1, 'b': 3}, then {'a': 2, 'b': 4}

Yields

Dict with original DataTable keys and corresponding values.

DataTable().get_set_column_names

[find in source code]

def get_set_column_names() -> List[str]:

Get column names that have no NOT_SET values.

data_table = DataTable({'a': [1], 'b': [DataTable.NOT_SET], 'c': []})
data_table.get_set_column_names() # ['a', 'c']
data_table.normalize()
data_table.get_set_column_names() # ['a']

Returns

A list of column names.

DataTable().has_column

[find in source code]

def has_column(*column_names: str) -> bool:

Check if all columns with column_names exist.

data_table = DataTable({'a': [1], 'b': [2], 'c': []}).normalize()
data_table.has_column('a') # True
data_table.has_column('b') # True
data_table.has_column('c') # True
data_table.has_column('d') # False

Arguments

Returns

True if check is successful.

DataTable().has_set_column

[find in source code]

def has_set_column(*column_names: str) -> bool:

Check if all columns with column_names exist and have all values set.

data_table = DataTable({'a': [1], 'b': [2], 'c': []}).normalize()
data_table.has_set_column('a') # True
data_table.has_set_column('b') # True
data_table.has_set_column('c') # False
data_table.has_set_column('d') # False

Arguments

Returns

True if check is successful.

DataTable().is_normalized

[find in source code]

def is_normalized() -> bool:

Check if all values have the same length.

DataTable({'a': [1, 2], 'b': [3, 4]}).is_normalized() # True
DataTable({'a': [1, 2], 'b': [3]}).is_normalized() # False
DataTable({}).is_normalized() # True

Returns

True if all rows have the same length

DataTable().items

[find in source code]

def items() -> Iterator[Tuple[(str, List[Any])]]:

Iterate over items of a base dict.

Examples

d = DataTable({"a": [1, 2], "b": [3, 4]})
for item in d.items():
    print(item) # ("a", [1, 2]), then ("b", [3, 4])

Returns

An iterator over base dict items.

DataTable().keys

[find in source code]

def keys() -> Iterator[str]:

Iterate over keys of a base dict.

Examples

d = DataTable({"a": [1, 2], "b": [3, 4]})
for item in d.keys():
    print(item) # "a", then "b"

Returns

An iterator over base dict keys.

DataTable().max_length

[find in source code]

@property
def max_length() -> int:

Maximum length of values

DataTable({'a': [1, 2], 'b': [3, 4]}).max_length # 2
DataTable({'a': [1, 2], 'b': [3]}).max_length # 2
DataTable({'a': []}).max_length # 0
DataTable({}).max_length # 0

Returns

Lenght of the longest row.

DataTable().min_length

[find in source code]

@property
def min_length() -> int:

Minimum length of values

DataTable({'a': [1, 2], 'b': [3, 4]}).min_length # 2
DataTable({'a': [1, 2], 'b': [3]}).min_length # 1
DataTable({'a': []}).min_length # 0
DataTable({}).min_length # 0

Returns

Lenght of the shortest row.

DataTable().normalize

[find in source code]

def normalize() -> _R:

Normalize all items to DataTable().max_length using default value.

data_table = DataTable({'a': [1, 2], 'b': [3], 'c': []})
data_table.normalize() # DataTable({'a': [1, 2], 'b': [3, None], 'c': [None, None]})

Arguments

Returns

Itself, so this method can be chained to another.

DataTable().resolve_not_set_value

[find in source code]

def resolve_not_set_value(column_name: str, record_index: int) -> Any:

Get a value to use for missing values. Override this methd in a subclass to use a different behavior.

Arguments

DataTable().set

[find in source code]

def set(column_name: str, record_index: int, value: Any) -> _R:

Set value in-place for column_name and record_index.

data_table = DataTable({'a': [1, 2], 'b': [DataTable.NOT_SET]})
data_table.set('a', 1, 'value_a').set('b', 0, 'value_b')
data_table # DataTable({'a': [1, 'value_a'], 'b': ['value_b']})

data_table.set('b', 1, 'value_b') # DataTableError
data_table.set('c', 0, 'value_c') # DataTableError

Returns

Itself, so this method can be chained to another.

Raises

DataTable().values

[find in source code]

def values() -> Iterator[List[Any]]:

Iterate over values of a base dict.

Examples

d = DataTable({"a": [1, 2], "b": [3, 4]})
for item in d.values():
    print(item) # [1, 2], then [3, 4]

Returns

An iterator over base dict values.

DataTableError

[find in source code]

class DataTableError(BaseException):

Main error for DataTable class.

Filter

[find in source code]

class Filter(Enum):