본문 바로가기
개발/Backend

LLM 기반 추천 시스템 백엔드 - LLM 추천 파트

by 아르카눔 2025. 4. 25.

추천에 대한 기능 중에서는 특정 유저에 대한 추천의 생성과 등록된 추천을 불러는 것의 두 가지 기능이 있다.

 

여기서는 추천의 생성 및 DB 등록만 알아본다.

 

POST로 추천 생성 

@app.post('/api/recommend')
async def create_recommend(user_id: UserId, db: AsyncSession = Depends(get_async_db)):
    return await recommend_func(user_id, db)

 

아래는 llm_recommend.py에 있는 함수들이다.

 

우선 LLM을 활용한 추천을 위한 프롬프트를 준비한다.

그 다음 프롬프트에 유저가 이전에 높은 별점을 매긴 영화 정보를 삽입한다.

현재 서비스하고 있는, 다시 말해서 DB에 있는 영화로 한정하기 위해서 추천 가능한 영화 정보를 가져온다.

이때 LangChain을 사용하고 API 호출 비용이 저렴한 Gemini-1.5-flash를 사용한다.

추천 영화 제목 목록을 생성한 다음에는 movies 테이블에서 영화의 ID, 장르를, 그리고 ratings 테이블에서 계산한 평균 별점을 가져온다. 

 

 

LLM 모델 정의, 프롬프트 정의, 체인 정의

from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

# .env 파일 로드
load_dotenv()

# Load LLM model
# Gemini API Key  
API_KEY = os.getenv("GEMINI_API_KEY")

# Credentials 관련 조치
credentials_path = os.getenv("GEMINI_CREDENTIAL_PATH")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

model_name = "gemini-1.5-flash"
model = ChatGoogleGenerativeAI(model=model_name)

# 원하는 데이터 구조를 정의한다.
class RecommendationJson(BaseModel):
    items: str = Field(description="유저에 추천할 만한 영화들 목록")
    explanation: str = Field(description="추천의 이유")


# 파서를 설정하고 프롬프트 템플릿에 지시사항을 주입한다.
# 그러면 Topic 클래스의 템플릿에 맞는 형태로 JSON으로 반환.  
parser = JsonOutputParser(pydantic_object=RecommendationJson)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "당신은 영화 추천 시스템 AI 어시스턴트 입니다. 주어진 영화 리스트 중에서 유저가 좋아할 만한 영화들을 10개 추천하고 그 이유를 간략하게 설명하세요."),
        ("user", "#Format: {format_instructions}\n\n#Movie List: {movie_candidates}\n\n#Question: {question}"),
    ]
)

prompt = prompt.partial(format_instructions=parser.get_format_instructions())

chain = prompt | model | parser  # 체인을 구성한다.

 

 

추천 가능한, 현재 제공 가능한 영화 목록 가져오기

# 비동기 데이터베이스 의존성
async def get_async_db():
    async with async_session_maker() as session:
        yield session

# 추천 가능한 영화 리스트 가져오기

async def get_all_movie_list(db:  AsyncSession = Depends(get_async_db)):
    try:
        query = (
            select(Movies.movieId, Movies.title, Movies.genre)
            .select_from(Movies)
        )
  
        results = await db.execute(query)
        results = results.all()

        #print(f"results: {results}")
        results_as_dict = []
        for row in results:
            rating_dict = {
                "movieId": row.movieId,
                "title": row.title,
                'genre': row.genre
            }
            results_as_dict.append(rating_dict)

        #print(f"results: {results_as_dict}")
        
        if not results_as_dict:
            return {"message": "No Movie list.",
                'movieList': []} # 없으면 빈칸 리턴 
        return {"message": "Movie list loaded successfully.",
                'movieList': results_as_dict}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

 

 

유저의 별점 히스토리 가져오기

 

# 특정 유저의 rating history를 불러온다 
async def get_rating_history(userId: int, db:  AsyncSession = Depends(get_async_db)):
    try:
        '''
        # 평점이 높은 10개만 리턴하고 영화 정보도 가져온다. 
        '''
        query = (
            select(Ratings.userId, 
                   Ratings.movieId,
                   Ratings.rating,
                   Movies.title,
                   Movies.genre,
                   Ratings.timestamp
                   )
            .select_from(Ratings)
            .join(Movies, Ratings.movieId == Movies.movieId)
            .where(Ratings.userId == userId)
            .order_by(desc(Ratings.rating))
            .limit(10)
        )
  
        results = await db.execute(query)
        rating_history = results.all()

        results_as_dict = []
        for row in rating_history:
            rating_dict = {
                "userId": row.userId,
                "movieId": row.movieId,
                "rating": row.rating,
                "title": row.title,
                "genre": row.genre,
                "timestamp": row.timestamp
            }
            results_as_dict.append(rating_dict)

        #print(f"results: {results}")
        
        if not results_as_dict:
            return {"message": "No Rating history.",
                'rating_history': []} # 없으면 빈칸 리턴 
        return {"message": "Rating history loaded successfully.",
                'rating_history': results_as_dict}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

 

 

추천된 영화 타이틀에 대한 ID, 평균 별점 및 장르 정보 추출

 

rating 테이블에서 평균 별점을 서브쿼리로 가져오는 방식으로 추출한다.

 

