mirror of
https://github.com/kristoferssolo/Traffic-Light-Detector.git
synced 2026-03-22 00:36:22 +00:00
Moved from logging to loguru
This commit is contained in:
@@ -1,20 +1,14 @@
|
|||||||
"""This program uses a trained neural network to detect the color of a traffic light in images."""
|
"""This program uses a trained neural network to detect the color of a traffic light in images."""
|
||||||
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from detector.object_detection import load_ssd_coco, perform_object_detection
|
from detector.object_detection import load_ssd_coco, perform_object_detection
|
||||||
from detector.paths import IMAGES_IN_PATH, LOGS_PATH, MODEL_PATH
|
from detector.paths import IMAGES_IN_PATH, MODEL_PATH
|
||||||
|
from loguru import logger
|
||||||
from tensorflow import keras
|
from tensorflow import keras
|
||||||
|
|
||||||
# Set up logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
handler = logging.FileHandler(str(Path.joinpath(LOGS_PATH, f"{__name__}.log")))
|
|
||||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
|
||||||
handler.setFormatter(formatter)
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def detect_traffic_light_color_image() -> None:
|
def detect_traffic_light_color_image() -> None:
|
||||||
model_traffic_lights_nn = keras.models.load_model(str(MODEL_PATH))
|
model_traffic_lights_nn = keras.models.load_model(str(MODEL_PATH))
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
"""This program extracts traffic lights from images."""
|
"""This program extracts traffic lights from images."""
|
||||||
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
@@ -9,16 +8,11 @@ from detector.object_detection import (
|
|||||||
load_ssd_coco,
|
load_ssd_coco,
|
||||||
perform_object_detection,
|
perform_object_detection,
|
||||||
)
|
)
|
||||||
from detector.paths import CROPPED_IMAGES_PATH, INPUT_PATH, LOGS_PATH
|
from detector.paths import CROPPED_IMAGES_PATH, INPUT_PATH
|
||||||
|
from loguru import logger
|
||||||
# Set up logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
handler = logging.FileHandler(str(Path.joinpath(LOGS_PATH, f"{__name__}.log")))
|
|
||||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
|
||||||
handler.setFormatter(formatter)
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def extract_traffic_lights() -> None:
|
def extract_traffic_lights() -> None:
|
||||||
files = Path.iterdir(INPUT_PATH)
|
files = Path.iterdir(INPUT_PATH)
|
||||||
|
|
||||||
|
|||||||
@@ -1,21 +1,16 @@
|
|||||||
import tensorflow as tf
|
"""This program helps detect objects (e.g. traffic lights) in images."""
|
||||||
import numpy as np
|
|
||||||
import cv2
|
|
||||||
import logging
|
|
||||||
from detector.paths import LOGS_PATH, IMAGES_OUT_PATH
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import tensorflow as tf
|
||||||
|
from detector.paths import IMAGES_OUT_PATH
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
# Inception V3 model for Keras
|
# Inception V3 model for Keras
|
||||||
from tensorflow.keras.applications.inception_v3 import preprocess_input
|
from tensorflow.keras.applications.inception_v3 import preprocess_input
|
||||||
|
|
||||||
|
|
||||||
# Set up logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
handler = logging.FileHandler(str(Path.joinpath(LOGS_PATH, f"{__name__}.log")))
|
|
||||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
|
||||||
handler.setFormatter(formatter)
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
# COCO labels
|
# COCO labels
|
||||||
LABEL_PERSON = 1
|
LABEL_PERSON = 1
|
||||||
LABEL_CAR = 3
|
LABEL_CAR = 3
|
||||||
@@ -24,20 +19,43 @@ LABEL_TRUCK = 8
|
|||||||
LABEL_TRAFFIC_LIGHT = 10
|
LABEL_TRAFFIC_LIGHT = 10
|
||||||
LABEL_STOP_SIGN = 13
|
LABEL_STOP_SIGN = 13
|
||||||
|
|
||||||
|
# Create a dictionary that maps object class labels to their corresponding colors and text labels
|
||||||
|
LABELS = {
|
||||||
|
LABEL_PERSON: (0, 255, 255),
|
||||||
|
LABEL_CAR: (255, 255, 0),
|
||||||
|
LABEL_BUS: (255, 255, 0),
|
||||||
|
LABEL_TRUCK: (255, 255, 0),
|
||||||
|
LABEL_TRAFFIC_LIGHT: (255, 255, 255),
|
||||||
|
LABEL_STOP_SIGN: (128, 0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
def accept_box(boxes, box_index, tolerance) -> bool:
|
LABEL_TEXT = {
|
||||||
|
LABEL_PERSON: "Person",
|
||||||
|
LABEL_CAR: "Car",
|
||||||
|
LABEL_BUS: "Bus",
|
||||||
|
LABEL_TRUCK: "Truck",
|
||||||
|
LABEL_TRAFFIC_LIGHT: "Traffic Light",
|
||||||
|
LABEL_STOP_SIGN: "Stop Sign",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
|
def accept_box(boxes: list[dict[str, float]] | None, box_index: int, tolerance: int) -> bool:
|
||||||
"""Eliminate duplicate bounding boxes."""
|
"""Eliminate duplicate bounding boxes."""
|
||||||
box = boxes[box_index]
|
if boxes is not None:
|
||||||
|
box = boxes[box_index]
|
||||||
|
|
||||||
for idx in range(box_index):
|
for idx in range(box_index):
|
||||||
other_box = boxes[idx]
|
other_box = boxes[idx]
|
||||||
if abs(center(other_box, "x") - center(box, "x")) < tolerance and abs(center(other_box, "y") - center(box, "y")) < tolerance:
|
if abs(center(other_box, "x") - center(box, "x")) < tolerance and abs(center(other_box, "y") - center(box, "y")) < tolerance:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def load_model(model_name):
|
@logger.catch
|
||||||
|
def load_model(model_name: str) -> tf.saved_model.LoadOptions:
|
||||||
"""Download a pretrained object detection model, and save it to your hard drive."""
|
"""Download a pretrained object detection model, and save it to your hard drive."""
|
||||||
url = f"http://download.tensorflow.org/models/object_detection/tf2/20200711/{model_name}.tar.gz"
|
url = f"http://download.tensorflow.org/models/object_detection/tf2/20200711/{model_name}.tar.gz"
|
||||||
|
|
||||||
@@ -49,7 +67,8 @@ def load_model(model_name):
|
|||||||
return tf.saved_model.load(f"{model_dir}/saved_model")
|
return tf.saved_model.load(f"{model_dir}/saved_model")
|
||||||
|
|
||||||
|
|
||||||
def load_rgb_images(files, shape=None):
|
@logger.catch
|
||||||
|
def load_rgb_images(files, shape: tuple[int, int] | None = None):
|
||||||
"""Loads the images in RGB format."""
|
"""Loads the images in RGB format."""
|
||||||
|
|
||||||
# For each image in the directory, convert it from BGR format to RGB format
|
# For each image in the directory, convert it from BGR format to RGB format
|
||||||
@@ -59,88 +78,73 @@ def load_rgb_images(files, shape=None):
|
|||||||
return [cv2.resize(img, shape) for img in images] if shape else images
|
return [cv2.resize(img, shape) for img in images] if shape else images
|
||||||
|
|
||||||
|
|
||||||
def load_ssd_coco():
|
@logger.catch
|
||||||
|
def load_ssd_coco() -> tf.saved_model.LoadOptions:
|
||||||
"""Load the neural network that has the SSD architecture, trained on the COCO data set."""
|
"""Load the neural network that has the SSD architecture, trained on the COCO data set."""
|
||||||
return load_model("ssd_resnet50_v1_fpn_640x640_coco17_tpu-8")
|
return load_model("ssd_resnet50_v1_fpn_640x640_coco17_tpu-8")
|
||||||
|
|
||||||
|
|
||||||
def save_image_annotated(img_rgb, file_name: Path, output, model_traffic_lights=None) -> None:
|
@logger.catch
|
||||||
|
def save_image_annotated(image_rgb, file_name: Path, output, model_traffic_lights=None) -> None:
|
||||||
"""Annotate the image with the object types, and generate cropped images of traffic lights."""
|
"""Annotate the image with the object types, and generate cropped images of traffic lights."""
|
||||||
output_file = Path.joinpath(IMAGES_OUT_PATH, file_name.name)
|
output_file = Path.joinpath(IMAGES_OUT_PATH, file_name.name)
|
||||||
|
|
||||||
# For each bounding box that was detected
|
# For each bounding box that was detected
|
||||||
for idx, _ in enumerate(output["boxes"]):
|
for idx, (box, object_class) in enumerate(zip(output["boxes"], output["detection_classes"])):
|
||||||
|
|
||||||
# Extract the type of the object that was detected
|
|
||||||
obj_class = output["detection_classes"][idx]
|
|
||||||
|
|
||||||
|
color = LABELS.get(object_class, (255, 255, 255))
|
||||||
# How confident the object detection model is on the object's type
|
# How confident the object detection model is on the object's type
|
||||||
score = int(output["detection_scores"][idx] * 100)
|
score: int = object_class * 100
|
||||||
|
|
||||||
# Extract the bounding box
|
# Extract the bounding box
|
||||||
box = output["boxes"][idx]
|
box = output["boxes"][idx]
|
||||||
|
|
||||||
color = None
|
label_text = f"{object_class} {score}"
|
||||||
label_text = ""
|
if object_class == LABEL_TRAFFIC_LIGHT:
|
||||||
|
if model_traffic_lights is not None:
|
||||||
# if obj_class == LABEL_PERSON:
|
|
||||||
# color = (0, 255, 255)
|
|
||||||
# label_text = f"Person {score}"
|
|
||||||
# if obj_class == LABEL_CAR:
|
|
||||||
# color = (255, 255, 0)
|
|
||||||
# label_text = f"Car {score}"
|
|
||||||
# if obj_class == LABEL_BUS:
|
|
||||||
# label_text = f"Bus {score}"
|
|
||||||
# color = (255, 255, 0)
|
|
||||||
# if obj_class == LABEL_TRUCK:
|
|
||||||
# color = (255, 255, 0)
|
|
||||||
# label_text = f"Truck {score}"
|
|
||||||
# if obj_class == LABEL_STOP_SIGN:
|
|
||||||
# color = (128, 0, 0)
|
|
||||||
# label_text = f"Stop Sign {score}"
|
|
||||||
if obj_class == LABEL_TRAFFIC_LIGHT:
|
|
||||||
color = (255, 255, 255)
|
|
||||||
label_text = f"Traffic Light {score}"
|
|
||||||
|
|
||||||
if model_traffic_lights:
|
|
||||||
|
|
||||||
# Annotate the image and save it
|
# Annotate the image and save it
|
||||||
img_traffic_light = img_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
|
image_traffic_light = image_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
|
||||||
img_inception = cv2.resize(img_traffic_light, (299, 299))
|
image_inception = cv2.resize(image_traffic_light, (299, 299))
|
||||||
|
|
||||||
# Uncomment this if you want to save a cropped image of the traffic light
|
# Uncomment this if you want to save a cropped image of the traffic light
|
||||||
# cv2.imwrite(output_file.replace('.jpg', '_crop.jpg'), cv2.cvtColor(img_inception, cv2.COLOR_RGB2BGR))
|
image_inception = np.array([preprocess_input(image_inception)])
|
||||||
img_inception = np.array([preprocess_input(img_inception)])
|
|
||||||
|
|
||||||
prediction = model_traffic_lights.predict(img_inception)
|
prediction = model_traffic_lights.predict(image_inception)
|
||||||
label = np.argmax(prediction)
|
label = np.argmax(prediction)
|
||||||
score_light = int(np.max(prediction) * 100)
|
score_light = int(np.max(prediction) * 100)
|
||||||
|
|
||||||
match label:
|
if label == 0:
|
||||||
case 0: label_text = f"Green {score_light}"
|
label_text = f"Green {score_light}"
|
||||||
case 1: label_text = f"Yellow {score_light}"
|
elif label == 1:
|
||||||
case 2: label_text = f"Red {score_light}"
|
label_text = f"Yellow {score_light}"
|
||||||
case _: label_text = "NO-LIGHT"
|
elif label == 2:
|
||||||
|
label_text = f"Red {score_light}"
|
||||||
|
else:
|
||||||
|
label_text = "NO-LIGHT"
|
||||||
|
|
||||||
if color and label_text and accept_box(output["boxes"], idx, 5) and score > 50:
|
# Draw the bounding box and object class label on the image, if the confidence score is above 50 and the box is not a duplicate
|
||||||
cv2.rectangle(img_rgb, (box["x"], box["y"]), (box["x2"], box["y2"]), color, 2)
|
if color and label_text and accept_box(output["boxes"], idx, 5) and score > 50:
|
||||||
cv2.putText(img_rgb, label_text, (box["x"], box["y"]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
cv2.rectangle(image_rgb, (box["x"], box["y"]), (box["x2"], box["y2"]), color, 2)
|
||||||
|
cv2.putText(image_rgb, label_text, (box["x"], box["y"]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||||
|
|
||||||
cv2.imwrite(str(output_file), cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR))
|
cv2.imwrite(str(output_file), cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR))
|
||||||
logger.info(output_file)
|
logger.info(output_file)
|
||||||
|
|
||||||
|
|
||||||
def center(box, coord_type):
|
@logger.catch
|
||||||
|
def center(box: dict[str, float], coord_type: str) -> float:
|
||||||
"""Get center of the bounding box."""
|
"""Get center of the bounding box."""
|
||||||
return (box[coord_type] + box[coord_type + "2"]) / 2
|
return (box[coord_type] + box[coord_type + "2"]) / 2
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def perform_object_detection(model, file_name, save_annotated=False, model_traffic_lights=None):
|
def perform_object_detection(model, file_name, save_annotated=False, model_traffic_lights=None):
|
||||||
"""Perform object detection on an image using the predefined neural network."""
|
"""Perform object detection on an image using the predefined neural network."""
|
||||||
# Store the image
|
# Store the image
|
||||||
img_bgr = cv2.imread(str(file_name))
|
image_bgr = cv2.imread(str(file_name))
|
||||||
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
|
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
|
||||||
input_tensor = tf.convert_to_tensor(img_rgb) # Input needs to be a tensor
|
input_tensor = tf.convert_to_tensor(image_rgb) # Input needs to be a tensor
|
||||||
input_tensor = input_tensor[tf.newaxis, ...]
|
input_tensor = input_tensor[tf.newaxis, ...]
|
||||||
|
|
||||||
# Run the model
|
# Run the model
|
||||||
@@ -150,8 +154,7 @@ def perform_object_detection(model, file_name, save_annotated=False, model_traff
|
|||||||
|
|
||||||
# Convert the tensors to a NumPy array
|
# Convert the tensors to a NumPy array
|
||||||
num_detections = int(output.pop("num_detections"))
|
num_detections = int(output.pop("num_detections"))
|
||||||
output = {key: value[0, :num_detections].numpy()
|
output = {key: value[0, :num_detections].numpy() for key, value in output.items()}
|
||||||
for key, value in output.items()}
|
|
||||||
output["num_detections"] = num_detections
|
output["num_detections"] = num_detections
|
||||||
|
|
||||||
logger.info(f"Detection classes: {output['detection_classes']}")
|
logger.info(f"Detection classes: {output['detection_classes']}")
|
||||||
@@ -159,108 +162,90 @@ def perform_object_detection(model, file_name, save_annotated=False, model_traff
|
|||||||
|
|
||||||
# The detected classes need to be integers.
|
# The detected classes need to be integers.
|
||||||
output["detection_classes"] = output["detection_classes"].astype(np.int64)
|
output["detection_classes"] = output["detection_classes"].astype(np.int64)
|
||||||
output["boxes"] = [
|
output["boxes"] = [{"y": int(box[0] * image_rgb.shape[0]),
|
||||||
{"y": int(box[0] * img_rgb.shape[0]), "x": int(box[1] * img_rgb.shape[1]), "y2": int(box[2] * img_rgb.shape[0]),
|
"x": int(box[1] * image_rgb.shape[1]),
|
||||||
"x2": int(box[3] * img_rgb.shape[1])} for box in output["detection_boxes"]]
|
"y2": int(box[2] * image_rgb.shape[0]),
|
||||||
|
"x2": int(box[3] * image_rgb.shape[1])}
|
||||||
|
for box in output["detection_boxes"]]
|
||||||
|
|
||||||
if save_annotated:
|
if save_annotated:
|
||||||
save_image_annotated(img_rgb, file_name, output, model_traffic_lights)
|
save_image_annotated(image_rgb, file_name, output, model_traffic_lights)
|
||||||
|
|
||||||
return img_rgb, output, file_name
|
return image_rgb, output, file_name
|
||||||
|
|
||||||
|
|
||||||
def perform_object_detection_video(model, video_frame, model_traffic_lights=None):
|
@logger.catch
|
||||||
|
def perform_object_detection_video(video_frame, model, model_traffic_lights):
|
||||||
"""Perform object detection on a video using the predefined neural network."""
|
"""Perform object detection on a video using the predefined neural network."""
|
||||||
|
|
||||||
# Store the image
|
# Store the image
|
||||||
img_rgb = cv2.cvtColor(video_frame, cv2.COLOR_BGR2RGB)
|
image_rgb = cv2.cvtColor(video_frame, cv2.COLOR_BGR2RGB)
|
||||||
input_tensor = tf.convert_to_tensor(img_rgb) # Input needs to be a tensor
|
input_tensor = tf.convert_to_tensor(image_rgb) # Input needs to be a tensor
|
||||||
input_tensor = input_tensor[tf.newaxis, ...]
|
input_tensor = input_tensor[tf.newaxis, ...]
|
||||||
|
|
||||||
# Run the model
|
# Run the model
|
||||||
output = model(input_tensor)
|
output = model(input_tensor)
|
||||||
|
|
||||||
# Convert the tensors to a NumPy array
|
# Convert the tensors to a NumPy array
|
||||||
num_detections = int(output.pop("num_detections"))
|
number_detections = int(output.pop("num_detections"))
|
||||||
output = {key: value[0, :num_detections].numpy()
|
output = {key: value[0, :number_detections].numpy() for key, value in output.items()}
|
||||||
for key, value in output.items()}
|
output["num_detections"] = number_detections
|
||||||
output["num_detections"] = num_detections
|
|
||||||
|
|
||||||
# The detected classes need to be integers.
|
# The detected classes need to be integers.
|
||||||
output["detection_classes"] = output["detection_classes"].astype(np.int64)
|
output["detection_classes"] = output["detection_classes"].astype(np.int64)
|
||||||
output["boxes"] = [
|
output["boxes"] = [{"y": int(box[0] * image_rgb.shape[0]),
|
||||||
{"y": int(box[0] * img_rgb.shape[0]), "x": int(box[1] * img_rgb.shape[1]), "y2": int(box[2] * img_rgb.shape[0]),
|
"x": int(box[1] * image_rgb.shape[1]),
|
||||||
"x2": int(box[3] * img_rgb.shape[1])} for box in output["detection_boxes"]]
|
"y2": int(box[2] * image_rgb.shape[0]),
|
||||||
|
"x2": int(box[3] * image_rgb.shape[1])}
|
||||||
|
for box in output["detection_boxes"]]
|
||||||
|
|
||||||
# For each bounding box that was detected
|
# For each bounding box that was detected
|
||||||
for idx, _ in enumerate(output["boxes"]):
|
for idx, (box, object_class) in enumerate(zip(output.get("boxes"), output.get("detection_classes"))):
|
||||||
|
color = LABELS.get(object_class, None)
|
||||||
# Extract the type of the object that was detected
|
|
||||||
obj_class = output["detection_classes"][idx]
|
|
||||||
|
|
||||||
# How confident the object detection model is on the object's type
|
# How confident the object detection model is on the object's type
|
||||||
score = int(output["detection_scores"][idx] * 100)
|
score: int = object_class * 100
|
||||||
|
label_text = f"{LABEL_TEXT.get(object_class)} {score}"
|
||||||
|
|
||||||
# Extract the bounding box
|
if object_class == LABEL_TRAFFIC_LIGHT:
|
||||||
box = output["boxes"][idx]
|
# Annotate the image and save it
|
||||||
|
image_traffic_light = image_rgb[box.get("y"):box.get("y2"), box.get("x"):box.get("x2")]
|
||||||
|
image_inception = cv2.resize(image_traffic_light, (299, 299))
|
||||||
|
|
||||||
color = None
|
image_inception = np.array([preprocess_input(image_inception)])
|
||||||
label_text = ""
|
|
||||||
|
|
||||||
# if obj_class == LABEL_PERSON:
|
prediction = model_traffic_lights.predict(image_inception)
|
||||||
# color = (0, 255, 255)
|
label = np.argmax(prediction)
|
||||||
# label_text = "Person " + str(score)
|
score_light = int(np.max(prediction) * 100)
|
||||||
# if obj_class == LABEL_CAR:
|
|
||||||
# color = (255, 255, 0)
|
|
||||||
# label_text = "Car " + str(score)
|
|
||||||
# if obj_class == LABEL_BUS:
|
|
||||||
# color = (255, 255, 0)
|
|
||||||
# label_text = "Bus " + str(score)
|
|
||||||
# if obj_class == LABEL_TRUCK:
|
|
||||||
# color = (255, 255, 0)
|
|
||||||
# label_text = "Truck " + str(score)
|
|
||||||
# if obj_class == LABEL_STOP_SIGN:
|
|
||||||
# color = (128, 0, 0)
|
|
||||||
# label_text = f"Stop Sign {score}"
|
|
||||||
if obj_class == LABEL_TRAFFIC_LIGHT:
|
|
||||||
color = (255, 255, 255)
|
|
||||||
label_text = f"Traffic Light {score}"
|
|
||||||
|
|
||||||
if model_traffic_lights:
|
if label == 0:
|
||||||
|
label_text = f"Green {score_light}"
|
||||||
# Annotate the image and save it
|
elif label == 1:
|
||||||
img_traffic_light = img_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
|
label_text = f"Yellow {score_light}"
|
||||||
img_inception = cv2.resize(img_traffic_light, (299, 299))
|
elif label == 2:
|
||||||
|
label_text = f"Red {score_light}"
|
||||||
img_inception = np.array([preprocess_input(img_inception)])
|
else:
|
||||||
|
label_text = "NO-LIGHT"
|
||||||
prediction = model_traffic_lights.predict(img_inception)
|
|
||||||
label = np.argmax(prediction)
|
|
||||||
score_light = int(np.max(prediction) * 100)
|
|
||||||
|
|
||||||
match label:
|
|
||||||
case 0: label_text = f"Green {score_light}"
|
|
||||||
case 1: label_text = f"Yellow {score_light}"
|
|
||||||
case 2: label_text = f"Red {score_light}"
|
|
||||||
case _: label_text = "NO-LIGHT" # This is not a traffic light
|
|
||||||
|
|
||||||
# Use the score variable to indicate how confident we are it is a traffic light (in % terms)
|
# Use the score variable to indicate how confident we are it is a traffic light (in % terms)
|
||||||
# On the actual video frame, we display the confidence that the light is either red, green,
|
# On the actual video frame, we display the confidence that the light is either green, yellow,
|
||||||
# yellow, or not a valid traffic light.
|
# red or not a valid traffic light.
|
||||||
if color and label_text and accept_box(output["boxes"], idx, 5) and score > 20:
|
if accept_box(output.get("boxes"), idx, 5) and score > 20:
|
||||||
cv2.rectangle(img_rgb, (box["x"], box["y"]), (box["x2"], box["y2"]), color, 2)
|
cv2.rectangle(image_rgb, (box.get("x"), box.get("y")), (box.get("x2"), box.get("y2")), color, 2)
|
||||||
cv2.putText(img_rgb, label_text, (box["x"], box["y"]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
cv2.putText(image_rgb, label_text, (box.get("x"), box.get("y")), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||||
|
|
||||||
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
|
return cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
|
||||||
|
|
||||||
|
|
||||||
def double_shuffle(images, labels):
|
@logger.catch
|
||||||
|
def double_shuffle(images: list[str], labels: list[int]) -> tuple[list[str], list[int]]:
|
||||||
"""Shuffle the images to add some randomness."""
|
"""Shuffle the images to add some randomness."""
|
||||||
indexes = np.random.permutation(len(images))
|
indexes = np.random.permutation(len(images))
|
||||||
|
|
||||||
return [images[idx] for idx in indexes], [labels[idx] for idx in indexes]
|
return [images[idx] for idx in indexes], [labels[idx] for idx in indexes]
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def reverse_preprocess_inception(image_preprocessed):
|
def reverse_preprocess_inception(image_preprocessed):
|
||||||
"""Reverse the preprocessing process."""
|
"""Reverse the preprocessing process for an image that has been input to the Inception V3 model."""
|
||||||
image = image_preprocessed + 1 * 127.5
|
image = image_preprocessed + 1 * 127.5
|
||||||
return image.astype(np.uint8)
|
return image.astype(np.uint8)
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
import logging
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
|
||||||
BASE_PATH = Path(__file__).resolve().parent.parent.parent
|
BASE_PATH = Path(__file__).resolve().parent.parent.parent
|
||||||
MODEL_PATH = Path.joinpath(BASE_PATH, "model.h5")
|
MODEL_PATH = Path.joinpath(BASE_PATH, "model.h5")
|
||||||
@@ -30,14 +31,8 @@ INPUT_PATH = Path.joinpath(EXTRACTION_PATH, "input")
|
|||||||
PATHS = (LOGS_PATH, ASSETS_PATH, VALID_PATH, DETECTION_PATH, IMAGES_IN_PATH, IMAGES_OUT_PATH, VIDEOS_IN_PATH, VIDEOS_OUT_PATH,
|
PATHS = (LOGS_PATH, ASSETS_PATH, VALID_PATH, DETECTION_PATH, IMAGES_IN_PATH, IMAGES_OUT_PATH, VIDEOS_IN_PATH, VIDEOS_OUT_PATH,
|
||||||
DATESET_PATH, GREEN_PATH, YELLOW_PATH, RED_PATH, NOT_PATH, EXTRACTION_PATH, CROPPED_IMAGES_PATH, INPUT_PATH)
|
DATESET_PATH, GREEN_PATH, YELLOW_PATH, RED_PATH, NOT_PATH, EXTRACTION_PATH, CROPPED_IMAGES_PATH, INPUT_PATH)
|
||||||
|
|
||||||
# Set up logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
handler = logging.FileHandler(str(Path.joinpath(LOGS_PATH, f"{__name__}.log")))
|
|
||||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
|
||||||
handler.setFormatter(formatter)
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def create_dirs(fresh: bool = False) -> None:
|
def create_dirs(fresh: bool = False) -> None:
|
||||||
if fresh:
|
if fresh:
|
||||||
rmtree(ASSETS_PATH)
|
rmtree(ASSETS_PATH)
|
||||||
|
|||||||
@@ -4,14 +4,13 @@ of a traffic light. Performance on the validation data set is saved
|
|||||||
to a directory. Also, the best neural network model is saved as traffic.h5.
|
to a directory. Also, the best neural network model is saved as traffic.h5.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import collections # Handles specialized container datatypes
|
import collections
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import cv2 # Computer vision library
|
import cv2
|
||||||
import matplotlib.pyplot as plt # Plotting library
|
import matplotlib.pyplot as plt
|
||||||
import numpy as np # Scientific computing library
|
import numpy as np
|
||||||
import tensorflow as tf # Machine learning library
|
import tensorflow as tf
|
||||||
from detector.object_detection import (
|
from detector.object_detection import (
|
||||||
double_shuffle,
|
double_shuffle,
|
||||||
load_rgb_images,
|
load_rgb_images,
|
||||||
@@ -19,14 +18,14 @@ from detector.object_detection import (
|
|||||||
)
|
)
|
||||||
from detector.paths import (
|
from detector.paths import (
|
||||||
GREEN_PATH,
|
GREEN_PATH,
|
||||||
LOGS_PATH,
|
|
||||||
MODEL_PATH,
|
MODEL_PATH,
|
||||||
NOT_PATH,
|
NOT_PATH,
|
||||||
RED_PATH,
|
RED_PATH,
|
||||||
VALID_PATH,
|
VALID_PATH,
|
||||||
YELLOW_PATH,
|
YELLOW_PATH,
|
||||||
)
|
)
|
||||||
from tensorflow import keras # Library for neural networks
|
from loguru import logger
|
||||||
|
from tensorflow import keras
|
||||||
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
|
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
|
||||||
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
|
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
|
||||||
from tensorflow.keras.layers import (
|
from tensorflow.keras.layers import (
|
||||||
@@ -41,18 +40,13 @@ from tensorflow.keras.optimizers import Adadelta
|
|||||||
from tensorflow.keras.preprocessing.image import ImageDataGenerator
|
from tensorflow.keras.preprocessing.image import ImageDataGenerator
|
||||||
from tensorflow.keras.utils import to_categorical
|
from tensorflow.keras.utils import to_categorical
|
||||||
|
|
||||||
# Set up logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
handler = logging.FileHandler(str(Path.joinpath(LOGS_PATH, f"{__name__}.log")))
|
|
||||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
|
||||||
handler.setFormatter(formatter)
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
# Show the version of TensorFlow and Keras that I am using
|
# Show the version of TensorFlow and Keras that I am using
|
||||||
logger.info("TensorFlow", tf.__version__)
|
logger.info("TensorFlow", tf.__version__)
|
||||||
logger.info("Keras", keras.__version__)
|
logger.info("Keras", keras.__version__)
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def show_history(history):
|
def show_history(history):
|
||||||
"""
|
"""
|
||||||
Visualize the neural network model training history
|
Visualize the neural network model training history
|
||||||
@@ -70,6 +64,7 @@ def show_history(history):
|
|||||||
plt.show()
|
plt.show()
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def Transfer(n_classes, freeze_layers=True):
|
def Transfer(n_classes, freeze_layers=True):
|
||||||
"""Use the InceptionV3 neural network architecture to perform transfer learning."""
|
"""Use the InceptionV3 neural network architecture to perform transfer learning."""
|
||||||
logger.info("Loading Inception V3...")
|
logger.info("Loading Inception V3...")
|
||||||
@@ -120,6 +115,7 @@ def Transfer(n_classes, freeze_layers=True):
|
|||||||
return top_model
|
return top_model
|
||||||
|
|
||||||
|
|
||||||
|
@logger.catch
|
||||||
def train_traffic_light_color() -> None:
|
def train_traffic_light_color() -> None:
|
||||||
# Perform image augmentation.
|
# Perform image augmentation.
|
||||||
# Image augmentation enables us to alter the available images
|
# Image augmentation enables us to alter the available images
|
||||||
@@ -249,13 +245,13 @@ def train_traffic_light_color() -> None:
|
|||||||
logger.info(f"Length of the validation data set: {len(x_valid)}")
|
logger.info(f"Length of the validation data set: {len(x_valid)}")
|
||||||
|
|
||||||
# Go through the validation data set, and see how the model did on each image
|
# Go through the validation data set, and see how the model did on each image
|
||||||
for idx, _ in enumerate(x_valid):
|
for x_value, y_value in zip(x_valid, y_valid):
|
||||||
|
|
||||||
# Make the image a NumPy array
|
# Make the image a NumPy array
|
||||||
img_as_ar = np.array([x_valid[idx]])
|
image_as_ar = np.array(x_value)
|
||||||
|
|
||||||
# Generate predictions
|
# Generate predictions
|
||||||
prediction = model.predict(img_as_ar)
|
prediction = model.predict(image_as_ar)
|
||||||
|
|
||||||
# Determine what the label is based on the highest probability
|
# Determine what the label is based on the highest probability
|
||||||
label = np.argmax(prediction)
|
label = np.argmax(prediction)
|
||||||
@@ -263,8 +259,8 @@ def train_traffic_light_color() -> None:
|
|||||||
# Create the name of the directory and the file for the validation data set
|
# Create the name of the directory and the file for the validation data set
|
||||||
# After each run, delete this out_valid/ directory so that old files are not
|
# After each run, delete this out_valid/ directory so that old files are not
|
||||||
# hanging around in there.
|
# hanging around in there.
|
||||||
file_name = str(Path.joinpath(VALID_PATH, f"{idx}_{label}_{np.argmax(str(y_valid[idx]))}.jpg"))
|
file_name = str(Path.joinpath(VALID_PATH, f"{idx}_{label}_{np.argmax(str(y_value))}.jpg"))
|
||||||
image = img_as_ar[0]
|
image = image_as_ar[0]
|
||||||
|
|
||||||
# Reverse the image preprocessing process
|
# Reverse the image preprocessing process
|
||||||
image = reverse_preprocess_inception(image)
|
image = reverse_preprocess_inception(image)
|
||||||
|
|||||||
Reference in New Issue
Block a user