From 15112f912e6237114135573f9d1532355e534dc5 Mon Sep 17 00:00:00 2001 From: Kristofers Solo Date: Sun, 22 Oct 2023 23:08:51 +0300 Subject: [PATCH] feat(sqlite): move from csv to sqlite --- src/bookstore/inventory.py | 95 ++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 50 deletions(-) diff --git a/src/bookstore/inventory.py b/src/bookstore/inventory.py index 1cc18fe..31c080d 100644 --- a/src/bookstore/inventory.py +++ b/src/bookstore/inventory.py @@ -1,4 +1,4 @@ -import csv +import sqlite3 from pathlib import Path from .book import Book @@ -6,70 +6,65 @@ from .isbn import ISBN class Inventory: - def __init__(self, books: list[Book] | None) -> None: - self.books: dict[ISBN, Book] = {book.isbn: book for book in books} if books else {} + def __init__(self, db_path: Path) -> None: + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + self.cursor.execute("CREATE TABLE IF NOT EXISTS Book (title TEXT, author TEXT, isbn TEXT PRIMARY KEY, price REAL, stock INTEGER)") - def read_from_file(self, path: Path) -> None: - """Read `Inventory` from `path` csv file.""" - with open(path, mode="r") as file: - reader = csv.reader(file) + def save(self) -> None: + """Save `Inventory` to SQLite database.""" + self.conn.commit() - next(reader, None) # Skip header + def close(self) -> None: + """Close database connection.""" + self.conn.close() - for title, author, isbn, price, stock in reader: - book = Book(title, author, isbn, price, stock) - self.add(book) - - def save_to_file(self, path: Path) -> None: - """Save `Inventory` to `path` csv file.""" - with open(path, mode="w") as file: - writer = csv.writer(file) - - writer.writerow(["Title", "Auther", "ISBN", "Price", "Stock"]) - - for book in self.books.values(): - writer.writerow([book.title, book.author, book.isbn, book.price, book.stock]) - - def add(self, book: Book) -> None: + def add(self, *books: Book) -> None: """Add `Book` to the `Inventory`. `Book`s ISBN must be unique.""" - if book.isbn in self.books: - raise BaseException(f"Book with this ISBN: {book.isbn} already exists!") # TODO: create custom exception - - self.books[book.isbn] = book + for book in books: + try: + self.cursor.execute("INSERT INTO Book VALUES (?, ?, ?, ?, ?)", (book.title, book.author, book.isbn, book.price, book.stock)) + self.conn.commit() + except sqlite3.InternalError: + print(f"A book with ISBN {book.isbn} already exists in the database.") def delete(self, isbn: ISBN) -> Book | None: - """Deletes `Book` from `Inventory` by `isbn` and returns deleted `Book`""" - return self.books.pop(isbn, None) + """Deletes `Book` from `Inventory` by `ISBN` and returns deleted `Book`""" + deleted_book = self.find_by_isbn(isbn) + + self.cursor.execute("DELETE FROM Book WHERE isbn = ?", (isbn,)) + self.conn.commit() + + return deleted_book def find_by_isbn(self, isbn: ISBN) -> Book | None: """Looks up `Book` within `Inventory` by book `ISBN` and returns it. Returns `None` if it doesn't exist.""" - if isbn not in self.books.keys(): + self.cursor.execute("SELECT * FROM Book WHERE isbn = ?", (isbn,)) + book = self.cursor.fetchone() + if not book: return None - return self.books.get(isbn) + return Book(*book) def find_by_title(self, title: str) -> list[Book] | None: """Looks up `Book`s within `Inventory` by book title and returns them as a `List`. Returns `None` if none were found""" - found_books = [] - for book in self.books.values(): - if book.title.lower() == title.strip().lower(): - found_books.append(book) - - if found_books: - return found_books - return None + self.cursor.execute("SELECT * FROM Book WHERE title = ?", (title,)) + books = self.cursor.fetchall() + if not books: + return None + return [Book(*book) for book in books] def find_by_author(self, author: str) -> list[Book] | None: """Looks up `Book`s within `Inventory` by book author and returns them as a `List`. Returns `None` if none were found""" + self.cursor.execute("SELECT * FROM Book WHERE author = ?", (author,)) + books = self.cursor.fetchall() + if not books: + return None + return [Book(*book) for book in books] - found_books = [] - for book in self.books.values(): - if book.author.lower() == author.strip().lower(): - found_books.append(book) - - if found_books: - return found_books - return None - - def list_all(self) -> list[Book]: + def list_all(self) -> list[Book] | None: """Returns `List` of all `Book`s.""" - return list(self.books.values()) + self.cursor.execute("SELECT * FROM Book") + books = self.cursor.fetchall() + if not books: + return None + return [Book(*book) for book in books]