Start from sratch

This commit is contained in:
Kristofers Solo 2022-12-14 13:40:05 +02:00
parent 35cbabc9b4
commit 2eb53b4196
7 changed files with 0 additions and 656 deletions

View File

@ -1,18 +0,0 @@
"""This program uses a trained neural network to detect the color of a traffic light in images."""
from detector.object_detection import load_ssd_coco, perform_object_detection
from detector.paths import IMAGES_IN_PATH, MODEL_PATH
from loguru import logger
from tensorflow import keras
@logger.catch
def detect_traffic_light_color_image() -> None:
model_traffic_lights_nn = keras.models.load_model(str(MODEL_PATH))
# Load the SSD neural network that is trained on the COCO data set
model_ssd = load_ssd_coco()
# Go through all image files, and detect the traffic light color.
for file in IMAGES_IN_PATH.iterdir():
image, out, file_name = perform_object_detection(model=model_ssd, file_name=file, save_annotated=True, model_traffic_lights=model_traffic_lights_nn)
logger.info(f"Performed object detection on {file}")

View File

@ -1,76 +0,0 @@
"""This program extracts traffic lights from images."""
import cv2
from detector.object_detection import (
LABEL_TRAFFIC_LIGHT,
load_ssd_coco,
perform_object_detection,
)
from detector.paths import CROPPED_IMAGES_PATH, INPUT_PATH
from loguru import logger
@logger.catch
def extract_traffic_lights() -> None:
files = INPUT_PATH.iterdir()
# Load the object detection model
this_model = load_ssd_coco()
# Keep track of the number of traffic lights found
traffic_light_count = 0
# Keep track of the number of image files that were processed
file_count = 0
# Display a count of the number of images we need to process
# logger.info(f"Number of Images: {len(files)}")
# Go through each image file, one at a time
for file in files:
# Detect objects in the image
# img_rgb is the original image in RGB format
# out is a dictionary containing the results of object detection
# file_name is the name of the file
img_rgb, out, file_name = perform_object_detection(model=this_model, file_name=file, save_annotated=None, model_traffic_lights=None)
# Every 10 files that are processed
if file_count % 10 == 0:
# Display a count of the number of files that have been processed
logger.info(f"Images processed: {file_count}")
# Display the total number of traffic lights that have been identified so far
logger.info(f"Number of Traffic lights identified: {traffic_light_count}")
# Increment the number of files by 1
file_count += 1
# For each traffic light (i.e. bounding box) that was detected
for idx, _ in enumerate(out["boxes"]):
# Extract the type of object that was detected
obj_class = out["detection_classes"][idx]
# If the object that was detected is a traffic light
if obj_class == LABEL_TRAFFIC_LIGHT:
# Extract the coordinates of the bounding box
box = out["boxes"][idx]
# Extract (i.e. crop) the traffic light from the image
traffic_light = img_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
# Convert the traffic light from RGB format into BGR format
traffic_light = cv2.cvtColor(traffic_light, cv2.COLOR_RGB2BGR)
# Store the cropped image in a folder named 'traffic_light_cropped'
cv2.imwrite(str(CROPPED_IMAGES_PATH.joinpath(f"{traffic_light_count}.jpg")), traffic_light)
# Increment the number of traffic lights by 1
traffic_light_count += 1
# Display the total number of traffic lights identified
logger.info(f"Number of Traffic lights identified: {traffic_light_count}")

View File

