Skip to the content.

Dynamo Query Main

Auto-generated documentation for dynamo_query.dynamo_query_main module.

Helper for building Boto3 DynamoDB queries.

DynamoQuery

[find in source code]

class DynamoQuery(BaseDynamoQuery):

Helper for building Boto3 DynamoDB queries. See BaseDynamoQuery documentation as well.

query = DynamoQuery.build_scan(
    limit=5,
    filter_expression=ConditionExpression('first_name') & ('last_name', 'in'),
).projection(
    'first_name', 'last_name', 'age',
)
...
data_table = DataTable().add_record({
    'first_name': 'John',
    'last_name': ['Cena', 'Doe', 'Carmack'],
})
table_resource = DynamoConnect().resource.Table('people')
result_data_table = query.execute(
    table_keys=('pk', ),
    table=table_resource,
    data_table=data_table,
)
list(result_data_table.get_records())
# [
#     {
#         'first_name': 'John',
#         'last_name': 'Cena',
#         'age': 42,
#     },
#     {
#         'first_name': 'John',
#         'last_name': 'Carmack',
#         'age': 49,
#     }
# ]

Attributes

See also

DynamoQuery.build_batch_delete_item

[find in source code]

@classmethod
def build_batch_delete_item(
    return_consumed_capacity: ReturnConsumedCapacityType = 'NONE',
    return_item_collection_metrics: ReturnItemCollectionMetricsType = 'NONE',
    logger: Optional[logging.Logger] = None,
) -> _R:

Build delete query for table.meta.client.batch_write_item.

