from typing import Union from fastapi import FastAPI, HTTPException from src.mongo import getMongo from pydantic import BaseModel from uuid import UUID, uuid4 from http import HTTPStatus class DirectorInput(BaseModel): name: str birthdate: str nationality: str class Director(DirectorInput): id: str class MovieInput(BaseModel): title: str year: int genre: list[str] director_id: str class Movie(MovieInput): id: str director: Director class ReviewInput(BaseModel): movie_id: str reviewer_name: str rating: float comment: str movie: Movie class Review(ReviewInput): id: str db, client = getMongo() app = FastAPI() @app.get("/directors") def getDirectors() -> list[Director]: return [Director(**director) for director in db["directors"].find()] @app.get("/directors/{director_id}") def getDirector(director_id: str): return Director(**db["directors"].find_one({"id": director_id})) @app.post("/directors") def addDirector(director: DirectorInput) -> Director: director = Director(**director.model_dump(), id=str(uuid4())) db["directors"].insert_one(director.model_dump()) return director @app.put("/directors/{director_id}") def updateDirector(director_id: str, director: DirectorInput) -> Director: db["directors"].replace_one( {"id": director_id}, Director(id=director_id, **director.model_dump()).model_dump(), ) return Director(id=director_id, **director.model_dump()) @app.delete("/directors/{director_id}") def dropDirector(director_id: str): return ( HTTPStatus.OK if db["directors"].delete_one({"id": director_id}).deleted_count else HTTPStatus.NOT_FOUND ) @app.get("/movies") def getMovies() -> list[Movie]: return [Movie(**movie) for movie in db["movies"].find()] @app.get("/movies/{movie_id}") def getMovie(movie_id: str): raw_movie = db["movies"].find_one({"id": movie_id}) return Movie(**raw_movie, director=Director(**db["directors"].find_one({"id": raw_movie["director_id"]}))) @app.post("/movies") def addMovie(movie: MovieInput) -> Movie: if not db["directors"].find_one({"id": movie.director_id}): raise HTTPException( status_code=404, detail=f"director with id '{movie.director_id}' not found" ) movie = Movie(**movie.model_dump(), id=str(uuid4())) db["movies"].insert_one(movie.model_dump()) return movie @app.put("/movies/{movie_id}") def updateMovie(movie_id: str, movie: MovieInput) -> Movie: if not db["directors"].find_one({"id": movie.director_id}): raise Exception(f"director with {movie.director_id} not found") db["movies"].replace_one( {"id": movie_id}, Movie(id=movie_id, **movie.model_dump()).model_dump(), ) return Movie(id=movie_id, **movie.model_dump()) @app.delete("/movies/{movie_id}") def dropMovie(movie_id: str): return ( HTTPStatus.OK if db["movies"].delete_one({"id": movie_id}).deleted_count else HTTPStatus.NOT_FOUND ) @app.get("/reviews") def getReviews() -> list[Review]: return [Review(**review) for review in db["reviews"].find()] @app.get("/reviews/{review_id}") def getReview(review_id: str): return Review(**db["review"].find_one({"id": review_id})) @app.post("/reviews") def addReview(review: ReviewInput) -> Review: if not db["reviews"].find_one({"id": review.movie_id}): raise HTTPException( status_code=404, detail=f"movie with id '{review.movie_id}' not found" ) review = Review(**review.model_dump(), id=str(uuid4())) db["reviews"].insert_one(review.model_dump()) return review @app.put("/reviews/{movie_id}") def updateReview(review_id: str, review: Review) -> Review: if not db["movies"].find_one({"id": review.movie_id}): raise Exception(f"movie with {review.movie_id} not found") db["reviews"].replace_one( {"id": review_id}, Review(id=review_id, **review.model_dump()).model_dump(), ) return Review(id=review_id, **review.model_dump()) @app.delete("/reviews/{review_id}") def dropReview(review_id: str): return ( HTTPStatus.OK if db["reviews"].delete_one({"id": review_id}).deleted_count else HTTPStatus.NOT_FOUND )