Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / database

Generic Repository with SQLAlchemy and Python

4.27/5 (5 votes)
5 Jul 2022CPOL1 min read 14.5K   139  
Example of Generic Repository Class with SQLAlchemy and Python
In this article, the focus is on creating a generic repository class for SQLAlchemy using Python and using it in a FastAPI project.

Introduction

The Repository Pattern, as well as the Unit of Work Pattern, allows creating an abstraction layer between the data access layer and the business logic layer of an application. The purpose of creating this layer is to isolate the data access layer so that the changes we may operate cannot affect the business logic layer directly. Most of the time, generic repository classes are used, to avoid redundant codes.

Here, we will focus more to create a generic repository class for SQLAlchemy using Python and will use it in a FastAPI project.

Generic Repository

Table

  • AppBaseModelOrm base class for common property or columns
  • TaskQueue Db table model
  • GroupQueue Db table model

models.py:

Python
import datetime
from sqlalchemy import Boolean, Column, Integer, String,  \
     DateTime, PickleType, Enum as EnumType, JSON
from sqlalchemy.dialects.postgresql import UUID
from db.database import Base

# common fields for all entities
class AppBaseModelOrm:
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    is_active = Column(Boolean, default=True)  # soft delete
    created_by = Column(Integer)
    updated_by = Column(Integer, default=None)
    created_datetime = Column(DateTime(timezone=True), default=datetime.datetime.utcnow)
    updated_datetime = Column(DateTime(timezone=True), \
                       default=None, onupdate=datetime.datetime.utcnow)

    account_id = Column(Integer)

# tables
class TaskQueue(AppBaseModelOrm, Base):
    __tablename__ = "task_queues"
    name = Column(String, index=True)

class GroupQueue(AppBaseModelOrm, Base):
    __tablename__ = "group_queues"
    name = Column(String, index=True)

Repository Class

TableRepository is the base repo class which contains some basic operations/methods, like:

  • Read data from a table
  • Add/update/delete rows from/to a table

At the constructor level, the repository class expects:

  • db:Session DB session object
  • entity:object Table entity

table_repo.py:

Python
from sqlalchemy import and_
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from datetime import datetime

class TableRepository:

    entity:object = NotImplementedError
    db:Session = NotImplementedError

    def __init__(self, db:Session, entity:object):
        self.db = db
        self.entity = entity

    def get_all(self):
        return self.db.query(self.entity)
           
    def get_by_id(self, id:int):
        return self.db.query(self.entity).filter(self.entity.id==id).one()

    def find_by_id(self, id:int):
        return self.db.query(self.entity).filter(self.entity.id==id).first()

    def get_actives(self):
        return self.db.query(self.entity).filter(self.entity.is_active==True)

    def get_by_account_id(self, account_id:int):
        return self.db.query(self.entity).filter(self.entity.account_id==account_id)

    def get_actives_by_account_id(self, account_id:int):
        return self.db.query(self.entity).filter\
        (self.entity.is_active==True, self.entity.account_id==account_id)

    def get_by_create_datetime_range(self, from_datetime:datetime, to_datetime:datetime):
        data = self.db.query(self.entity).filter\
        (self.entity.created_datetime >= from_datetime, \
        self.entity.created_datetime <= to_datetime)
        return data

    def add(self, entity, created_by_user_id:int = None):
        entity.created_by = created_by_user_id
        self.db.add(entity)      

    def update(self, entity, updated_by_user_id:int = None):
        entity.updated_by = updated_by_user_id

    def delete(self, entity, deleted_by_user_id:int = None):
        entity.is_active = False
        self.update(entity, updated_by_user_id=deleted_by_user_id)

    def permanent_delete(self, entity):
        self.db.delete(entity) 

Using Repository Class

Here, we are using TableRepository repo class without any inheritance, for TaskQueue entity and read/write table task_queues.

task_queue.py:

Python
from typing import Optional, List
from fastapi import FastAPI, Request, Depends
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session

from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from db import models
from db.table_repo import TableRepository

router = InferringRouter()

