DynamoTable
Auto-generated documentation for dynamo_query.dynamo_table module.
- dynamo-query / Modules / Dynamo Query / DynamoTable
- DynamoTable
- DynamoTable().batch_delete
- DynamoTable().batch_delete_records
- DynamoTable().batch_get
- DynamoTable().batch_get_records
- DynamoTable().batch_upsert
- DynamoTable().batch_upsert_records
- DynamoTable().cached_batch_get
- DynamoTable().cached_get_record
- DynamoTable().clear_records
- DynamoTable().clear_table
- DynamoTable().client
- DynamoTable().create_table
- DynamoTable().delete_record
- DynamoTable().delete_table
- DynamoTable().get_partition_key
- DynamoTable.get_primary_index
- DynamoTable().get_record
- DynamoTable().get_sort_key
- DynamoTable().get_table_status
- DynamoTable().invalidate_cache
- DynamoTable().max_batch_size
- DynamoTable().normalize_record
- DynamoTable().primary_index
- DynamoTable().query
- DynamoTable().scan
- DynamoTable().table
- DynamoTable().table_keys
- DynamoTable().upsert_record
- DynamoTable().validate_record_attributes
- DynamoTable().wait_until_exists
- DynamoTable().wait_until_not_exists
- DynamoTableError
- DynamoTable
DynamoTable
class DynamoTable(Generic[_RecordType], LazyLogger, ABC):
def __init__(logger: Optional[logging.Logger] = None) -> None:
DynamoDB table manager, uses DynamoQuery
underneath.
Arguments
logger
-logging.Logger
instance.
Examples
from dynamo_query import DynamoTable, DynamoRecord
from typing import Optional
class UserRecord(DynamoRecord):
pk: str
email: str
name: str
points: Optional[int] = None
# Create your dynamo table manager with your record class
class UserTable(DynamoTable[UserRecord]):
# provide a set of your table keys
table_keys = {'pk'}
# use this property to define your table name
@property
def table(self) -> str:
return "my_table"
# define how to get PK from a record
def get_partition_key(self, record: UserRecord) -> str:
return record.email
# we do not have a sort key in our table
def get_sort_key(self, record: UserRecord) -> None:
return None
# specify some GSIs
global_secondary_indexes = [
DynamoTableIndex("gsi_name", "name", None),
DynamoTableIndex("gsi_email_age", "email", "age"),
]
# and now we can create our table in DynamoDB
user_table = UserTable()
user_table.create_table()
Attributes
NO_RECORD
- Sentinels:SentinelValue('NO_RECORD')
See also
DynamoTable().batch_delete
def batch_delete(
data_table: DataTable[_RecordType],
) -> DataTable[_RecordType]:
Delete multuple records as a DataTable from DB.
data_table
must have all columns to calculate table keys.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# we should provide table keys or fields to calculate them
# in our case, PK is calculated from `email` field.
users_table = DataTable[UserRecord]().add_record(
{
"email": "puppy@gmail.com",
},
{
"email": "elon@gmail.com",
},
)
deleted_records = user_table.batch_delete(users_table)
for deleted_record in deleted_records:
# print deleted_record records
# if record was not found - it will still be returned
# but only with the data you provided
print(deleted_record)
Arguments
data_table
- Request data table.
Returns
DataTable with deleted records.
DynamoTable().batch_delete_records
def batch_delete_records(records: Iterable[_RecordType]) -> None:
Delete records from DB.
See DynamoTable().batch_delete.
Arguments
records
- Full or partial records to delete.
DynamoTable().batch_get
def batch_get(data_table: DataTable[_RecordType]) -> DataTable[_RecordType]:
Get multuple records as a DataTable from DB.
data_table
must have all columns to calculate table keys.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# we should provide table keys or fields to calculate them
# in our case, PK is calculated from `email` field.
users_table = DataTable[UserRecord]().add_record(
{
"email": "puppy@gmail.com",
},
{
"email": "elon@gmail.com",
},
)
user_records = user_table.batch_get(users_table)
for user_record in user_records:
# print found records
# if record was not found - it will still be returned
# but only with the data you provided
print(user_record)
Arguments
data_table
- Request data table.
Returns
DataTable with existing records.
DynamoTable().batch_get_records
def batch_get_records(
records: Iterable[_RecordType],
) -> Iterator[_RecordType]:
Get records as an iterator from DB.
Arguments
records
- Full or partial records data.
Yields
Found or not found record data.
DynamoTable().batch_upsert
def batch_upsert(
data_table: DataTable[_RecordType],
set_if_not_exists_keys: Iterable[str] = (),
) -> DataTable[_RecordType]:
Upsert multuple records as a DataTable to DB.
data_table
must have all columns to calculate table keys.
Sets dt_created
field equal to current UTC datetime if a record was created.
Sets dt_modified
field equal to current UTC datetime.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# we should provide table keys or fields to calculate them
# in our case, PK is calculated from `email` field.
users_table = DataTable[UserRecord]().add_record(
{
"email": "puppy@gmail.com",
"name": "Doge Barky",
"age": 20,
},
{
"email": "elon@gmail.com",
"name": "Elon Musk",
"age": 5289,
},
)
upserted_records = user_table.batch_upsert(users_table)
for upserted_record in upserted_records:
# print created and updated records
print(upserted_record)
Arguments
data_table
- Request DataTable.set_if_not_exists_keys
- List of keys to set only if they no do exist in DB.
Returns
A DataTable with upserted results.
DynamoTable().batch_upsert_records
def batch_upsert_records(
records: Iterable[_RecordType],
set_if_not_exists_keys: Iterable[str] = (),
) -> None:
Upsert records to DB.
See DynamoTable().batch_upsert.
Arguments
records
- Full or partial records data.set_if_not_exists_keys
- List of keys to set only if they no do exist in DB.
DynamoTable().cached_batch_get
def cached_batch_get(
data_table: DataTable[_RecordType],
) -> DataTable[_RecordType]:
Get multuple records as a DataTable from DB with caching.
data_table
must have all columns to calculate table keys.
Can be used instead of DynamoTable().batch_get method.
Arguments
data_table
- Request data table.
Returns
DataTable with existing records.
DynamoTable().cached_get_record
def cached_get_record(record: _RecordType) -> Optional[_RecordType]:
Get Record from DB with caching.
Can be used instead of DynamoTable().get_record method.
Returns
A dict with record data or None.
DynamoTable().clear_records
def clear_records() -> None:
Delete all records managed by current table manager.
Deletes only records with sort key starting with sort_key_prefix
.
DynamoTable().clear_table
def clear_table(
partition_key: Optional[str] = None,
partition_key_prefix: Optional[str] = None,
sort_key: Optional[str] = None,
sort_key_prefix: Optional[str] = None,
index: Optional[DynamoTableIndex] = None,
filter_expression: Optional[ConditionExpressionType] = None,
limit: Optional[int] = None,
) -> None:
Remove records from DB.
If partition_key
and partition_key_prefix
are None - deletes all records.
Arguments
partition_key
- Partition key value.sort_key
- Sort key value.partition_key_prefix
- Partition key prefix value.sort_key_prefix
- Sort key prefix value.index
- DynamoTableIndex instance, primary index is used if not provided.filter_expression
- Query filter expression.limit
- Max number of results.
DynamoTable().client
@property
def client() -> DynamoDBClient:
See also
DynamoTable().create_table
def create_table() -> Optional[CreateTableOutputTypeDef]:
Create a table in DynamoDB.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# create a table with key schema and all indexes.
UserTable.create_table()
DynamoTable().delete_record
def delete_record(
record: _RecordType,
condition_expression: Optional[ConditionExpression] = None,
) -> Optional[_RecordType]:
Delete Record from DB.
record
must have all fields to calculate table keys.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# we should provide table keys or fields to calculate them
# in our case, PK is calculated from `email` field.
deleted_record = user_table.delete_record({
"email": "cheater@gmail.com",
})
if deleted_record is None:
# no record found, so nothing was deleted
else:
# print deleted record
print(user_record)
Arguments
record
- Record with required fields for sort and partition keys.condition_expression
- Condition for delete.
Returns
A dict with record data or None.
DynamoTable().delete_table
def delete_table() -> None:
Delete the table from DynamoDB.
If table is creating, wait until it is created, then deletes it. If table is deleting or does not exist, does nothing.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# delete table
UserTable.delete_table()
# make sure that it is deleted
user_table.wait_until_not_exists()
DynamoTable().get_partition_key
def get_partition_key(record: _RecordType) -> Any:
Override this method to get PK from a record.
DynamoTable.get_primary_index
@classmethod
def get_primary_index() -> DynamoTableIndex:
Primary global index
See also
DynamoTable().get_record
def get_record(record: _RecordType) -> Optional[_RecordType]:
Get Record from DB.
record
must have all fields to calculate table keys.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# we should provide table keys or fields to calculate them
# in our case, PK is calculated from `email` field.
user_record = user_table.get_record({
"email": "suspicious@gmail.com",
})
if user_record is None:
# no record found
pass
else:
# print found record
print(user_record)
Arguments
record
- Record with required fields for sort and partition keys.
Returns
A dict with record data or None.
DynamoTable().get_sort_key
def get_sort_key(record: _RecordType) -> Any:
Override this method to get SK from a record.
DynamoTable().get_table_status
def get_table_status() -> Optional[str]:
Get table status from Dynamo.
Returns
Status string or None.
DynamoTable().invalidate_cache
def invalidate_cache() -> None:
Clear cache for DynamoTable().cached_batch_get and DynamoTable().cached_get_record
DynamoTable().max_batch_size
@property
def max_batch_size() -> int:
DynamoTable().normalize_record
def normalize_record(record: _RecordType) -> _RecordType:
Modify record before upsert.
Arguments
record
- Record for upsert.
Returns
Normalized record.
DynamoTable().primary_index
@property
def primary_index() -> DynamoTableIndex:
See also
DynamoTable().query
def query(
partition_key: Any,
index: Optional[DynamoTableIndex] = None,
sort_key: Optional[Any] = None,
sort_key_prefix: Optional[str] = None,
filter_expression: Optional[ConditionExpressionType] = None,
scan_index_forward: bool = True,
projection: Iterable[str] = tuple(),
data: Optional[Dict[(str, Any)]] = None,
limit: Optional[int] = None,
) -> Iterator[_RecordType]:
Query table records by index.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
user_records = user_table.query(
# query by our PK
partition_key="new_users",
# and SK starting with `email_`
sort_key_prefix="email_",
# get only users older than ...
# we will provide values in data
filter_expression=ConditionExpression("age", "<="),
# get only first 5 results
limit=5,
# get only name and email fields
projection=("name", "email"),
# ...older than 45 years
data= {"age": 45}
# start with the newest records
scan_index_forward=False,
)
for user_record in user_records:
print(user_record)
Arguments
partition_key
- Partition key value.index
- DynamoTableIndex instance, primary index is used if not provided.sort_key
- Sort key value.sort_key_prefix
- Sort key prefix value.filter_expression
- Query filter expression.scan_index_forward
- Whether to scan index from the beginning.projection
- Record fields to return, by default returns all fields.limit
- Max number of results.
Yields
Matching record.
DynamoTable().scan
def scan(
filter_expression: Optional[ConditionExpressionType] = None,
projection: Iterable[str] = tuple(),
data: Optional[Dict[(str, Any)]] = None,
limit: Optional[int] = None,
) -> Iterator[_RecordType]:
List table records.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
user_records = user_table.scan(
# get only users older than ...
# we will provide values in data
filter_expression=ConditionExpression("age", "<="),
# get only first 5 results
limit=5,
# get only name and email fields
projection=("name", "email"),
# ...older than 45 years
data= {"age": 45}
)
for user_record in user_records:
print(user_record)
Arguments
filter_expression
- Query filter expression.scan_index_forward
- Whether to scan index from the beginning.projection
- Record fields to return, by default returns all fields.limit
- Max number of results.
DynamoTable().table
@property
@abstractmethod
def table() -> Table:
Override this method to get DynamoDB Table resource.
See also
DynamoTable().table_keys
@property
def table_keys() -> Set[str]:
DynamoTable().upsert_record
def upsert_record(
record: _RecordType,
condition_expression: Optional[ConditionExpression] = None,
set_if_not_exists_keys: Iterable[str] = (),
extra_data: Dict[(str, Any)] = None,
) -> _RecordType:
Upsert Record to DB.
record
must have all fields to calculate table keys.
Sets dt_created
field equal to current UTC datetime if a record was created.
Sets dt_modified
field equal to current UTC datetime.
Examples
# UserTable is a subclass of a DynamoTable
user_table = UserTable()
# we should provide table keys or fields to calculate them
# in our case, PK is calculated from `email` field.
user_record = user_table.upsert_record(
{
"email": "newuser@gmail.com",
"name": "Somebody Oncetoldme"
"age": 23,
},
set_if_not_exists_keys=["age"], # set age if it does not exist in DB yet.
)
# print upserted record
print(user_record)
Arguments
record
- Record to insert/update.condition_expression
- Condition for update.set_if_not_exists_keys
- List of keys to set only if they no do exist in DB.extra_data
- Data for query.
Returns
A dict with updated record data.
DynamoTable().validate_record_attributes
def validate_record_attributes(record: _RecordType) -> None:
Check that all index keys are set correctly in record.
Arguments
record
- Record for upsert.
Raises
DynamoTableError
- If index key is missing.
DynamoTable().wait_until_exists
def wait_until_exists() -> None:
Proxy method for resource.Table.wait_until_exists
.
DynamoTable().wait_until_not_exists
def wait_until_not_exists() -> None:
Proxy method for resource.Table.wait_until_not_exists
.
DynamoTableError
class DynamoTableError(BaseException):
def __init__(message: str, data: Any = None) -> None:
Main error for DynamoTable class.
Arguments
message
- Error message.data
- Addition JSON-serializeable data.