@ -1,246 +0,0 @@
"""This program helps detect objects (e.g. traffic lights) in images."""
from pathlib import Path
import cv2
import numpy as np
import tensorflow as tf
from detector.paths import IMAGES_OUT_PATH
from loguru import logger
# Inception V3 model for Keras
from tensorflow.keras.applications.inception_v3 import preprocess_input
# COCO labels
LABEL_PERSON = 1
LABEL_CAR = 3
LABEL_BUS = 6
LABEL_TRUCK = 8
LABEL_TRAFFIC_LIGHT = 10
LABEL_STOP_SIGN = 13
# Create a dictionary that maps object class labels to their corresponding colors and text labels
LABELS = {
LABEL_PERSON: (0, 255, 255),
LABEL_CAR: (255, 255, 0),
LABEL_BUS: (255, 255, 0),
LABEL_TRUCK: (255, 255, 0),
LABEL_TRAFFIC_LIGHT: (255, 255, 255),
LABEL_STOP_SIGN: (128, 0, 0),
}
LABEL_TEXT = {
LABEL_PERSON: "Person",
LABEL_CAR: "Car",
LABEL_BUS: "Bus",
LABEL_TRUCK: "Truck",
LABEL_TRAFFIC_LIGHT: "Traffic Light",
LABEL_STOP_SIGN: "Stop Sign",
}
@logger.catch
def accept_box(boxes: list[dict[str, float]] | None, box_index: int, tolerance: int) -> bool:
"""Eliminate duplicate bounding boxes."""
if boxes is not None:
box = boxes[box_index]
for idx in range(box_index):
other_box = boxes[idx]
if abs(center(other_box, "x") - center(box, "x")) < tolerance and abs(center(other_box, "y") - center(box, "y")) < tolerance:
return False
return True
return False
@logger.catch
def load_model(model_name: str) -> tf.saved_model.LoadOptions:
"""Download a pretrained object detection model, and save it to your hard drive."""
url = f"http://download.tensorflow.org/models/object_detection/tf2/20200711/{model_name}.tar.gz"
# Download a file from a URL that is not already in the cache
model_dir = tf.keras.utils.get_file(fname=model_name, untar=True, origin=url)
logger.info(f"Loaded model: {model_dir}")
return tf.saved_model.load(f"{model_dir}/saved_model")
@logger.catch
def load_rgb_images(files, shape: tuple[int, int] | None = None):
"""Loads the images in RGB format."""
# For each image in the directory, convert it from BGR format to RGB format
images = [cv2.cvtColor(cv2.imread(str(file)), cv2.COLOR_BGR2RGB) for file in files]
# Resize the image if the desired shape is provided
return [cv2.resize(img, shape) for img in images] if shape else images
@logger.catch
def load_ssd_coco() -> tf.saved_model.LoadOptions:
"""Load the neural network that has the SSD architecture, trained on the COCO data set."""
return load_model("ssd_resnet50_v1_fpn_640x640_coco17_tpu-8")
@logger.catch
def save_image_annotated(image_rgb, file_name: Path, output, model_traffic_lights) -> None:
"""Annotate the image with the object types, and generate cropped images of traffic lights."""
output_file = IMAGES_OUT_PATH.joinpath(file_name.name)
# For each bounding box that was detected
for idx, (box, object_class) in enumerate(zip(output["boxes"], output["detection_classes"])):
color = LABELS.get(object_class, None)
# How confident the object detection model is on the object's type
score: int = object_class * 100
label_text = f"{LABEL_TEXT.get(object_class)} {score}"
if object_class == LABEL_TRAFFIC_LIGHT:
# Annotate the image and save it
image_traffic_light = image_rgb[box.get("y"):box.get("y2"), box.get("x"):box.get("x2")]
image_inception = cv2.resize(image_traffic_light, (299, 299))
# Uncomment this if you want to save a cropped image of the traffic light
image_inception = np.array([preprocess_input(image_inception)])
prediction = model_traffic_lights.predict(image_inception)
label = np.argmax(prediction)
score_light = int(np.max(prediction) * 100)
if label == 0:
label_text = f"Green {score_light}"
elif label == 1:
label_text = f"Yellow {score_light}"
elif label == 2:
label_text = f"Red {score_light}"
else:
label_text = "NO-LIGHT"
# Draw the bounding box and object class label on the image, if the confidence score is above 50 and the box is not a duplicate
if color and label_text and accept_box(output.get("boxes"), idx, 5) and score > 50:
cv2.rectangle(image_rgb, (box.get("x"), box.get("y")), (box.get("x2"), box.get("y2")), color, 2)
cv2.putText(image_rgb, label_text, (box.get("x"), box.get("y")), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.imwrite(str(output_file), cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR))
logger.info(output_file)
@ logger.catch
def center(box: dict[str, float], coord_type: str) -> float:
"""Get center of the bounding box."""
return (box[coord_type] + box[coord_type + "2"]) / 2
@ logger.catch
def perform_object_detection(model, file_name: Path, save_annotated=False, model_traffic_lights=None):
"""Perform object detection on an image using the predefined neural network."""
# Store the image
image_bgr = cv2.imread(str(file_name))
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
input_tensor = tf.convert_to_tensor(image_rgb) # Input needs to be a tensor
input_tensor = input_tensor[tf.newaxis, ...]
# Run the model
output = model(input_tensor)
logger.debug(f"Number detections: {output['num_detections']} {int(output['num_detections'])}")
# Convert the tensors to a NumPy array
number_detections = int(output.pop("num_detections"))
output = {key: value[0, :number_detections].numpy() for key, value in output.items()}
output["num_detections"] = number_detections
logger.debug(f"Detection classes: {output['detection_classes']}")
logger.debug(f"Detection Boxes: {output['detection_boxes']}")
# The detected classes need to be integers.
output["detection_classes"] = output["detection_classes"].astype(np.int64)
output["boxes"] = [{"y": int(box[0] * image_rgb.shape[0]),
"x": int(box[1] * image_rgb.shape[1]),
"y2": int(box[2] * image_rgb.shape[0]),
"x2": int(box[3] * image_rgb.shape[1])}
for box in output["detection_boxes"]]
if save_annotated:
save_image_annotated(image_rgb, file_name, output, model_traffic_lights)
return image_rgb, output, file_name
@ logger.catch
def perform_object_detection_video(video_frame, model, model_traffic_lights):
"""Perform object detection on a video using the predefined neural network."""
# Store the image
image_rgb = cv2.cvtColor(video_frame, cv2.COLOR_BGR2RGB)
input_tensor = tf.convert_to_tensor(image_rgb) # Input needs to be a tensor
input_tensor = input_tensor[tf.newaxis, ...]
# Run the model
output = model(input_tensor)
# Convert the tensors to a NumPy array
number_detections = int(output.pop("num_detections"))
output = {key: value[0, :number_detections].numpy() for key, value in output.items()}
output["num_detections"] = number_detections
# The detected classes need to be integers.
output["detection_classes"] = output["detection_classes"].astype(np.int64)
output["boxes"] = [{"y": int(box[0] * image_rgb.shape[0]),
"x": int(box[1] * image_rgb.shape[1]),
"y2": int(box[2] * image_rgb.shape[0]),
"x2": int(box[3] * image_rgb.shape[1])}
for box in output["detection_boxes"]]
# For each bounding box that was detected
for idx, (box, object_class) in enumerate(zip(output.get("boxes"), output.get("detection_classes"))):
color = LABELS.get(object_class, None)
# How confident the object detection model is on the object's type
score: int = object_class * 100
label_text = f"{LABEL_TEXT.get(object_class)} {score}"
if object_class == LABEL_TRAFFIC_LIGHT:
# Annotate the image and save it
image_traffic_light = image_rgb[box.get("y"):box.get("y2"), box.get("x"):box.get("x2")]
image_inception = cv2.resize(image_traffic_light, (299, 299))
image_inception = np.array([preprocess_input(image_inception)])
prediction = model_traffic_lights.predict(image_inception)
label = np.argmax(prediction)
score_light = int(np.max(prediction) * 100)
if label == 0:
label_text = f"Green {score_light}"
elif label == 1:
label_text = f"Yellow {score_light}"
elif label == 2:
label_text = f"Red {score_light}"
else:
label_text = "NO-LIGHT"
# Use the score variable to indicate how confident we are it is a traffic light (in % terms)
# On the actual video frame, we display the confidence that the light is either green, yellow,
# red or not a valid traffic light.
if accept_box(output.get("boxes"), idx, 5) and score > 20:
cv2.rectangle(image_rgb, (box.get("x"), box.get("y")), (box.get("x2"), box.get("y2")), color, 2)
cv2.putText(image_rgb, label_text, (box.get("x"), box.get("y")), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
return cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
@ logger.catch
def double_shuffle(images: list[str], labels: list[int]) -> tuple[list[str], list[int]]:
"""Shuffle the images to add some randomness."""
indexes = np.random.permutation(len(images))
return [images[idx] for idx in indexes], [labels[idx] for idx in indexes]
@ logger.catch
def reverse_preprocess_inception(image_preprocessed):
"""Reverse the preprocessing process for an image that has been input to the Inception V3 model."""
image = image_preprocessed + 1 * 127.5
return image.astype(np.uint8)

View File

@ -1,47 +0,0 @@
from pathlib import Path
from shutil import rmtree
from loguru import logger
BASE_PATH = Path(__file__).resolve().parent.parent.parent
MODEL_PATH = BASE_PATH.joinpath("model.h5")
LOGS_PATH = BASE_PATH.joinpath(".logs")
ASSETS_PATH = BASE_PATH.joinpath("assets")
VALID_PATH = ASSETS_PATH.joinpath("out_valid")
DETECTION_PATH = ASSETS_PATH.joinpath("detection")
IMAGES_IN_PATH = DETECTION_PATH.joinpath("images_in")
IMAGES_OUT_PATH = DETECTION_PATH.joinpath("images_out")
VIDEOS_IN_PATH = DETECTION_PATH.joinpath("videos_in")
VIDEOS_OUT_PATH = DETECTION_PATH.joinpath("videos_out")
DATESET_PATH = ASSETS_PATH.joinpath("dataset")
GREEN_PATH = DATESET_PATH.joinpath("0_green")
YELLOW_PATH = DATESET_PATH.joinpath(DATESET_PATH, "1_yellow")
RED_PATH = DATESET_PATH.joinpath("2_red")
NOT_PATH = DATESET_PATH.joinpath("3_not")
EXTRACTION_PATH = ASSETS_PATH.joinpath("extraction")
CROPPED_IMAGES_PATH = EXTRACTION_PATH.joinpath("cropped")
INPUT_PATH = EXTRACTION_PATH.joinpath("input")
PATHS = (LOGS_PATH, VALID_PATH, IMAGES_IN_PATH, IMAGES_OUT_PATH, VIDEOS_IN_PATH, VIDEOS_OUT_PATH,
GREEN_PATH, YELLOW_PATH, RED_PATH, NOT_PATH, CROPPED_IMAGES_PATH, INPUT_PATH)
@logger.catch
def create_dirs(fresh: bool = False) -> None:
if fresh:
rmtree(ASSETS_PATH)
rmtree(LOGS_PATH)
MODEL_PATH.unlink()
create_dirs()
logger.info("Deleted all dirs")
else:
for path in PATHS:
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
logger.info(f"Created dir {path}")

View File

View File

@ -1,269 +0,0 @@
"""
This program trains a neural network to detect the color
of a traffic light. Performance on the validation data set is saved
to a directory. Also, the best neural network model is saved as traffic.h5.
"""
import collections
import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from detector.object_detection import (
double_shuffle,
load_rgb_images,
reverse_preprocess_inception,
)
from detector.paths import (
GREEN_PATH,
MODEL_PATH,
NOT_PATH,
RED_PATH,
VALID_PATH,
YELLOW_PATH,
)
from loguru import logger
from tensorflow import keras
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import (
BatchNormalization,
Dense,
Dropout,
GlobalAveragePooling2D,
)
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adadelta
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
# Show the version of TensorFlow and Keras that I am using
logger.info("TensorFlow", tf.__version__)
logger.info("Keras", keras.__version__)
@logger.catch
def show_history(history):
"""
Visualize the neural network model training history
A record of training loss values and metrics values at
successive epochs, as well as validation loss values
and validation metrics values
"""
plt.plot(history.history["accuracy"])
plt.plot(history.history["val_accuracy"])
plt.title("model accuracy")
plt.ylabel("accuracy")
plt.xlabel("epoch")
plt.legend(["train_accuracy", "validation_accuracy"], loc="best")
plt.show()
@logger.catch
def Transfer(n_classes, freeze_layers=True):
"""Use the InceptionV3 neural network architecture to perform transfer learning."""
logger.info("Loading Inception V3...")
# To understand what the parameters mean, do a Google search 'inceptionv3 keras'.
# The first search result should send you to the Keras website, which has an
# explanation of what each of these parameters mean.
# input_top means we are removing the top part of the Inception model, which is the
# classifier.
# input_shape needs to have 3 channels, and needs to be at least 75x75 for the
# resolution.
# Our neural network will build off of the Inception V3 model (trained on the ImageNet
# data set).
base_model = InceptionV3(weights="imagenet", include_top=False, input_shape=(299, 299, 3))
logger.info("Inception V3 has finished loading.")
# Display the base network architecture
logger.info(f"Layers: {len(base_model.layers)}")
logger.info(f"Shape: {base_model.output_shape[1:]}")
logger.info(f"Shape: {base_model.output_shape}")
logger.info(f"Shape: {base_model.outputs}")
base_model.summary()
# Create the neural network. This network uses the Sequential
# architecture where each layer has one
# input tensor (e.g. vector, matrix, etc.) and one output tensor
top_model = Sequential()
# Our classifier model will build on top of the base model
top_model.add(base_model)
top_model.add(GlobalAveragePooling2D())
top_model.add(Dropout(0.5))
top_model.add(Dense(1024, activation="relu"))
top_model.add(BatchNormalization())
top_model.add(Dropout(0.5))
top_model.add(Dense(512, activation="relu"))
top_model.add(Dropout(0.5))
top_model.add(Dense(128, activation="relu"))
top_model.add(Dense(n_classes, activation="softmax"))
# Freeze layers in the model so that they cannot be trained (i.e. the
# parameters in the neural network will not change)
if freeze_layers:
for layer in base_model.layers:
layer.trainable = False
return top_model
@logger.catch
def train_traffic_light_color() -> None:
# Perform image augmentation.
# Image augmentation enables us to alter the available images
# (e.g. rotate, flip, changing the hue, etc.) to generate more images that our
# neural network can use for training...therefore preventing us from having to
# collect more external images.
datagen = ImageDataGenerator(rotation_range=5, width_shift_range=[-10, -5, -2, 0, 2, 5, 10],
zoom_range=[0.7, 1.5], height_shift_range=[-10, -5, -2, 0, 2, 5, 10],
horizontal_flip=True)
shape = (299, 299)
# Load the cropped traffic light images from the appropriate directory
img_0_green = load_rgb_images(GREEN_PATH.iterdir(), shape)
img_1_yellow = load_rgb_images(YELLOW_PATH.iterdir(), shape)
img_2_red = load_rgb_images(RED_PATH.iterdir(), shape)
img_3_not_traffic_light = load_rgb_images(NOT_PATH.iterdir(), shape)
# Create a list of the labels that is the same length as the number of images in each
# category
# 0 = green
# 1 = yellow
# 2 = red
# 3 = not a traffic light
labels = [0] * len(img_0_green)
labels.extend([1] * len(img_1_yellow))
labels.extend([2] * len(img_2_red))
labels.extend([3] * len(img_3_not_traffic_light))
# Create NumPy array
labels_np: np.ndarray[int, np.dtype[np.generic]] = np.ndarray(shape=(len(labels), 4))
images_np: np.ndarray[int, np.dtype[np.generic]] = np.ndarray(shape=(len(labels), shape[0], shape[1], 3))
# Create a list of all the images in the traffic lights data set
img_all = []
img_all.extend(img_0_green)
img_all.extend(img_1_yellow)
img_all.extend(img_2_red)
img_all.extend(img_3_not_traffic_light)
# Make sure we have the same number of images as we have labels
assert len(img_all) == len(labels)
# Shuffle the images
img_all = [preprocess_input(img) for img in img_all]
(img_all, labels) = double_shuffle(img_all, labels)
# Store images and labels in a NumPy array
for idx, _ in enumerate(labels):
images_np[idx] = img_all[idx]
labels_np[idx] = labels[idx]
logger.info(f"Images: {len(img_all)}")
logger.info(f"Labels: {len(labels)}")
# Perform one-hot encoding
for idx, _ in enumerate(labels_np):
# We have four integer labels, representing the different colors of the
# traffic lights.
labels_np[idx] = np.array(to_categorical(labels[idx], 4))
# Split the data set into a training set and a validation set
# The training set is the portion of the data set that is used to
# determine the parameters (e.g. weights) of the neural network.
# The validation set is the portion of the data set used to
# fine tune the model-specific parameters (i.e. hyperparameters) that are
# fixed before you train and test your neural network on the data. The
# validation set helps us select the final model (e.g. learning rate,
# number of hidden layers, number of hidden units, activation functions,
# number of epochs, etc.
# In this case, 80% of the data set becomes training data, and 20% of the
# data set becomes validation data.
idx_split = int(len(labels_np) * 0.8)
x_train = images_np[0:idx_split]
x_valid = images_np[idx_split:]
y_train = labels_np[0:idx_split]
y_valid = labels_np[idx_split:]
# Store a count of the number of traffic lights of each color
cnt = collections.Counter(labels)
logger.info(f"Labels: {cnt}")
n = len(labels)
logger.info(f"0: {cnt[0]}")
logger.info(f"1: {cnt[1]}")
logger.info(f"2: {cnt[2]}")
logger.info(f"3: {cnt[3]}")
# Calculate the weighting of each traffic light class
class_weight = {0: n / cnt[0], 1: n / cnt[1], 2: n / cnt[2], 3: n / cnt[3]}
logger.info(f"Class weight: {class_weight}")
# Save the best model as traffic.h5
checkpoint = ModelCheckpoint(str(MODEL_PATH), monitor="val_loss", mode="min", verbose=1, save_best_only=True)
early_stopping = EarlyStopping(min_delta=0.0005, patience=15, verbose=1)
# Generate model using transfer learning
model = Transfer(n_classes=4, freeze_layers=True)
# Display a summary of the neural network model
model.summary()
# Generate a batch of randomly transformed images
it_train = datagen.flow(x_train, y_train, batch_size=32)
# Configure the model parameters for training
model.compile(loss=categorical_crossentropy, optimizer=Adadelta(
learning_rate=1.0, rho=0.95, epsilon=1e-08), metrics=["accuracy"])
# Train the model on the image batches for a fixed number of epochs
# Store a record of the error on the training data set and metrics values
# in the history object.
history_object = model.fit(it_train, epochs=250, validation_data=(
x_valid, y_valid), shuffle=True, callbacks=[
checkpoint, early_stopping], class_weight=class_weight)
# Display the training history
show_history(history_object)
# Get the loss value and metrics values on the validation data set
score = model.evaluate(x_valid, y_valid, verbose=0)
logger.info(f"Validation loss: {score[0]}")
logger.info(f"Validation accuracy: {score[1]}")
logger.info("Saving the validation data set...")
logger.info(f"Length of the validation data set: {len(x_valid)}")
# Go through the validation data set, and see how the model did on each image
for x_value, y_value in zip(x_valid, y_valid):
# Make the image a NumPy array
image_as_ar = np.array(x_value)
# Generate predictions
prediction = model.predict(image_as_ar)
# Determine what the label is based on the highest probability
label = np.argmax(prediction)
# Create the name of the directory and the file for the validation data set
# After each run, delete this out_valid/ directory so that old files are not
# hanging around in there.
file_name = str(VALID_PATH.joinpath(f"{idx}_{label}_{np.argmax(str(y_value))}.jpg"))
image = image_as_ar[0]
# Reverse the image preprocessing process
image = reverse_preprocess_inception(image)
# Save the image file
cv2.imwrite(file_name, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
logger.info("The validation data set has been saved!")