@cbv(router)
class TaskQueue:
    db: Session = Depends(get_db)
    current_user:CurrentUser = Depends(get_current_user)

    @router.get("/", response_model=List[schemas.TaskQueueSchema])
    def get_all(self):
        repo = TableRepository(self.db, models.TaskQueue)
        items = repo.get_all().all()
        return items

    @router.get("/actives", response_model=List[schemas.TaskQueueSchema])
    def get_actives(self):
        repo = TableRepository(self.db, models.TaskQueue)
        items = repo.get_actives().all()
        return items

    @router.get("/account/{account_id}", response_model=List[schemas.TaskQueueSchema])
    def get_by_account(self, account_id: int):
        repo = TableRepository(self.db, models.TaskQueue)
        items = repo.get_by_account_id(account_id).all()
        return items

    @router.get("/account/{account_id}/actives", \
                 response_model=List[schemas.TaskQueueSchema])
    def get_actives_by_account(self, account_id: int):
        repo = TableRepository(self.db, models.TaskQueue)
        items = repo.get_actives_by_account_id(account_id).all()
        return items

    @router.get("/{id}", response_model=schemas.TaskQueueSchema)
    def get_by_id(self, id: int):
        repo = TableRepository(self.db, models.TaskQueue)
        item = repo.get_by_id(id)
        return item

    @router.get("/find/{id}", response_model=schemas.TaskQueueSchema)
    def find_by_id(self, id: int):
        '''can be null'''
        repo = TableRepository(self.db, models.TaskQueue)
        item = repo.find_by_id(id)
        return item

    @router.post("/", response_model=schemas.TaskQueueSchema)
    def post_item(self, model: schemas.TaskQueueCreate):
        item = models.TaskQueue(name=model.name, account_id=model.account_id)
        repo = TableRepository(self.db, models.TaskQueue)
        repo.add(item, self.current_user.id)
        self.db.commit()
        self.db.refresh(item)
        return item

    @router.put("/{id}", response_model=schemas.TaskQueueSchema)
    def put_item(self, id:int, model: schemas.TaskQueueUpdate):
        '''can be null'''
        repo = TableRepository(self.db, models.TaskQueue)
        item = repo.find_by_id(id)
        if item:
            item.name = model.name
            repo.update(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

    @router.delete("/{id}", response_model=schemas.TaskQueueSchema)
    def delete_item(self, id: int):
        '''can be null'''       
        repo = TableRepository(self.db, models.TaskQueue)
        item = repo.find_by_id(id)
        if item:
            repo.delete(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

    @router.delete("/permanent/{id}", response_model=schemas.TaskQueueSchema)
    def permanent_delete_item(self, id: int):
        '''can be null'''       
        repo = TableRepository(self.db, models.TaskQueue)
        item = repo.find_by_id(id)
        if item:
            repo.permanent_delete(item)
            self.db.commit()
        return item

Using Repository Class as Base

GroupQueueCrud is inheriting the repo class TableRepository for GroupQueue entity and read/write table group_queues:

Inheriting Repository Class

group_queue_crud.py:

Python
from sqlalchemy.orm import Session
from app import schemas
from db import models
from db.table_repo import TableRepository

class GroupQueueCrud(TableRepository):
    
    def __init__(self, db:Session): 
        super().__init__(db=db, entity=models.GroupQueue)

Using the CRUD Class

group_queue.py:

Python
from datetime import datetime
from typing import Optional, List
from fastapi import FastAPI, Request, Depends, Query
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session

from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from app.cruds.group_queue_crud import GroupQueueCrud
from db import models

router = InferringRouter()


@cbv(router)
class GroupQueue:
    db: Session = Depends(get_db)
    current_user:CurrentUser = Depends(get_current_user)

    @router.get("/{id}", response_model=schemas.GroupQueueSchema)
    def get_by_id(self, id: int):
        repo = GroupQueueCrud(self.db)
        item = repo.find_by_id(id)
        return item    

    @router.post("/", response_model=schemas.GroupQueueSchema)
    def post_item(self, model: schemas.GroupQueueCreate):
        item = models.GroupQueue(name=model.name, account_id=model.account_id)
        repo = GroupQueueCrud(self.db)
        repo.add(item, self.current_user.id)
        self.db.commit()
        self.db.refresh(item)
        return item

    @router.put("/{id}", response_model=schemas.GroupQueueSchema)
    def put_item(self, id:int, model: schemas.GroupQueueUpdate):
        '''can be null'''
        repo = GroupQueueCrud(self.db)
        item = repo.find_by_id(id)
        if item:
            item.name = model.name
            repo.update(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

    @router.delete("/{id}", response_model=schemas.GroupQueueSchema)
    def delete_item(self, id: int):
        '''can be null'''       
        repo = GroupQueueCrud(self.db)
        item = repo.find_by_id(id)
        if item:
            repo.delete(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

Using the Code

Python
Go to backend folder
Open cmd 
Type docker-compose up -d

\backend> docker-compose up -d

project will run http://localhost:4003

Go to Api Doc
http://localhost:4003/docs#/

Check Task Queue and Group Queue sections:

Image 1

Image 2

References

History

  • 5th July, 2022: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)