인공지능 공부/pytorch

(2022.07.06) Pytorch Car Object Detection

앨런튜링_ 2022. 7. 6. 18:01
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torch
from torch.utils.data import Dataset, DataLoader

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.rpn import AnchorGenerator
from PIL import Image


class CarDataset(Dataset):
    def __init__(self, df, image_dir, tranforms=None):
        super().__init__()


        self.image_ids = df["image"].unique()
        self.df = df
        self.image_dir = image_dir
        self.transforms = tranforms


    def __getitem__(self, idx: int):
        image_id = self.image_ids[idx]
        records = self.df[self.df['image'] == image_id]
        image = cv2.imread(f"{self.image_dir}/{image_id}", cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /=255.0
        image = torch.tensor(image)
        image = image.permute(2,0,1)


        boxes = recordes[['xmin', 'ymin', 'xmax', 'ymax']].values
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]


        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        area = torch.as_tensor(area, dtype=torch.float32)


        #There is only one class
        labels = torch.ones((records.shape[0]), dtype=torch.int64)
        
        target = {}
        target['boxes'] = torch.tensor(boxes)
        target["labels"]  = labels
        target['image_id'] = torch.tensor([idx])
        target['ara'] = area

        if self.transforms:
            sample = {"image":image, "boxes":target["boxes"], "labels":labels}
            sample = self.transforms(**sample)
            image = sample["image"]
            target["boxes"] = torch.stack(tuple(map(torch.tensor, zip(*sample["boxes"])))).permute(1, 0)

        return image, target, image_id


    def __len__(self):
        return self.image_ids_shape[0]

def get_train_transform():
    return A.Compose([
        A.Flip(0.5),
        ToTensorV2(p=1.0)
    ], bbox_params={"format": "pascal_voc", "label_fields": ["labels"]})



model = fasterrcnn_resnet50_fpn(pretrained=True)



num_classes = 2

in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

def collate_fn(batch):
    return tuple(zip(*batch))

train_df = pd.read_csv('./data/car_data/train_solution_bounding_boxes (1).csv')
dir_train = './data/car_data/training_images'
train_ds = CarDataset(train_df, dir_train)
train_dl = DataLoader(train_ds, batch_size=8, shuffle=False, num_workers=4, collate_fn=collate_fn)



device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=0.0005, weight_decay=0.0005)
lr_scheduler = None

num_epochs = 5


class Averager:
    def __init__(self):
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        if self.iterations == 0:
            return 0
        else:
            return 1.0 * self.current_total / self.iterations

    def reset(self):
        self.current_total = 0.0
        self.iterations = 0.0

loss_hist = Averager()
itr = 1
model.train()


for epoch in range(num_epochs):
    loss_hist.reset()
    
    for images, targets, image_ids in train_dl:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        
        loss_dict = model(images, targets)

        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()

        loss_hist.send(loss_value)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if itr % 50 == 0:
            print(f"Iteration #{itr} loss: {loss_value}")

        itr += 1
    
    # update the learning rate
    if lr_scheduler is not None:
        lr_scheduler.step()

    print(f"Epoch #{epoch} loss: {loss_hist.value}")