Copy an AWS dynamodb table to an existing table with an exponential timing back-off
May 20, 2018
aws dynamodb python
Here is some Python code I wrote to copy the contents of one AWS DynamoDB table to another - with exponential timing back-off to manage usage limits.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" Copy an AWS dynamodb table to an existing table with an exponential timing back-off | |
Assume receiving table has a compatible schema | |
""" | |
import logging | |
from time import sleep | |
import boto3 | |
from botocore.exceptions import ClientError | |
AWS_RESOURCE = 'dynamodb' | |
AWS_REGION = 'us-east-1' | |
def _copy_table(from_table, to_table, pause_time): | |
"""Copy all items in table from_table to to_table""" | |
batch = to_table.batch_writer() | |
response = from_table.scan() | |
sleep(pause_time) | |
for item in response['Items']: | |
batch.put_item(Item=item) | |
while 'LastEvaluatedKey' in response: | |
response = from_table.scan( | |
ExclusiveStartKey=response['LastEvaluatedKey'] | |
) | |
sleep(pause_time) | |
for item in response['Items']: | |
batch.put_item(Item=item) | |
def table_copy(from_table, to_table, max_retries=None):
    """Copy all items in table from_table to to_table.

    Catches quota exceptions, backs off, and retries as required. Every
    retry restarts the copy from the beginning of from_table, so items
    written by an earlier attempt are put again.

    Args:
        from_table: Source table, passed through to _copy_table.
        to_table: Destination table, passed through to _copy_table.
        max_retries: Maximum number of retries after a throttling error.
            None (the default) retries without limit.

    Raises:
        ClientError: If the error code is not a throttling code, or if
            max_retries retries have already been used.
    """
    retry_exceptions = ('ProvisionedThroughputExceededException',
                        'ThrottlingException')
    retries = 0
    pause_time = 0
    while True:
        try:
            _copy_table(from_table, to_table, pause_time)
        except ClientError as err:
            if err.response['Error']['Code'] not in retry_exceptions:
                raise
            # Give up instead of looping forever when a limit was requested.
            if max_retries is not None and retries >= max_retries:
                raise
            pause_time = 2 ** retries
            logging.info('Back-off set to %d seconds', pause_time)
            retries += 1
            # Wait before the next attempt so the retry does not hit the
            # table again immediately after being throttled.
            sleep(pause_time)
        else:
            return
if __name__ == "__main__":
    # Root logger: INFO threshold with timestamped messages.
    root_logger = logging.getLogger("")
    root_logger.setLevel(logging.INFO)
    logging.basicConfig(
        format='%(asctime)s %(message)s',
        datefmt='%d/%m/%Y %I:%M:%S %p',
    )

    # Resolve both tables from one resource handle, then run the copy.
    dynamodb = boto3.resource(AWS_RESOURCE, region_name=AWS_REGION)
    source_table = dynamodb.Table('FromTable')
    target_table = dynamodb.Table('ToTable')
    table_copy(source_table, target_table)

    logging.info("Processing Complete")
    logging.shutdown()