diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index cbcdcbd..c256592 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -r requirements.txt + pip install -e . pip install -r requirements_dev.txt - name: Test with pytest run: | diff --git a/pyproject.toml b/pyproject.toml index 5f3e9d1..81b870b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,7 +69,7 @@ extend-select = [ "TID", "YTT", ] -ignore = ["E741"] +ignore = ["E741", "E711"] show-fixes = true line-length = 120 indent-width = 4 diff --git a/src/ai/__init__.py b/src/ai/__init__.py new file mode 100644 index 0000000..a590626 --- /dev/null +++ b/src/ai/__init__.py @@ -0,0 +1,3 @@ +from .ai import run + +__all__ = ["run"] diff --git a/src/ai/ai.py b/src/ai/ai.py new file mode 100644 index 0000000..8d7446d --- /dev/null +++ b/src/ai/ai.py @@ -0,0 +1,35 @@ +from game import Main +from loguru import logger +from utils import BestMove, GameMode + +from .move import get_best_move + + +def run() -> None: + app = Main(GameMode.AI_TRAINING) + app.play() + game = app.game + + if not game: + return + + tetris = game.tetris + + while True: + app.handle_events() + app.run_game_loop() + + best_move: BestMove = get_best_move(game.tetris, tetris.tetromino.figure) + figure = game.tetris.tetromino.figure + logger.warning(f"{figure.name=} {best_move=}") + + for rotation in range(best_move.rotation): + tetris.tetromino.rotate() + + for _ in range(abs(best_move.x_axis_offset)): + if best_move.x_axis_offset > 0: + tetris.move_right() + elif best_move.x_axis_offset < 0: + tetris.move_left() + + tetris.drop() diff --git a/src/ai/heuristics/__init__.py b/src/ai/heuristics/__init__.py new file mode 100644 index 0000000..30920fb --- /dev/null +++ b/src/ai/heuristics/__init__.py @@ -0,0 +1,6 @@ +from .bumpiness import get_bumpiness +from .height import aggregate_height +from .holes import count_holes +from .lines import complete_lines + +__all__ = ["aggregate_height", "get_bumpiness", "complete_lines", "count_holes"] diff --git a/src/ai/heuristics/bumpiness.py b/src/ai/heuristics/bumpiness.py new file mode 100644 index 0000000..1fa73af --- /dev/null +++ b/src/ai/heuristics/bumpiness.py @@ -0,0 +1,20 @@ +import numpy as np + +from .peaks import get_peaks + + +def get_bumpiness( + field: np.ndarray[int, np.dtype[np.uint8]], +) -> int: + """ + Calculate the bumpiness of a given field based on peaks. + + Args: + field: The game field. + + Returns: + The bumpiness of the field. + """ + field = get_peaks(field) + diff = np.diff(field) + return int(np.sum(np.abs(diff))) diff --git a/src/ai/heuristics/height.py b/src/ai/heuristics/height.py new file mode 100644 index 0000000..6d9433c --- /dev/null +++ b/src/ai/heuristics/height.py @@ -0,0 +1,17 @@ +import numpy as np + +from .peaks import get_peaks + + +def aggregate_height(field: np.ndarray[int, np.dtype[np.uint8]]) -> int: + """ + Calculates the aggregate height of the field. + + Args: + field: 2D array representing the game field. + + Returns: + The aggregate height of the field. + """ + heights = get_peaks(field) + return int(np.sum(heights)) diff --git a/src/ai/heuristics/holes.py b/src/ai/heuristics/holes.py new file mode 100644 index 0000000..f12ef30 --- /dev/null +++ b/src/ai/heuristics/holes.py @@ -0,0 +1,29 @@ +import numpy as np + + +def count_holes( + field: np.ndarray[int, np.dtype[np.uint8]], +) -> int: + """ + Calculate the number of holes in each column of the given field. + + Args: + field: The signal field. + peaks: Array containing peak indices. If not provided, it will be computed from the field. + + Returns: + The total number of holes in the field. + """ + + num_rows, num_cols = field.shape + holes_count = 0 + + for col in range(num_cols): + has_tile_above = False + + for row in range(num_rows): + if field[row, col] == 1: + has_tile_above = True + elif field[row, col] == 0 and has_tile_above: + holes_count += 1 + return holes_count diff --git a/src/ai/heuristics/lines.py b/src/ai/heuristics/lines.py new file mode 100644 index 0000000..01d9061 --- /dev/null +++ b/src/ai/heuristics/lines.py @@ -0,0 +1,14 @@ +import numpy as np + + +def complete_lines(field: np.ndarray[int, np.dtype[np.uint8]]) -> int: + """ + Calculates the number of complete lines in the field. + + Args: + field: 2D array representing the game field. + + Returns: + The number of complete lines in the field. + """ + return int(np.sum(np.all(field, axis=1))) diff --git a/src/ai/heuristics/peaks.py b/src/ai/heuristics/peaks.py new file mode 100644 index 0000000..c1f7a9c --- /dev/null +++ b/src/ai/heuristics/peaks.py @@ -0,0 +1,20 @@ +import numpy as np + + +def get_peaks(field: np.ndarray[int, np.dtype[np.uint8]]) -> np.ndarray[int, np.dtype[np.uint8]]: + """ + Calculate the peaks of a given field. + + Args: + field: 2D array representing the game field. + + Returns: + 2D array representing the peaks of the field. + """ + result = np.zeros(field.shape[1], dtype=int) + for col in range(field.shape[1]): + for row in range(field.shape[0]): + if field[row, col] != 0: + result[col] = field.shape[0] - row + break + return result diff --git a/src/ai/move.py b/src/ai/move.py new file mode 100644 index 0000000..ea95aa1 --- /dev/null +++ b/src/ai/move.py @@ -0,0 +1,63 @@ +from typing import Optional + +import pygame +from game import Tetris +from game.sprites import Tetromino +from loguru import logger +from utils import CONFIG, BestMove, Direction, Figure + +from .score import calculate_score + +NUM_ROTATIONS: dict[Figure, int] = { + Figure.I: 2, + Figure.O: 1, + Figure.T: 4, + Figure.S: 2, + Figure.Z: 2, + Figure.J: 4, + Figure.L: 4, +} + + +def get_best_move(game: Tetris, figure: Figure) -> BestMove: + best_move: Optional[BestMove] = None + best_score: Optional[float] = None + phantom_sprites = pygame.sprite.Group() # type: ignore + + for rotation in range(NUM_ROTATIONS[figure]): + for i in range(CONFIG.game.columns): + tetermino = Tetromino(phantom_sprites, None, game.field, game.tetromino.figure, True) + x_axis_movement: int = 0 + for _ in range(rotation): + tetermino.rotate() + + while tetermino.move_horizontal(Direction.LEFT): # move maximaly to the left + x_axis_movement -= 1 + + for _ in range(i): + if tetermino.move_horizontal(Direction.RIGHT): # slowly move to the right + x_axis_movement += 1 + + tetermino.drop() + + score: float = calculate_score(game) + + logger.debug(f"{tetermino.figure.name=:3} {score=:6.6f} {best_score=} {rotation=:1} {x_axis_movement=:1}") + + if best_score is None or score > best_score: + best_score = score + best_move = BestMove(rotation, x_axis_movement) + + if not tetermino._are_new_positions_valid( + [pygame.Vector2(block.pos.x + 1, block.pos.y) for block in tetermino.blocks] + ): + continue + + # logger.debug(f"{field=}") + tetermino.kill() + + if not best_move: + best_move = BestMove(0, 0) + tetermino.kill() + phantom_sprites.empty() + return best_move diff --git a/src/ai/score.py b/src/ai/score.py new file mode 100644 index 0000000..e3e023e --- /dev/null +++ b/src/ai/score.py @@ -0,0 +1,17 @@ +import numpy as np +from game import Tetris + +from .heuristics import aggregate_height, complete_lines, count_holes, get_bumpiness + + +def calculate_score(game: Tetris) -> float: + field: np.ndarray[int, np.dtype[np.uint8]] = np.where(game.field != None, 1, 0) + for block in game.tetromino.blocks: + field[int(block.pos.y), int(block.pos.x)] = 1 + + height = aggregate_height(field) * -0.510066 + lines = complete_lines(field) * 0.760666 + holes = count_holes(field) * -0.35663 + bumpiness = get_bumpiness(field) * -0.184483 + + return height + lines + holes + bumpiness diff --git a/src/utils/__init__.py b/src/utils/__init__.py index c6dec43..eb68ea0 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -3,7 +3,7 @@ from .enum import Direction, GameMode, Rotation from .figure import Figure from .path import BASE_PATH from .settings import read_settings, save_settings -from .tuples import Size +from .tuples import BestMove, Size __all__ = [ "BASE_PATH", @@ -15,4 +15,5 @@ __all__ = [ "GameMode", "read_settings", "save_settings", + "BestMove", ] diff --git a/src/utils/tuples.py b/src/utils/tuples.py index 27e6e0a..de00867 100644 --- a/src/utils/tuples.py +++ b/src/utils/tuples.py @@ -17,3 +17,16 @@ class Size(NamedTuple): if isinstance(other, Size): return Size(self.width - other.width, self.height - other.height) return Size(self.width - other, self.height - other) + + +class BestMove(NamedTuple): + """ + A best move object. + + Attributes: + rotation: The rotation of the best move. + x_axis_offset: The x-axis offset of the best move. + """ + + rotation: int + x_axis_offset: int diff --git a/tests/ai/test_heuristics.py b/tests/ai/test_heuristics.py new file mode 100644 index 0000000..dafda31 --- /dev/null +++ b/tests/ai/test_heuristics.py @@ -0,0 +1,65 @@ +import unittest + +import numpy as np +from ai.heuristics import aggregate_height, complete_lines, count_holes, get_bumpiness + + +class TestHeuristics(unittest.TestCase): + def setUp(self) -> None: + self.field = np.array( + [ + [0, 0, 0, 0, 1, 1, 0, 0, 0, 0], + [0, 1, 1, 1, 1, 1, 1, 0, 0, 1], + [0, 1, 1, 0, 1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + [1, 1, 1, 0, 1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + ] + ) + + self.field2 = np.array( + [ + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 1], + [1, 0, 0, 0, 0, 0, 0, 0, 0, 1], + [1, 0, 0, 1, 1, 0, 0, 0, 0, 1], + [1, 1, 0, 1, 1, 0, 0, 0, 0, 1], + ] + ) + self.field3 = np.array( + [ + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 1, 1, 0, 0, 0, 0, 0, 0, 1], + [1, 1, 1, 0, 0, 0, 0, 1, 1, 1], + [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1, 0, 0, 1, 1], + ] + ) + + def test_aggregate_height(self) -> None: + self.assertEqual(aggregate_height(self.field), 48) + self.assertEqual(aggregate_height(self.field2), 12) + self.assertEqual(aggregate_height(self.field3), 30) + + def test_complete_lines(self) -> None: + self.assertEqual(complete_lines(self.field), 2) + self.assertEqual(complete_lines(self.field2), 0) + self.assertEqual(complete_lines(self.field3), 1) + + def test_holes(self) -> None: + self.assertEqual(count_holes(self.field), 2) + self.assertEqual(count_holes(self.field2), 0) + self.assertEqual(count_holes(self.field3), 2) + + def test_bumpiness(self) -> None: + self.assertEqual(get_bumpiness(self.field), 6) + self.assertEqual(get_bumpiness(self.field2), 11) + self.assertEqual(get_bumpiness(self.field3), 7) diff --git a/tests/test_blank.py b/tests/test_blank.py deleted file mode 100644 index a59017a..0000000 --- a/tests/test_blank.py +++ /dev/null @@ -1,6 +0,0 @@ -import unittest - - -class TestBlank(unittest.TestCase): - def test(self) -> None: - pass diff --git a/tetris/__main__.py b/tetris/__main__.py index 6152b49..306ecbe 100755 --- a/tetris/__main__.py +++ b/tetris/__main__.py @@ -27,6 +27,13 @@ parser.add_argument( help="Run app with GUI [Default]", ) +parser.add_argument( + "-t", + "--train", + action="store_true", + help="Train AI", +) + def setup_logger(level: str = "warning") -> None: from utils import BASE_PATH @@ -64,10 +71,14 @@ def main(args) -> None: level = "info" else: level = "warning" - setup_logger(level) - run() + if args.train: # type: ignore + import ai + + ai.run() + else: + run() if __name__ == "__main__":