# 영화 리스트에 있는 영화들의 정보들을 가져온다 
async def get_movie_info(movieList: list, db:  AsyncSession = Depends(get_async_db)):
    try:
        # 영화의 평균 rating을 구하는 서브쿼리
        avg_rating_subquery = (
            select(Ratings.movieId, func.round(func.avg(Ratings.rating), 1).label("meanRating"))
            .group_by(Ratings.movieId)
            .subquery()
        )

        query = (
            select(
                   Movies.movieId,
                   Movies.title,
                   Movies.genre,
                   avg_rating_subquery.c.meanRating # 평균 평점 컬럼 추가 
                   )
            .select_from(Movies)
            .join(avg_rating_subquery, Movies.movieId == avg_rating_subquery.c.movieId, isouter=True)
            .where(Movies.title.in_(movieList))
        )
  
        results = await db.execute(query)
        results = results.all()

        #print(f"results: {results}")
        timestamp_unix = time.time()
        results_as_dict = []
        for row in results:
            rating_dict = {
                "movieId": row.movieId,
                "rating": row.meanRating,
                "title": row.title,
                "genre": row.genre,
                "timestamp": timestamp_unix
            }
            results_as_dict.append(rating_dict)
        print(f'movide info result: {results_as_dict}')
        
        if not results_as_dict:
            return {"message": "No Informations in DB.",
                'movieInfo': []} # 없으면 빈칸 리턴 
        return {"message": "Movie info loaded successfully.",
                'movieInfo': results_as_dict}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

 

 

영화 목록을 문자열로 변환하는 함수와 프롬프트에서 유저의 별점을 등록하는 함수 

 

def convert_movie_list_to_string(movie_list: List[dict]) -> str:
    """
    Convert a list of movie titles to a comma-separated string.
    """
    movie_string = ', '.join([movie['title'] for movie in movie_list])
    return movie_string


def fill_template(target_template, source_data):
    rating_history = source_data['rating_history']
    intro = target_template['introduction']
    rating_template = target_template['rating_template']
    filled_template = intro

    for rating_row in rating_history:
        item = rating_row['title']
        genre = rating_row['genre']
        rating = rating_row['rating']
        filled_rating_template = rating_template.format(item=item, rating=rating)
        filled_template += filled_rating_template
    return filled_template

 

 

ORM 방식으로 DB에 추천 영화를 INSERT

 

List[dict] 형태로 넣어서 벌크 단위로 INSERT가 가능하다. 

# ORM 방식
async def insert_recommend(recommendations: List[dict], 
                           userId: int, 
                           db: AsyncSession = Depends(get_async_db)):
    try:
        # 사전에 정의된 추천 모델 정보 
        recommenderId = 2
        recommenderName = 'LLM-Gemini-Prompt-v1'

        bulk_insert_data = []
        print(f"recommendations in insert rec: {recommendations}")
        for movie in recommendations:
            recommended_movie = {
                "userId": userId,
                "movieId": movie['movieId'],
                "meanRating": movie['rating'],
                "timestamp": movie['timestamp'],
                "recommenderId": recommenderId,
                "recommenderName": recommenderName
            }
            bulk_insert_data.append(recommended_movie)

        insert_stmt = insert(Recommendations).values(bulk_insert_data)
        await db.execute(insert_stmt)  # 세션에 모델 인스턴스 추가
        await db.commit()  # 변경 사항 커밋

        return {"message": "Recommendations added successfully"}
    
    except Exception as e:
        await db.rollback()
        raise HTTPException(status_code=500, detail=str(e))

 

 

최종 추천 함수

 

async def recommend_func(userId: int, db: AsyncSession = Depends(get_async_db)):
    userId = int(userId.userId)
    #print(f"userId: {userId}")
    rating_history = await get_rating_history(userId, db)
    #print(f"rating_history: {rating_history['message']}")
    filled_template = fill_template(target_template, rating_history)
    #print(f'filled_template: {filled_template}')

    #movie_candidates = manager.get_candidates()
    movie_candidates = await get_all_movie_list(db)
    if movie_candidates['movieList'] is None:
        return {"message": "Movie candidates have not been initialized."}
    else:
        movie_candidates = movie_candidates['movieList']
        #print(f"movie_candidates: {movie_candidates}")

    recommended = chain.invoke({"movie_candidates": movie_candidates,
                                "question": filled_template})
    
    print(f"recommended: {recommended}")
    movieList = recommended['items']
    recommendations = await get_movie_info(movieList, db)
    print(f"recommendations with info added: {recommendations['message']}")
    message = await insert_recommend(recommendations['movieInfo'], userId, db)
    return recommendations

 

 

중간 중간 print하면서 함수들이 제대로 작동하는지 확인했다.

 

추천 영화 풀을 유저마다, 혹은 유저를 클러스터링해서 클러스터마다 다르게 줄 수도 있고,

 

영화의 장르 정보를 사용할 수도 있고 여러가지로 고도화가 가능할 여지가 많아 보이지만 일단 POC 느낌으로 여기까지만 구성한다.

 

 

 

 

'개발 > Backend' 카테고리의 다른 글

LLM 기반 추천 시스템 백엔드  (0) 2025.04.25
SQLAlchemy 기초  (0) 2025.04.23
FastAPI 기초 알아보기  (0) 2025.04.20
백엔드 기초 개념과 가이드 라인  (1) 2025.03.30