In this article, the focus is on creating a generic repository class for SQLAlchemy using Python and using it in a FastAPI project.
Introduction
The Repository Pattern, as well as the Unit of Work Pattern, allows creating an abstraction layer between the data access layer and the business logic layer of an application. The purpose of creating this layer is to isolate the data access layer so that the changes we may operate cannot affect the business logic layer directly. Most of the time, generic repository classes are used, to avoid redundant codes.
Here, we will focus more to create a generic repository class for SQLAlchemy
using Python
and will use it in a FastAPI
project.
Generic Repository
Table
AppBaseModelOrm
base class for common property or columns TaskQueue
Db table model GroupQueue
Db table model
models.py:
import datetime
from sqlalchemy import Boolean, Column, Integer, String, \
DateTime, PickleType, Enum as EnumType, JSON
from sqlalchemy.dialects.postgresql import UUID
from db.database import Base
class AppBaseModelOrm:
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
is_active = Column(Boolean, default=True)
created_by = Column(Integer)
updated_by = Column(Integer, default=None)
created_datetime = Column(DateTime(timezone=True), default=datetime.datetime.utcnow)
updated_datetime = Column(DateTime(timezone=True), \
default=None, onupdate=datetime.datetime.utcnow)
account_id = Column(Integer)
class TaskQueue(AppBaseModelOrm, Base):
__tablename__ = "task_queues"
name = Column(String, index=True)
class GroupQueue(AppBaseModelOrm, Base):
__tablename__ = "group_queues"
name = Column(String, index=True)
Repository Class
TableRepository
is the base repo class which contains some basic operations/methods, like:
- Read data from a table
- Add/update/delete rows from/to a table
At the constructor level, the repository class expects:
db:Session
DB session object entity:object
Table entity
table_repo.py:
from sqlalchemy import and_
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from datetime import datetime
class TableRepository:
entity:object = NotImplementedError
db:Session = NotImplementedError
def __init__(self, db:Session, entity:object):
self.db = db
self.entity = entity
def get_all(self):
return self.db.query(self.entity)
def get_by_id(self, id:int):
return self.db.query(self.entity).filter(self.entity.id==id).one()
def find_by_id(self, id:int):
return self.db.query(self.entity).filter(self.entity.id==id).first()
def get_actives(self):
return self.db.query(self.entity).filter(self.entity.is_active==True)
def get_by_account_id(self, account_id:int):
return self.db.query(self.entity).filter(self.entity.account_id==account_id)
def get_actives_by_account_id(self, account_id:int):
return self.db.query(self.entity).filter\
(self.entity.is_active==True, self.entity.account_id==account_id)
def get_by_create_datetime_range(self, from_datetime:datetime, to_datetime:datetime):
data = self.db.query(self.entity).filter\
(self.entity.created_datetime >= from_datetime, \
self.entity.created_datetime <= to_datetime)
return data
def add(self, entity, created_by_user_id:int = None):
entity.created_by = created_by_user_id
self.db.add(entity)
def update(self, entity, updated_by_user_id:int = None):
entity.updated_by = updated_by_user_id
def delete(self, entity, deleted_by_user_id:int = None):
entity.is_active = False
self.update(entity, updated_by_user_id=deleted_by_user_id)
def permanent_delete(self, entity):
self.db.delete(entity)
Using Repository Class
Here, we are using TableRepository
repo class without any inheritance, for TaskQueue
entity and read/write table task_queues
.
task_queue.py:
from typing import Optional, List
from fastapi import FastAPI, Request, Depends
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session
from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from db import models
from db.table_repo import TableRepository
router = InferringRouter()
@cbv(router)
class TaskQueue:
db: Session = Depends(get_db)
current_user:CurrentUser = Depends(get_current_user)
@router.get("/", response_model=List[schemas.TaskQueueSchema])
def get_all(self):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_all().all()
return items
@router.get("/actives", response_model=List[schemas.TaskQueueSchema])
def get_actives(self):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_actives().all()
return items
@router.get("/account/{account_id}", response_model=List[schemas.TaskQueueSchema])
def get_by_account(self, account_id: int):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_by_account_id(account_id).all()
return items
@router.get("/account/{account_id}/actives", \
response_model=List[schemas.TaskQueueSchema])
def get_actives_by_account(self, account_id: int):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_actives_by_account_id(account_id).all()
return items
@router.get("/{id}", response_model=schemas.TaskQueueSchema)
def get_by_id(self, id: int):
repo = TableRepository(self.db, models.TaskQueue)
item = repo.get_by_id(id)
return item
@router.get("/find/{id}", response_model=schemas.TaskQueueSchema)
def find_by_id(self, id: int):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
return item
@router.post("/", response_model=schemas.TaskQueueSchema)
def post_item(self, model: schemas.TaskQueueCreate):
item = models.TaskQueue(name=model.name, account_id=model.account_id)
repo = TableRepository(self.db, models.TaskQueue)
repo.add(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.put("/{id}", response_model=schemas.TaskQueueSchema)
def put_item(self, id:int, model: schemas.TaskQueueUpdate):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
if item:
item.name = model.name
repo.update(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.delete("/{id}", response_model=schemas.TaskQueueSchema)
def delete_item(self, id: int):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
if item:
repo.delete(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.delete("/permanent/{id}", response_model=schemas.TaskQueueSchema)
def permanent_delete_item(self, id: int):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
if item:
repo.permanent_delete(item)
self.db.commit()
return item
Using Repository Class as Base
GroupQueueCrud
is inheriting the repo class TableRepository
for GroupQueue
entity and read/write table group_queues
:
Inheriting Repository Class
group_queue_crud.py:
from sqlalchemy.orm import Session
from app import schemas
from db import models
from db.table_repo import TableRepository
class GroupQueueCrud(TableRepository):
def __init__(self, db:Session):
super().__init__(db=db, entity=models.GroupQueue)
Using the CRUD Class
group_queue.py:
from datetime import datetime
from typing import Optional, List
from fastapi import FastAPI, Request, Depends, Query
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session
from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from app.cruds.group_queue_crud import GroupQueueCrud
from db import models
router = InferringRouter()
@cbv(router)
class GroupQueue:
db: Session = Depends(get_db)
current_user:CurrentUser = Depends(get_current_user)
@router.get("/{id}", response_model=schemas.GroupQueueSchema)
def get_by_id(self, id: int):
repo = GroupQueueCrud(self.db)
item = repo.find_by_id(id)
return item
@router.post("/", response_model=schemas.GroupQueueSchema)
def post_item(self, model: schemas.GroupQueueCreate):
item = models.GroupQueue(name=model.name, account_id=model.account_id)
repo = GroupQueueCrud(self.db)
repo.add(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.put("/{id}", response_model=schemas.GroupQueueSchema)
def put_item(self, id:int, model: schemas.GroupQueueUpdate):
'''can be null'''
repo = GroupQueueCrud(self.db)
item = repo.find_by_id(id)
if item:
item.name = model.name
repo.update(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.delete("/{id}", response_model=schemas.GroupQueueSchema)
def delete_item(self, id: int):
'''can be null'''
repo = GroupQueueCrud(self.db)
item = repo.find_by_id(id)
if item:
repo.delete(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
Using the Code
Go to backend folder
Open cmd
Type docker-compose up -d
\backend> docker-compose up -d
project will run http://localhost:4003
Go to Api Doc
http://localhost:4003/docs
Check Task Queue and Group Queue sections:
References
History
- 5th July, 2022: Initial version