# Using a Python cron to sync data between tables

This section describes how you can use a Python cron job to sync data between two tables.
For the purpose of this example, we are going to consider two tables, `tableA` and `tableB`. Both tables have the same schema:

| Column | Type |
|---|---|
| id | Integer (Primary Key) |
| value | Text |
| last_updated | Timestamp (Default: now()) |
Here, we want to ensure that the data in `tableA` is always synced to `tableB`.

It is important to have some form of indication of what has changed in `tableA`, so that we can insert/update those changes into `tableB`. In this case, we are going with the `last_updated` column.
We will ensure that every change to a row in `tableA` also updates that row's `last_updated` value. This way, we only need to fetch rows whose `last_updated` timestamp is greater than the latest `last_updated` timestamp on `tableB`.
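A common way to keep `last_updated` current is a database trigger. Below is a minimal sketch, assuming direct Postgres access via `psycopg2`; the connection string, function name, and trigger name are all illustrative placeholders:

```python
import psycopg2

# Illustrative sketch: a Postgres trigger that refreshes last_updated
# on every UPDATE to tableA. The connection string is a placeholder.
conn = psycopg2.connect("postgresql://user:password@localhost:5432/mydb")
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE OR REPLACE FUNCTION set_last_updated() RETURNS trigger AS $$
        BEGIN
            NEW.last_updated = now();
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        DROP TRIGGER IF EXISTS table_a_last_updated ON "tableA";
        CREATE TRIGGER table_a_last_updated
            BEFORE UPDATE ON "tableA"
            FOR EACH ROW EXECUTE PROCEDURE set_last_updated();
    """)
conn.close()
```

With this in place, application code never has to remember to set `last_updated` itself.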
Similarly, you can use other patterns, such as maintaining an `is_synced` or `status` field.
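For instance, with an `is_synced` flag the cron would select unsynced rows and flip the flag after copying them over. A minimal sketch of the two data API payloads, assuming a boolean `is_synced` column on `tableA` (which is not part of the example schema above):

```python
# Fetch rows that have not been synced yet (illustrative payload)
select_unsynced = {
    "type": "select",
    "args": {
        "table": "tableA",
        "columns": ["*"],
        "where": {"is_synced": False}
    }
}

# After copying the rows to tableB, mark the same rows as synced
mark_synced = {
    "type": "update",
    "args": {
        "table": "tableA",
        "$set": {"is_synced": True},
        "where": {"is_synced": False}
    }
}
```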
Handling deletes can also be made simple by adding an `is_deleted` column to `tableA`. Instead of actually deleting a row, you would update its `is_deleted` value to `true` and then handle this change on `tableB`.
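When the cron sees such a row, it can either carry the flag over to `tableB` or delete the corresponding row there. A minimal sketch of the latter, assuming the same data API endpoint used throughout this example (`new_data` here stands for the rows already fetched from `tableA`):

```python
# Collect ids of rows that were soft-deleted in tableA
# (new_data is the list of fetched rows; illustrative)
deleted_ids = [row["id"] for row in new_data if row.get("is_deleted")]

# Delete the matching rows from tableB (illustrative payload)
delete_payload = {
    "type": "delete",
    "args": {
        "table": "tableB",
        "where": {"id": {"$in": deleted_ids}}
    }
}
```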
To implement our simple example of syncing data between `tableA` and `tableB`, we are going to do the following:

- Clone an existing Python cron microservice from Hasura Hub into our Hasura project.
- Create a new file where the code to sync the tables goes in.
- The syncing logic (wired together in a driver sketched after the three functions below) will be as follows:
  - Fetch the row from `tableB` with the latest value for `last_updated`.
  - Fetch all rows from `tableA` after the latest `last_updated` timestamp of `tableB`.
  - Insert/update this data into `tableB`.
Alright, let’s get started.
## Cloning a Python cron microservice from Hasura Hub

Clone the `cron` microservice from hasura/python-cron into your project by following the instructions here.
**Note:** Since this microservice is a cron job, we do not need to expose it at an external URL. You can skip step 4 in the instructions to clone a microservice.
Next, navigate to `microservices/cron/src` and add a new file called `table_sync.py`.

Let's now add our logic to this file.
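All of the functions below share a few module-level imports plus the data API endpoint and request headers. A minimal setup for the top of `table_sync.py` might look like the following; the URL shown is the cluster-internal data API endpoint, and you should adjust both it and the headers (e.g. an auth token) to your own setup:

```python
import json
import requests

# Cluster-internal Hasura data API endpoint (adjust to your setup)
url = "http://data.hasura/v1/query"
headers = {"Content-Type": "application/json"}
```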
### Fetching the latest value of `last_updated` from `tableB`
```python
def getLastUpdatedTimestamp():
    tableBRequestPayload = {
        "type": "select",
        "args": {
            "table": "tableB",
            "columns": [
                "last_updated"
            ],
            "order_by": ["-last_updated"],
            "limit": 1
        }
    }
    lastUpdatedTableBData = requests.request("POST", url, data=json.dumps(tableBRequestPayload), headers=headers)
    if lastUpdatedTableBData.status_code != 200:
        print("Failed to fetch data from tableB")
        return None
    jsonData = lastUpdatedTableBData.json()
    lastUpdatedTimestamp = jsonData[0]['last_updated'] if len(jsonData) > 0 else None
    return lastUpdatedTimestamp
```
The `order_by` key is used to order the results in descending order of `last_updated`. The `'-'` symbol denotes descending order and `'+'` ascending. The `limit` key specifies the number of rows we want, in this case `1`, thereby fetching the row with the latest value of `last_updated`.
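For reference, a successful response to this select query is a JSON array with at most one element, which the last two lines of the function unpack (the timestamp value below is illustrative):

```python
# Example response body and how the function reads it
jsonData = [{"last_updated": "2018-01-15T10:30:00.000000+00:00"}]
lastUpdatedTimestamp = jsonData[0]['last_updated'] if len(jsonData) > 0 else None
# -> the timestamp string; None when tableB is still empty
```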
### Fetching new values from `tableA` after the latest timestamp
```python
def getAllDataFromTimestamp(lastUpdatedTimestamp):
    requestPayload = {
        "type": "select",
        "args": {
            "table": "tableA",
            "columns": [
                "*"
            ]
        }
    }
    if lastUpdatedTimestamp is not None:
        requestPayload['args']['where'] = {
            "last_updated": {
                "$gt": lastUpdatedTimestamp
            }
        }
    response = requests.request("POST", url, data=json.dumps(requestPayload), headers=headers)
    if response.status_code != 200:
        print("Failed to fetch data from tableA")
        print(response.content)
        return None
    jsonData = response.json()
    return jsonData
```
If `lastUpdatedTimestamp` does not have a value, we fetch everything from `tableA`. Otherwise, the `where` condition specifies that we only want rows whose `last_updated` timestamp is greater than the value provided.
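The `where` clause supports other comparison operators in the same JSON-based query format, such as `$gte`, `$lt`, and `$eq`. As a sketch, the payload below combines two conditions (the `is_deleted` column refers back to the soft-delete pattern above and the exact payload is illustrative):

```python
# Illustrative where clause: rows changed since a point in time
# and not soft-deleted; sibling conditions are combined with AND
windowedPayload = {
    "type": "select",
    "args": {
        "table": "tableA",
        "columns": ["*"],
        "where": {
            "last_updated": {"$gte": "2018-01-01T00:00:00+00:00"},
            "is_deleted": {"$eq": False}
        }
    }
}
```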
### Upserting data into `tableB`
```python
def upsertDataToTableB(newData):
    # Perform operations on new data if required or sync as is
    requestPayload = {
        "type": "insert",
        "args": {
            "table": "tableB",
            "objects": newData,
            "on_conflict": {
                "action": "update",
                "constraint_on": [
                    "id"
                ]
            }
        }
    }
    response = requests.request("POST", url, data=json.dumps(requestPayload), headers=headers)
    if response.status_code != 200:
        print("Failed to insert data into tableB")
        print(response.content)
        return None
    jsonData = response.json()
    return jsonData
```
The value of `on_conflict` specifies that whenever the unique key constraint on the `id` column is violated, the row should be updated with the data in the request. A new row is inserted if there is no conflict.

The `newData` being passed to this function is the response from the previous function, `getAllDataFromTimestamp`.
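Tying it all together, the entry point of `table_sync.py` can simply chain the three functions, since the cron executes the file directly. A minimal driver sketch (the name `syncTables` is illustrative; only the three functions above come from the example):

```python
def syncTables():
    # 1. Latest sync point on tableB (None if tableB is empty)
    lastUpdatedTimestamp = getLastUpdatedTimestamp()
    # 2. Everything that changed on tableA since that point
    newData = getAllDataFromTimestamp(lastUpdatedTimestamp)
    # 3. Upsert the changes into tableB, if any were found
    if newData:
        upsertDataToTableB(newData)

if __name__ == "__main__":
    syncTables()
```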
**Note:** You can also modify the value of `newData` according to your needs and then insert it into your "clone table" (`tableB` in this case). Ensure that the schema of `tableB` is modified as needed.
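For example, here is a sketch that reshapes each row before the upsert; the extra `value_length` column is hypothetical and would need to be added to `tableB`'s schema first:

```python
# Hypothetical transformation: normalize value and derive an extra column
transformedData = [
    {
        "id": row["id"],
        "value": row["value"].strip().lower(),
        "value_length": len(row["value"]),  # hypothetical column on tableB
        "last_updated": row["last_updated"]
    }
    for row in newData
]
upsertDataToTableB(transformedData)
```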
The complete code for this file can be found here.
## Deploying the microservice

Before we deploy this microservice, we also need to make a small change in the `main.py` file so that it runs our newly created `table_sync.py` file every minute.

Open `microservices/cron/src/main.py` and replace `hello.py` with `table_sync.py` in line 7.

Your `main.py` will look like:
```python
from crontab import CronTab

# File name for cron
my_cron = CronTab(tabfile='my_cron.tab')

# Add cron command and time span
job = my_cron.new(command='python /usr/src/app/table_sync.py')
job.minute.every(1)

# Write cron jobs to cron tab file
my_cron.write()

# Run the scheduler
for result in my_cron.run_scheduler():
    print(result)
```
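If a one-minute interval is too aggressive for your tables, python-crontab lets you adjust the schedule on the same `job` object, for example:

```python
# Run every 15 minutes instead of every minute
job.minute.every(15)

# Or, alternatively, run once an hour at minute 0
job.minute.on(0)
job.hour.every(1)
```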
To deploy the microservice, simply `git push`:

```bash
git add . && git commit -m "python sync cron"
git push hasura master
```