dbt incremental export to S3
For data sharing, S3 is about the cheapest, most efficient, and most universally accepted option out there.
While you could use dbt macros to do this, I find it much easier to create a separate helper Python script that runs after your dbt models are built. That's what we'll build here.
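A quick note on setup: the snippets below assume an S3 client and a database client already exist near the top of the script, along with the s3_bucket_url, s3_access_key and s3_secret_key variables used later. The export SQL uses ClickHouse's s3 table function, so this sketch assumes clickhouse-connect; the environment variable names are placeholders, adjust them to your setup.

import os
import datetime

import boto3
import clickhouse_connect
from botocore.exceptions import NoCredentialsError

# Placeholder config; in CI these would typically come from secrets.
s3_bucket_url = os.environ["S3_BUCKET_URL"]  # full https URL of the bucket
s3_access_key = os.environ["S3_ACCESS_KEY"]
s3_secret_key = os.environ["S3_SECRET_KEY"]

s3 = boto3.client("s3")  # picks up AWS credentials from the environment

client = clickhouse_connect.get_client(
    host=os.environ["CLICKHOUSE_HOST"],
    username=os.environ["CLICKHOUSE_USER"],
    password=os.environ["CLICKHOUSE_PASSWORD"],
)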
Iterate over the models:
path = "models/"
files = os.listdir(path)
sql_files = [file for file in files if file.endswith('.sql')]

for sql_file in sql_files:
    modelname = sql_file[:-4]  # strip the ".sql" extension
    print(f"Exporting {modelname} to S3...")
    ...
Inside the loop, fetch the cursor from S3, or create a new, blank one:
cursor_key = f"{modelname}/_last_inserted_id.csv"

def update_cursor(new_cursor):
    csv_bytes_value = bytes(f"{new_cursor}\n", encoding='UTF-8')
    try:
        s3.put_object(Body=csv_bytes_value, Bucket="l3-dbt-test-bucket", Key=cursor_key)
    except Exception as e:
        print("There was an error updating the cursor.")
        print(e)

try:
    # head_object raises if the cursor file does not exist yet
    s3.head_object(Bucket="l3-dbt-test-bucket", Key=cursor_key)
    csv_obj = s3.get_object(Bucket="l3-dbt-test-bucket", Key=cursor_key)
    byte_data = csv_obj['Body'].read()
    str_data = byte_data.decode("utf-8")
    _last_inserted_id = int(str_data)
    print(f"Found cursor file ✅ (cursor: {_last_inserted_id})")
except NoCredentialsError:
    print("No credentials provided.")
except Exception:
    print("Could not find cursor file... Generating one...")
    initial_cursor = 0
    update_cursor(initial_cursor)
    print(f"Generated cursor file ✅ (cursor: {initial_cursor})")
    _last_inserted_id = initial_cursor

print(f"Exporting from {_last_inserted_id}")
Then export the records to S3 based on the cursor. Remember to order by id and add the id filter!
sql = f"""
    INSERT INTO FUNCTION s3(
        '{s3_bucket_url}/{modelname}/{modelname}_{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}.csv',
        '{s3_access_key}',
        '{s3_secret_key}',
        'TabSeparatedWithNames'
    )
    SELECT *
    FROM {modelname}
    WHERE id > '{_last_inserted_id}'
    ORDER BY id ASC
    SETTINGS format_tsv_null_representation = '';
"""
client.command(sql)
Then update the cursor to the current max id:
sql = f"""
    SELECT max(id)
    FROM {modelname}
"""
cursor = client.query(sql)
new_cursor = cursor.result_rows[0][0]
update_cursor(new_cursor)
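To sanity-check a run, you can list what landed under the model's prefix and read the cursor back. A purely illustrative sketch, reusing the bucket name from above:

objects = s3.list_objects_v2(Bucket="l3-dbt-test-bucket", Prefix=f"{modelname}/")
for obj in objects.get("Contents", []):
    print(obj["Key"], obj["Size"])

cursor_check = s3.get_object(Bucket="l3-dbt-test-bucket", Key=cursor_key)
print("Cursor is now:", cursor_check["Body"].read().decode("utf-8").strip())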
That’s all!
CI
You’ll want to run this in your continuous integration pipeline. Here's an example with GitHub Actions and Pipenv.
Pipfile:
[scripts]
+ export_s3 = "python scripts/incremental-export-s3.py"
GitHub Actions file:
  - name: Run dbt models
    run: pipenv run dbt run
+ - name: Incrementally export models to s3
+   run: pipenv run export_s3
