개요
SQLAlchemy는 파이썬으로 DB를 관리하는 패키지다.
이걸 모를 때는 쿼리를 아래와 같이 작성하고
query = """SELECT *
FROM SAMPLE_TABLE
WHERE TIMESTAMP < '2025-04-01'
"""
sqlite로 db에 연결해서 excute하고 그랬는데 그럴 필요가 없다.
Core 방식과 ORM (Object Relational Mapping)이 있다.
Core:
SQL의 표현에 가까운 방식
low-level
ORM:
객체지향적인 방식
high-level
이라고 한다.
우선 SQL과 유사한 Core 방식을 먼저 살펴본 다음에 ORM 방식을 살펴보기로 한다.
살펴볼 내용들
실습할 쿼리문들
- SELECT
- INSERT
- UPDATE
- DELETE
사용한 파이썬 패키지
- databases
- sqlalchemy
- fastapi
- sqlite3
문서
자세한 내용은 SQLAlchemy의 공식 Documentation을 참고했다. (링크)
데이터
쿼리를 사용하는 데이터는 추천 시스템에서 유명한 MovieLens-1m 데이터를 사용한다. (링크)
사람들이 영화에 별점을 매긴 1m = 1 백만개의 점수 데이터다.
이를 ott.db 형태의 데이터로 가공해서 사용한다.
자세한 코드는 생략하겠지만 가공 방법은 다음과 같다.
- txt를 with open으로 불러오고
- pandas dataframe으로 변형하고
- sqlite3로 ott.db를 만든 다음 해당 DB에 table 형태로 저장한다.
DB의 테이블은 users, movies, ratings 세 개가 있다.
아래와 같은 형태다.
movies
users
ratings
1. Core 방식
참조 문서: 링크
engine을 만들어서 connection을 관리한다.
관련 문서: 링크
데이터베이스 형식 선언
# ratings, user, movie 등등의 테이블 정의
'''
databases는 비동기를 지원하는 라이브러리다.
async와 await가 가능하다.
반면에 database는 비동기식이다.
'''
from sqlalchemy import Table, Column, Integer, String, Float, ForeignKey
#from app.database import metadata
from database import metadata
movies = Table(
"movies",
metadata,
Column("movieId", Integer, primary_key=True),
Column("title", String),
Column("genre", String),
)
users = Table(
"users",
metadata,
Column("userId", Integer, primary_key=True),
Column("gender", String),
Column("age", Integer),
Column("occupation", String),
Column("zipCode", String),
)
ratings = Table(
"ratings",
metadata,
Column("userId", Integer, ForeignKey("users.userId")),
Column("movieId", Integer, ForeignKey("movies.movieId")),
Column("rating", Float),
Column("timestamp", Integer),
)
SELECT, INSERT, UPDATE, DELETE를 알아본다. (링크)
SELECT 조회
보통 HTTP의 GET과 같이 사용한다.
from sqlalchemy import select
from sqlalchemy import create_engine
# 상대 경로로 변경
DATABASE_URL = "sqlite:///./ott.db"
# 동기 엔진 생성
engine = create_engine(DATABASE_URL)
smt = select(movies, ratings, users).select_from(movies).join(ratings).join(users).where(users.c.userId == 1).limit(10)
with engine.connect() as conn:
result = conn.execute(smt)
for row in result:
print(row)
>>>
(1193, "One Flew Over the Cuckoo's Nest (1975)", 'Drama', 1, 1193, 5.0, 978300760, 1, 'F', 1, 10, '48067')
(661, 'James and the Giant Peach (1996)', "Animation|Children's|Musical", 1, 661, 3.0, 978302109, 1, 'F', 1, 10, '48067')
(914, 'My Fair Lady (1964)', 'Musical|Romance', 1, 914, 3.0, 978301968, 1, 'F', 1, 10, '48067')
(3408, 'Erin Brockovich (2000)', 'Drama', 1, 3408, 4.0, 978300275, 1, 'F', 1, 10, '48067')
(2355, "Bug's Life, A (1998)", "Animation|Children's|Comedy", 1, 2355, 5.0, 978824291, 1, 'F', 1, 10, '48067')
(1197, 'Princess Bride, The (1987)', 'Action|Adventure|Comedy|Romance', 1, 1197, 3.0, 978302268, 1, 'F', 1, 10, '48067')
(1287, 'Ben-Hur (1959)', 'Action|Adventure|Drama', 1, 1287, 5.0, 978302039, 1, 'F', 1, 10, '48067')
(2804, 'Christmas Story, A (1983)', 'Comedy|Drama', 1, 2804, 5.0, 978300719, 1, 'F', 1, 10, '48067')
(594, 'Snow White and the Seven Dwarfs (1937)', "Animation|Children's|Musical", 1, 594, 4.0, 978302268, 1, 'F', 1, 10, '48067')
(919, 'Wizard of Oz, The (1939)', "Adventure|Children's|Drama|Musical", 1, 919, 4.0, 978301368, 1, 'F', 1, 10, '48067')
SELECT
FROM
JOIN
JOIN
WHERE
LIMIT
위 명령어들을
select(movies, ratings, users).select_from(movies).join(ratings).join(users).where(users.c.userId == 1).limit(10)
같은 형태로 적용하게 된다.
logger 선언
logger라고 로그를 기록하는 객체를 만들어서 사용할 수 있다.
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
filename='app.log', # Specify the log file name
filemode='a', # Append mode; use 'w' to overwrite the file each time
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
함수로 SELECT 선언
def get_movies_by_user(user_id, engine):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = select(movies, ratings, users).select_from(movies).join(ratings).join(users).where(users.c.userId == user_id)
with engine.connect() as conn:
result = conn.execute(stmt)
result_list = result.fetchall()
return result_list
except Exception as e:
logger.error(f"Error fetching movies for user {user_id}: {str(e)}")
raise e
result_list = get_movies_by_user(1, engine)
result_list[3:10]
INSERT 선언
INSERT DB에 데이터를 삽입하는 함수로 HTTP의 POST와 주로 결합한다.
ratings 테이블에 유저 1이 새로운 영화 A에 대한 평점을 새로 기록하는 상황을 구현하고자 한다.
우선 유저 1이 평점을 매기지 않았던 영화를 뽑는다.
모든 영화 ID 가져오기
import numpy as np
# 모든 영화의 ids를 가져온다.
def get_all_movie_ids(engine):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = select(movies.c.movieId)
with engine.connect() as conn:
result = conn.execute(stmt)
result_list = result.fetchall()
return result_list
except Exception as e:
logger.error(f"Error fetching all movie IDs: {str(e)}")
raise e
all_movie_ids = get_all_movie_ids(engine)
print(f"The number of movies in the database: {len(all_movie_ids)}")
>> The number of movies in the database: 3883
유저 1이 평점 매기지 않았던 영화들 중에서 하나 뽑기 with numpy
# Convert result_list to a list of movie IDs
all_movie_ids = [row[0] for row in all_movie_ids]
all_movie_ids[0:5], all_movie_ids[-5:]
>> ([1, 2, 3, 4, 5], [3948, 3949, 3950, 3951, 3952])
# Convert result_list to a list of movie IDs
movie_ids = [row[0] for row in result_list]
pool = set(all_movie_ids) - set(movie_ids)
pool = np.array(list(pool))
chosen = np.random.choice(pool, 1, replace=False)
chosen_movie_id = chosen[0]
print(f'chosen movieId: {chosen[0]}')
>> chosen movieId: 593
새로 삽입할 영화의 ID는 593이다.
Insert 함수
chosen_movie_id의 타입을 type(chosen_movie_id)로 확인하면 numpy.int64가 나온다.
이를 sqlite의 table에 바로 넣으면 5102000000000000 이라는 값이 DB에 삽입된다.
이를 파이썬의 정수 형태로 변경하는 따라서 chosen_movie_id = int(chosen_movie_id) 작업이 필요하다.
from sqlalchemy import insert
import time
def insert_ratings_by_user(user_id, movie_id, rating_num, engine):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
print(f"movieId: {movie_id}")
# SQLAlchemy 쿼리 작성
stmt = insert(ratings).values(userId=user_id, movieId=movie_id, rating=rating_num, timestamp=time.time())
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
#logger.info(f"Inserted rating: {rating_num} by user: {user_id} to movie: {movie_id}")
except Exception as e:
logger.error(f"Error inserting rating by user: {user_id} to movie {movie_id}: {str(e)}")
raise e
# numpy.int65 타입의 chosen_movie_id를 int로 변환
# 그렇지 않으면 DB에서 5102000000000000과 같은 값으로 인식한다.
chosen_movie_id = int(chosen_movie_id)
insert_ratings_by_user(1, chosen_movie_id, 5, engine)
>> movieId: 593
데이터 삽입 확인
result_list = get_movies_by_user(1, engine)
result_list[-5:]
>> [(531, 'Secret Garden, The (1993)', "Children's|Drama", 1, 531, 4.0, 978302149, 1, 'F', 1, 10, '48067'),
(3114, 'Toy Story 2 (1999)', "Animation|Children's|Comedy", 1, 3114, 4.0, 978302174, 1, 'F', 1, 10, '48067'),
(608, 'Fargo (1996)', 'Crime|Drama|Thriller', 1, 608, 4.0, 978301398, 1, 'F', 1, 10, '48067'),
(1246, 'Dead Poets Society (1989)', 'Drama', 1, 1246, 4.0, 978302091, 1, 'F', 1, 10, '48067'),
(593, 'Silence of the Lambs, The (1991)', 'Drama|Thriller', 1, 593, 5.0, 1745368989.8571641, 1, 'F', 1, 10, '48067')]
맨 마지막에 보면 영화 593, Silence of the Lambs가 삽입된걸 확인할 수 있다. 별점은 1, 593, 다음의 숫자인 5.0이다.
UPDATE 선언
업데이트는 기존 DB의 값을 변형할 때 사용한다.
실무에서는 조심해서 사용해야 한다.
HTTP의 PUT과 PATCH와 사용한다.
여기서는 유저 1이 영화 593에 매긴 별점을 4로 변환한다.
from sqlalchemy import update
def update_ratings_by_user(user_id, movie_id, rating_num, engine):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = update(ratings).where(ratings.c.userId == user_id, ratings.c.movieId == movie_id).values(rating=rating_num)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
#logger.info(f"Updated rating: {rating_num} by user: {user_id} for movie: {movie_id}")
except Exception as e:
logger.error(f"Error updating rating by user: {user_id} for movie: {movie_id}: {str(e)}")
raise e
update_ratings_by_user(1, chosen_movie_id, 4, engine)
result_list = get_movies_by_user(1, engine)
result_list[-5:]
>> [(531, 'Secret Garden, The (1993)', "Children's|Drama", 1, 531, 4.0, 978302149, 1, 'F', 1, 10, '48067'),
(3114, 'Toy Story 2 (1999)', "Animation|Children's|Comedy", 1, 3114, 4.0, 978302174, 1, 'F', 1, 10, '48067'),
(608, 'Fargo (1996)', 'Crime|Drama|Thriller', 1, 608, 4.0, 978301398, 1, 'F', 1, 10, '48067'),
(1246, 'Dead Poets Society (1989)', 'Drama', 1, 1246, 4.0, 978302091, 1, 'F', 1, 10, '48067'),
(593, 'Silence of the Lambs, The (1991)', 'Drama|Thriller', 1, 593, 4.0, 1745368989.8571641, 1, 'F', 1, 10, '48067')]
맨 마지막 행을 보면 영화 593, Silence of the Lambs의 별점이 5.0에서 4.0으로 변한걸 알 수 있다.
DELETE 선언
기존 DB의 값을 삭제할 때 사용한다.
실무에서는 굉장히 조심해서 사용해야 한다.
HTTP의 DELETE와 사용한다.
from sqlalchemy import delete
def delete_ratings_by_user(user_id, movie_id, engine):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = delete(ratings).where(ratings.c.userId == user_id, ratings.c.movieId == movie_id)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
#logger.info(f"Deleted rating by user: {user_id} for movie: {movie_id}")
except Exception as e:
logger.error(f"Error deleting rating by user: {user_id} for movie: {movie_id}: {str(e)}")
raise e
delete_ratings_by_user(1, 593, engine)
result_list = get_movies_by_user(1, engine)
result_list[-5:]
>> [(2028, 'Saving Private Ryan (1998)', 'Action|Drama|War', 1, 2028, 5.0, 978301619, 1, 'F', 1, 10, '48067'),
(531, 'Secret Garden, The (1993)', "Children's|Drama", 1, 531, 4.0, 978302149, 1, 'F', 1, 10, '48067'),
(3114, 'Toy Story 2 (1999)', "Animation|Children's|Comedy", 1, 3114, 4.0, 978302174, 1, 'F', 1, 10, '48067'),
(608, 'Fargo (1996)', 'Crime|Drama|Thriller', 1, 608, 4.0, 978301398, 1, 'F', 1, 10, '48067'),
(1246, 'Dead Poets Society (1989)', 'Drama', 1, 1246, 4.0, 978302091, 1, 'F', 1, 10, '48067')]
영화 593, Silence of the Lambs이 삭제된걸 확인할 수 있다.
Async 비동기 방식
기존의 태스크가 완료되지 않았더라도 다음 태스크에 대한 수행이 가능한 방법이다.
여기서는 SELECT만 확인한다.
관련문서: 링크
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine
# 상대 경로로 변경
DATABASE_URL = "sqlite+aiosqlite:///./ott.db"
# 동기 엔진 생성
async_engine = create_async_engine(DATABASE_URL)
async def get_movies_by_user(user_id, async_engine):
try:
# 비동기 엔진 생성
engine = create_async_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = select(movies, ratings, users).select_from(movies).join(ratings).join(users).where(users.c.userId == user_id)
async with async_engine.connect() as conn:
result = await conn.execute(stmt)
result_list = result.fetchall()
return result_list
except Exception as e:
logger.error(f"Error fetching movies for user {user_id}: {str(e)}")
raise e
result_list = await get_movies_by_user(1, async_engine)
result_list[3:10]
>> [(3408, 'Erin Brockovich (2000)', 'Drama', 1, 3408, 4.0, 978300275, 1, 'F', 1, 10, '48067'),
(2355, "Bug's Life, A (1998)", "Animation|Children's|Comedy", 1, 2355, 5.0, 978824291, 1, 'F', 1, 10, '48067'),
(1197, 'Princess Bride, The (1987)', 'Action|Adventure|Comedy|Romance', 1, 1197, 3.0, 978302268, 1, 'F', 1, 10, '48067'),
(1287, 'Ben-Hur (1959)', 'Action|Adventure|Drama', 1, 1287, 5.0, 978302039, 1, 'F', 1, 10, '48067'),
(2804, 'Christmas Story, A (1983)', 'Comedy|Drama', 1, 2804, 5.0, 978300719, 1, 'F', 1, 10, '48067'),
(594, 'Snow White and the Seven Dwarfs (1937)', "Animation|Children's|Musical", 1, 594, 4.0, 978302268, 1, 'F', 1, 10, '48067'),
(919, 'Wizard of Oz, The (1939)', "Adventure|Children's|Drama|Musical", 1, 919, 4.0, 978301368, 1, 'F', 1, 10, '48067')]
2. ORM 방식
참조 문서: 링크
engine을 session으로 만들어서 관리한다.
데이터베이스 테이블 정의
참조 문서: 링크
from sqlalchemy import ForeignKey
from sqlalchemy.orm import (
DeclarativeBase,
mapped_column,
Mapped
)
class Base(DeclarativeBase):
pass
class Users(Base):
__tablename__ = "users"
userId: Mapped[int] = mapped_column(primary_key=True)
gender: Mapped[str]
age: Mapped[int]
occupation: Mapped[str]
zipCode: Mapped[str]
class Movies(Base):
__tablename__ = "movies"
movieId: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
genre: Mapped[str]
class Ratings(Base):
__tablename__ = "ratings"
userId: Mapped[int] = mapped_column(ForeignKey("users.userId"), primary_key=True)
movieId: Mapped[int] = mapped_column(ForeignKey("movies.movieId"), primary_key=True)
rating: Mapped[float]
timestamp: Mapped[int]
SELECT
참조 문서: 링크
from sqlalchemy import create_engine
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session
# 상대 경로로 변경
DATABASE_URL = "sqlite:///./ott.db" # 현재 디렉토리에 생성되도록 수정
# 동기 엔진 생성
engine = create_engine(DATABASE_URL, echo=True)
session = Session(engine)
stmt = select(Users).where(Users.userId.in_([1, 2]))
for user in session.scalars(stmt):
print(f"User ID: {user.userId}, Gender: {user.gender}, Age: {user.age}, Occupation: {user.occupation}, ZipCode: {user.zipCode}")
>> 2025-04-23 10:14:17,774 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-23 10:14:17,778 INFO sqlalchemy.engine.Engine SELECT users."userId", users.gender, users.age, users.occupation, users."zipCode"
FROM users
WHERE users."userId" IN (?, ?)
2025-04-23 10:14:17,778 INFO sqlalchemy.engine.Engine [generated in 0.00044s] (1, 2)
User ID: 1, Gender: F, Age: 1, Occupation: 10, ZipCode: 48067
User ID: 2, Gender: M, Age: 56, Occupation: 16, ZipCode: 70072
위와 같은 결과가 나온다.
engien을 설정할 때 echo=True라고 설정해서 SQL 쿼리문의 수행 결과가 출력 되는것 같다.
왜 Core 방식에서는 안 나타나는지 추가로 확인해봐야겠다.
session.scalars(stmt)의 결과는 <sqlalchemy.engine.result.ChunkedIteratorResult at 0x7f431a553080> 라는 형태의 데이터다.
이를 for 문으로 살펴보면 아래와 같이 _sa_instance_state로 나온다.
for movie, rating in movies_list:
print(movie.__dict__)
print(rating.__dict__)
print("---")
>>>
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a6757e0>, 'movieId': 1193, 'title': "One Flew Over the Cuckoo's Nest (1975)", 'genre': 'Drama'}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a674a00>, 'timestamp': 978300760, 'userId': 1, 'movieId': 1193, 'rating': 5.0}
---
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a675840>, 'movieId': 661, 'title': 'James and the Giant Peach (1996)', 'genre': "Animation|Children's|Musical"}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a6758a0>, 'timestamp': 978302109, 'userId': 1, 'movieId': 661, 'rating': 3.0}
---
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a675900>, 'movieId': 914, 'title': 'My Fair Lady (1964)', 'genre': 'Musical|Romance'}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a675960>, 'timestamp': 978301968, 'userId': 1, 'movieId': 914, 'rating': 3.0}
---
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a6759c0>, 'movieId': 3408, 'title': 'Erin Brockovich (2000)', 'genre': 'Drama'}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a675a20>, 'timestamp': 978300275, 'userId': 1, 'movieId': 3408, 'rating': 4.0}
---
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a675a80>, 'movieId': 2355, 'title': "Bug's Life, A (1998)", 'genre': "Animation|Children's|Comedy"}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7f431a675ae0>, 'timestamp': 978824291, 'userId': 1, 'movieId': 2355, 'rating': 5.0}
---
따라서 향후 API를 만들기 위해서 파이썬 딕셔너리 형태로 만들어주는게 좋기 때문에 이를 직접 만들어줘야 한다.
그리고 engine을 사용한 Core와 다르게 ORM에서는 Session의 클래스가 있어서 타입 힌트 형태로 함수를 짤 수가 있다.
검색해보니 파이썬의 type annotation or type hint는 mypy 등과 같은 타입 검사 도구를 사용하지 않는 한 타입의 강제성은 없다고 한다.
from typing import List, Dict
def select_movies_by_user_id(user_id: int, session: Session) -> List[Dict]:
stmt = select(Movies, Ratings).join(Ratings).where(Ratings.userId == user_id)
movies_list = session.execute(stmt).all()
result = []
for movie, ratings in movies_list:
result.append(({'userId': user_id,
'movieId': movie.movieId,
'title': movie.title,
'genre': movie.genre,
'rating': ratings.rating}))
return result
movies_list = select_movies_by_user_id(1, session)
movies_list[0:5]
>>
2025-04-23 10:14:37,650 INFO sqlalchemy.engine.Engine SELECT movies."movieId", movies.title, movies.genre, ratings."userId", ratings."movieId" AS "movieId_1", ratings.rating, ratings.timestamp
FROM movies JOIN ratings ON movies."movieId" = ratings."movieId"
WHERE ratings."userId" = ?
2025-04-23 10:14:37,652 INFO sqlalchemy.engine.Engine [generated in 0.00132s] (1,)
[{'userId': 1,
'movieId': 1193,
'title': "One Flew Over the Cuckoo's Nest (1975)",
'genre': 'Drama',
'rating': 5.0},
{'userId': 1,
'movieId': 661,
'title': 'James and the Giant Peach (1996)',
'genre': "Animation|Children's|Musical",
'rating': 3.0},
{'userId': 1,
'movieId': 914,
'title': 'My Fair Lady (1964)',
'genre': 'Musical|Romance',
'rating': 3.0},
{'userId': 1,
'movieId': 3408,
'title': 'Erin Brockovich (2000)',
'genre': 'Drama',
'rating': 4.0},
{'userId': 1,
'movieId': 2355,
'title': "Bug's Life, A (1998)",
'genre': "Animation|Children's|Comedy",
'rating': 5.0}]
딕셔너리들의 리스트 형태로 결과가 잘 나오는 것을 확인할 수 있다.
Insert, Update, Delete 참조 문서: 링크
INSERT
이번에는 get_all_movie_ids를 session을 이용해서 ORM 방법으로 짜고,
새로운 영화 ID를 하나 뽑는다.
import numpy as np
def get_all_movie_ids(session: Session) -> List[int]:
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = select(Movies.movieId)
result = session.execute(stmt)
result_list = result.fetchall()
# Convert result_list to a list of movie IDs
result_list = [row[0] for row in result_list]
return result_list
except Exception as e:
logger.error(f"Error fetching all movie IDs: {str(e)}")
raise e
all_movie_ids = get_all_movie_ids(session)
print(f"The number of movies in the database: {len(all_movie_ids)}")
print(all_movie_ids[0:5], all_movie_ids[-5:])
# Convert result_list to a list of movie IDs
result_list = select_movies_by_user_id(1, session)
movie_ids = [row['movieId'] for row in result_list]
pool = set(all_movie_ids) - set(movie_ids)
pool = np.array(list(pool))
chosen = np.random.choice(pool, 1, replace=False)
chosen_movie_id = chosen[0]
chosen_movie_id = int(chosen_movie_id)
print(f'chosen movieId: {chosen[0]}')
>>
2025-04-23 10:20:58,517 INFO sqlalchemy.engine.Engine SELECT movies."movieId"
FROM movies
2025-04-23 10:20:58,518 INFO sqlalchemy.engine.Engine [cached since 172.6s ago] ()
The number of movies in the database: 3883
[1, 2, 3, 4, 5] [3948, 3949, 3950, 3951, 3952]
2025-04-23 10:20:58,531 INFO sqlalchemy.engine.Engine SELECT movies."movieId", movies.title, movies.genre, ratings."userId", ratings."movieId" AS "movieId_1", ratings.rating, ratings.timestamp
FROM movies JOIN ratings ON movies."movieId" = ratings."movieId"
WHERE ratings."userId" = ?
2025-04-23 10:20:58,531 INFO sqlalchemy.engine.Engine [cached since 380.9s ago] (1,)
chosen movieId: 125
이번에는 movie_id로 125번이 나왔다.
유저 1이 movie_id 125에 별점을 주는 방식으로 INSERT를 수행한다.
from sqlalchemy import insert
import time
def insert_ratings_by_user(user_id: int, movie_id: int, rating_num: float, session: Session):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = insert(Ratings).values(userId=user_id, movieId=movie_id, rating=rating_num, timestamp=time.time())
session.execute(stmt)
session.commit()
#logger.info(f"Inserted rating: {rating_num} by user: {user_id} to movie: {movie_id}")
except Exception as e:
logger.error(f"Error inserting rating by user: {user_id} to movie {movie_id}: {str(e)}")
raise e
insert_ratings_by_user(1, chosen_movie_id, 5, session)
result_list = select_movies_by_user_id(1, session)
result_list[-5:]
2025-04-23 10:21:22,854 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-23 10:21:22,856 INFO sqlalchemy.engine.Engine INSERT INTO ratings ("userId", "movieId", rating, timestamp) VALUES (?, ?, ?, ?)
2025-04-23 10:21:22,857 INFO sqlalchemy.engine.Engine [cached since 19.93s ago] (1, 125, 5.0, 1745371282.8546083)
2025-04-23 10:21:22,865 INFO sqlalchemy.engine.Engine COMMIT
2025-04-23 10:21:22,872 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-23 10:21:22,873 INFO sqlalchemy.engine.Engine SELECT movies."movieId", movies.title, movies.genre, ratings."userId", ratings."movieId" AS "movieId_1", ratings.rating, ratings.timestamp
FROM movies JOIN ratings ON movies."movieId" = ratings."movieId"
WHERE ratings."userId" = ?
2025-04-23 10:21:22,873 INFO sqlalchemy.engine.Engine [cached since 405.2s ago] (1,)
[{'userId': 1,
'movieId': 3114,
'title': 'Toy Story 2 (1999)',
'genre': "Animation|Children's|Comedy",
'rating': 4.0},
{'userId': 1,
'movieId': 608,
'title': 'Fargo (1996)',
'genre': 'Crime|Drama|Thriller',
'rating': 4.0},
{'userId': 1,
'movieId': 1246,
'title': 'Dead Poets Society (1989)',
'genre': 'Drama',
'rating': 4.0},
{'userId': 1,
'movieId': 125,
'title': 'Flirting With Disaster (1996)',
'genre': 'Comedy',
'rating': 5.0},
{'userId': 1,
'movieId': 125,
'title': 'Flirting With Disaster (1996)',
'genre': 'Comedy',
'rating': 5.0}]
영화 ID가 125인 Flirting With Disaster에 5점을 준 것을 확인할 수 있다.
UPDATE
영화 ID가 125인 Flirting With Disaster에 대한 별점을 5에서 3으로 변경해보자.
from sqlalchemy import update
def update_ratings_by_user(user_id: int, movie_id: int, rating_num: float, session: Session):
try:
# 동기 엔진 생성
#engine = create_engine(DATABASE_URL)
# SQLAlchemy 쿼리 작성
stmt = update(Ratings).where(Ratings.userId == user_id, Ratings.movieId == movie_id).values(rating=rating_num)
session.execute(stmt)
session.commit()
#logger.info(f"Updated rating: {rating_num} by user: {user_id} for movie: {movie_id}")
except Exception as e:
logger.error(f"Error updating rating by user: {user_id} for movie: {movie_id}: {str(e)}")
raise e
update_ratings_by_user(1, chosen_movie_id, 3, session)
result_list = select_movies_by_user_id(1, session)
result_list[-5:]
>> 2025-04-23 10:22:13,395 INFO sqlalchemy.engine.Engine UPDATE ratings SET rating=? WHERE ratings."userId" = ? AND ratings."movieId" = ?
2025-04-23 10:22:13,397 INFO sqlalchemy.engine.Engine [generated in 0.00137s] (3.0, 1, 125)
2025-04-23 10:22:13,718 INFO sqlalchemy.engine.Engine COMMIT
2025-04-23 10:22:13,722 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-23 10:22:13,723 INFO sqlalchemy.engine.Engine SELECT movies."movieId", movies.title, movies.genre, ratings."userId", ratings."movieId" AS "movieId_1", ratings.rating, ratings.timestamp
FROM movies JOIN ratings ON movies."movieId" = ratings."movieId"
WHERE ratings."userId" = ?
2025-04-23 10:22:13,724 INFO sqlalchemy.engine.Engine [cached since 456.1s ago] (1,)
[{'userId': 1,
'movieId': 3114,
'title': 'Toy Story 2 (1999)',
'genre': "Animation|Children's|Comedy",
'rating': 4.0},
{'userId': 1,
'movieId': 608,
'title': 'Fargo (1996)',
'genre': 'Crime|Drama|Thriller',
'rating': 4.0},
{'userId': 1,
'movieId': 1246,
'title': 'Dead Poets Society (1989)',
'genre': 'Drama',
'rating': 4.0},
{'userId': 1,
'movieId': 125,
'title': 'Flirting With Disaster (1996)',
'genre': 'Comedy',
'rating': 3.0},
{'userId': 1,
'movieId': 125,
'title': 'Flirting With Disaster (1996)',
'genre': 'Comedy',
'rating': 3.0}]
3점으로 정상적으로 변경되었음을 알 수 있다.
DELETE
이번에는 해당 영화에 대한 평점 정보를 삭제해본다.
from sqlalchemy import delete
def delete_ratings_by_user(user_id: int, movie_id: int, session: Session):
try:
stmt = delete(Ratings).where(Ratings.userId == user_id, Ratings.movieId == movie_id)
session.execute(stmt)
session.commit()
#logger.info(f"Deleted rating by user: {user_id} for movie: {movie_id}")
except Exception as e:
logger.error(f"Error deleting rating by user: {user_id} for movie: {movie_id}: {str(e)}")
raise e
delete_ratings_by_user(1, chosen_movie_id, session)
result_list = select_movies_by_user_id(1, session)
result_list[-5:]
>> 2025-04-23 10:23:03,879 INFO sqlalchemy.engine.Engine DELETE FROM ratings WHERE ratings."userId" = ? AND ratings."movieId" = ?
2025-04-23 10:23:03,880 INFO sqlalchemy.engine.Engine [generated in 0.00108s] (1, 125)
2025-04-23 10:23:04,241 INFO sqlalchemy.engine.Engine COMMIT
2025-04-23 10:23:04,246 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-23 10:23:04,246 INFO sqlalchemy.engine.Engine SELECT movies."movieId", movies.title, movies.genre, ratings."userId", ratings."movieId" AS "movieId_1", ratings.rating, ratings.timestamp
FROM movies JOIN ratings ON movies."movieId" = ratings."movieId"
WHERE ratings."userId" = ?
2025-04-23 10:23:04,247 INFO sqlalchemy.engine.Engine [cached since 506.6s ago] (1,)
[{'userId': 1,
'movieId': 2028,
'title': 'Saving Private Ryan (1998)',
'genre': 'Action|Drama|War',
'rating': 5.0},
{'userId': 1,
'movieId': 531,
'title': 'Secret Garden, The (1993)',
'genre': "Children's|Drama",
'rating': 4.0},
{'userId': 1,
'movieId': 3114,
'title': 'Toy Story 2 (1999)',
'genre': "Animation|Children's|Comedy",
'rating': 4.0},
{'userId': 1,
'movieId': 608,
'title': 'Fargo (1996)',
'genre': 'Crime|Drama|Thriller',
'rating': 4.0},
{'userId': 1,
'movieId': 1246,
'title': 'Dead Poets Society (1989)',
'genre': 'Drama',
'rating': 4.0}]
정상적으로 영화 ID가 125인 Flirting With Disaster에 대한 별점 정보가 삭제되었음을 알 수 있다.
Async 비동기
여기서는 ORM 방식의 비동기에 대해서 알아본다.
역시나 SELECT에 대해서만 수행한다.
참조 문서: 링크
import asyncio
from sqlalchemy import create_engine
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker, AsyncAttrs
# 상대 경로로 변경
DATABASE_URL = "sqlite+aiosqlite:///./ott.db"
# 동기 엔진 생성
async_engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(async_engine, expire_on_commit=False, class_=AsyncSession)
async def get_movies_by_user_id(user_id: int, async_session: AsyncSession) -> List[Dict]:
async with async_session() as session:
async with session.begin():
stmt = select(Movies, Ratings).join(Ratings).where(Ratings.userId == user_id).limit(5)
result = await session.execute(stmt)
result = []
for movie, ratings in movies_list:
result.append(({'userId': user_id,
'movieId': movie.movieId,
'title': movie.title,
'genre': movie.genre,
'rating': ratings.rating}))
return result
movies_list = await get_movies_by_user_id(1, async_session)
movies_list
>>
2025-04-22 13:37:55,610 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-22 13:37:55,611 INFO sqlalchemy.engine.Engine SELECT movies."movieId", movies.title, movies.genre, ratings."userId", ratings."movieId" AS "movieId_1", ratings.rating, ratings.timestamp
FROM movies JOIN ratings ON movies."movieId" = ratings."movieId"
WHERE ratings."userId" = ?
LIMIT ? OFFSET ?
2025-04-22 13:37:55,612 INFO sqlalchemy.engine.Engine [generated in 0.00049s] (1, 5, 0)
2025-04-22 13:37:55,643 INFO sqlalchemy.engine.Engine COMMIT
[{'userId': 1,
'movieId': 1193,
'title': "One Flew Over the Cuckoo's Nest (1975)",
'genre': 'Drama',
'rating': 5.0},
{'userId': 1,
'movieId': 661,
'title': 'James and the Giant Peach (1996)',
'genre': "Animation|Children's|Musical",
'rating': 3.0},
{'userId': 1,
'movieId': 914,
'title': 'My Fair Lady (1964)',
'genre': 'Musical|Romance',
'rating': 3.0},
{'userId': 1,
'movieId': 3408,
'title': 'Erin Brockovich (2000)',
'genre': 'Drama',
'rating': 4.0},
{'userId': 1,
'movieId': 2355,
'title': "Bug's Life, A (1998)",
'genre': "Animation|Children's|Comedy",
'rating': 5.0}]
정상적으로 유저 1이 평점을 매긴 영화 정보가 불러와지는걸 알 수 있다.
동기방식과 비동기방식, Blocking과 Non Blocking에 대한 참고 글: 링크
References:
https://pypi.org/project/databases/
https://yoon001.tistory.com/87
https://conding-note.tistory.com/103
https://docs.sqlalchemy.org/en/20/core/selectable.html#sqlalchemy.sql.expression.Select
https://docs.sqlalchemy.org/en/20/orm/quickstart.html#simple-select
https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#orm-queryguide-select-columns
https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
https://welcome-be.tistory.com/18
https://docs.sqlalchemy.org/en/20/core/dml.html#sqlalchemy.sql.expression.update
https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-bulk-insert-statements
https://humblego.tistory.com/18
https://www.daleseo.com/python-type-annotations/
https://turtleiscrying.tistory.com/29
https://otzslayer.github.io/cs/2024/11/22/fastapi-sync-async-benchmark.html
'개발 > Backend' 카테고리의 다른 글
LLM 기반 추천 시스템 백엔드 - LLM 추천 파트 (0) | 2025.04.25 |
---|---|
LLM 기반 추천 시스템 백엔드 (0) | 2025.04.25 |
FastAPI 기초 알아보기 (0) | 2025.04.20 |
백엔드 기초 개념과 가이드 라인 (1) | 2025.03.30 |