query = DynamoQuery.build_batch_delete_item()
data_table = DataTable().add_record({
    'pk': 'key1',
}, {
    'pk': 'key2',
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery.build_batch_get_item

[find in source code]

@classmethod
def build_batch_get_item(
    return_consumed_capacity: ReturnConsumedCapacityType = 'NONE',
    logger: Optional[logging.Logger] = None,
) -> _R:

Build query for table.meta.client.batch_get_item.

query = DynamoQuery.build_batch_get_item()
data_table = DataTable().add_record({
    'pk': 'key1',
}, {
    'pk': 'key2',
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery.build_batch_update_item

[find in source code]

@classmethod
def build_batch_update_item(
    return_consumed_capacity: ReturnConsumedCapacityType = 'NONE',
    return_item_collection_metrics: ReturnItemCollectionMetricsType = 'NONE',
    logger: Optional[logging.Logger] = None,
) -> _R:

Build update query for table.meta.client.batch_write_item.

query = DynamoQuery.build_batch_update_item()
data_table = DataTable().add_record({
    'pk': 'key1',
    'first_name': 'John',
}, {
    'pk': 'key2',
    'first_name': 'Smith',
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery.build_delete_item

[find in source code]

@classmethod
def build_delete_item(
    condition_expression: Optional[ConditionExpressionType] = None,
    return_consumed_capacity: ReturnConsumedCapacityType = 'NONE',
    return_item_collection_metrics: ReturnItemCollectionMetricsType = 'NONE',
    return_values: ReturnValueType = 'ALL_OLD',
    logger: Optional[logging.Logger] = None,
) -> _R:

Build query for table.delete_item.

query = DynamoQuery.build_delete_item(
    condition_expression=ConditionExpression('first_name')
)

data_table = DataTable().add_record({
    'pk': 'key1',
    'first_name': 'John',
}, {
    'pk': 'key2',
    'first_name': 'Mike',
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery.build_get_item

[find in source code]

@classmethod
def build_get_item(
    projection_expression: Optional[ProjectionExpression] = None,
    consistent_read: bool = False,
    return_consumed_capacity: ReturnConsumedCapacityType = 'NONE',
    logger: Optional[logging.Logger] = None,
) -> _R:

Build query for table.get_item.

query = DynamoQuery.build_get_item().projection(
    'first_name', 'last_name', 'age'
)

data_table = DataTable().add_record({
    'pk': 'key1',
}, {
    'pk': 'key2',
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery.build_query

[find in source code]

@classmethod
def build_query(
    key_condition_expression: ConditionExpressionType,
    index_name: Optional[str] = None,
    projection_expression: Optional[ProjectionExpression] = None,
    filter_expression: Optional[ConditionExpressionType] = None,
    limit: int = MAX_LIMIT,
    exclusive_start_key: Optional[ExclusiveStartKey] = None,
    consistent_read: bool = False,
    scan_index_forward: bool = True,
    logger: Optional[logging.Logger] = None,
) -> _R:

Build query for table.query.

query = DynamoQuery.build_query(
    key_condition_expression=ConditionExpression('first_name'),
    filter_expression=ConditionExpression('last_name', 'in'),
    index_name='gsi_first_name',
    limit=5,
    exclusive_start_key={'pk': '1'},
).projection(
    'first_name', 'last_name', 'age'
)

data_table = DataTable().add_record({
    'first_name': 'Test',
    'last_name': ['Last', 'Last2'],
}, {
    'first_name': 'John',
    'last_name': ['Last', 'Last2'],
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery.build_scan

[find in source code]

@classmethod
def build_scan(
    filter_expression: Optional[ConditionExpressionType] = None,
    projection_expression: Optional[ProjectionExpression] = None,
    limit: int = MAX_LIMIT,
    exclusive_start_key: Optional[ExclusiveStartKey] = None,
    logger: Optional[logging.Logger] = None,
) -> _R:

Build query for table.scan.

If filter_expression is not provided - it is constructed from input data.

query = DynamoQuery.build_scan(
    filter_expression=ConditionExpression('first_name') & ('last_name', 'in'),
    limit=5,
    exclusive_start_key={'pk': '1'}
).projection(
    'first_name', 'last_name', 'age'
)

data_table = DataTable().add_record({
    'first_name': 'Test',
    'last_name': ['Last', 'Last2'],
}, {
    'first_name': 'John',
    'last_name': ['Last', 'Last2'],
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

DynamoQuery.build_update_item

[find in source code]

@classmethod
def build_update_item(
    condition_expression: Optional[ConditionExpressionType] = None,
    update_expression: Optional[UpdateExpression] = None,
    return_consumed_capacity: ReturnConsumedCapacityType = 'NONE',
    return_item_collection_metrics: ReturnItemCollectionMetricsType = 'NONE',
    return_values: ReturnValueType = 'ALL_NEW',
    logger: Optional[logging.Logger] = None,
) -> _R:

Build query for table.update_item.

If update_expression is not provided - it is constructed from input data.

query = DynamoQuery.build_update_item(
    condition_expression=ConditionExpression('first_name')
).update(
    'last_name', 'age',
)

data_table = DataTable().add_record({
    'pk': 'key1',
    'first_name': 'John',
    'age': 32,
}, {
    'pk': 'key2',
    'first_name': 'Mike',
    'age': 19,
})

result_data_table = query.execute(
    table=boto3_resource.Table('my_table'),
    table_keys=['pk'],
    data_table=data_table,
)

Arguments

Returns

DynamoQuery instance to execute.

See also

DynamoQuery().execute

[find in source code]

def execute(
    data_table: Union[(DataTable[_RecordType], RecordsType)],
    table: Optional[Table] = None,
    table_keys: Optional[TableKeys] = TABLE_KEYS,
) -> DataTable[_RecordType]:

Execute a query and get results. To get raw AWS responses, use query.get_raw_responses() after this method. To get LastEvaluatedKey, use query.get_last_evaluated_key() after this method.

If table_keys are not provided, the method reads them from the table schema. This is slow, so it is better to pass them explicitly.

input_data_table = DataTable()
input_data_table.add_record({
    'name': 'John',
}, {
    'name': 'Mike',
})
results = DynamoQuery.build_scan(
    filter_expression=ConditionExpression('name'),
).execute(
    table_keys=['pk'],
    table=table_resource,
    data_table=input_data_table,
)

Arguments

Returns

A DataTable with query results.

DynamoQuery().execute_dict

[find in source code]

def execute_dict(
    data: Optional[Dict[(str, Any)]] = None,
    table: Optional[Table] = None,
    table_keys: Optional[TableKeys] = TABLE_KEYS,
) -> DataTable[_RecordType]:

Execute a query for a single record and get results. See DynamoQuery().execute method.

search_data = {
    'name': 'John',
}
results = DynamoQuery.build_scan(
    filter_expression=ConditionExpression('name'),
).execute_dict(
    table_keys=['pk'],
    table=table_resource,
    data=search_data,
)

Arguments

Returns

A DataTable with query results.

DynamoQuery().get_last_evaluated_key

[find in source code]

def get_last_evaluated_key() -> Optional[ExclusiveStartKey]:

Get LastEvaluatedKey from the last execution.

query = DynamoQuery.build_scan(limit=5)
results = query.execute()

# if you use the same query it remembers LastEvaluatedKey, so you get the next page
# on the next execution

results_page2 = query.execute()

# to continue from the same place later, save `LastEvaluatedKey`

start_key = query.get_last_evaluated_key()

query2 = DynamoQuery.build_scan(
    limit=5,
    exclusive_start_key=start_key,
)
results_page3 = query2.execute()

Returns

A dict that can be used in ExclusiveStartKey parameter.

DynamoQuery().get_raw_responses

[find in source code]

def get_raw_responses() -> List[Dict[(str, Any)]]:

Get raw AWS responses from the last execution. Use flags ReturnConsumedCapacity and ReturnItemCollectionMetrics to get additional metrics. Also Count and ScannedCount fields might be interesting.

Returns

A list of AWS responses.

DynamoQuery.get_table_keys

[find in source code]

@staticmethod
def get_table_keys(table: Table) -> TableKeys:

Get table keys from schema.

table = boto3.resource('dynamodb').Table('my_table')
table_keys = DynamoQuery.get_table_keys(table)
table_keys # ['pk', 'sk']

Arguments

Returns

A list of table keys.

See also

DynamoQuery().limit

[find in source code]

def limit(limit: int) -> _R:

Limit results for scan or query method.

query = DynamoQuery.build_scan()
query.limit(10)

Arguments

limit - Maximum number of entries to return.

Returns

Itself, so this method can be chained.

DynamoQuery().projection

[find in source code]

def projection(*fields: str) -> _R:

Django ORM-like shortcut for adding ProjectionExpression.

query = DynamoQuery.build_scan()
query.projection('field1', 'field2')

# lines above are equivalent to

query = DynamoQuery.build_scan(
    projection_expression=ProjectionExpression('field1', 'field2')
)

Arguments

fields - A list of fields to use as ProjectionExpression keys

Returns

Itself, so this method can be chained.

DynamoQuery().reset_start_key

[find in source code]

def reset_start_key() -> _R:

Set paginated query to the start.

DynamoQuery().table

[find in source code]

def table(
    table: Optional[Table],
    table_keys: Optional[TableKeys] = TABLE_KEYS,
) -> _R:

Set table resource and table keys.

Arguments

DynamoQuery().update

[find in source code]

def update(
    update: Iterable[str] = tuple(),
    set_if_not_exists: Iterable[str] = tuple(),
    add: Iterable[str] = tuple(),
    delete: Iterable[str] = tuple(),
    remove: Iterable[str] = tuple(),
    *args: str,
) -> _R:

Shortcut for adding UpdateExpression.

query = DynamoQuery.build_update_item()
query.update(update=['field1'], add=['my_list'])

# lines above are equivalent to

query = DynamoQuery.build_update_item(
    update_expression=UpdateExpression(update=['field1'], add=['my_list'])
)

Arguments

Returns

Itself, so this method can be chained.