import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
from torch.optim import Adam
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from random import randint
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import pathlib
import os
import copy
import random
import time
# torch.rand()    : draws uniformly from [0, 1)
# torch.randn()   : draws from a standard normal distribution (mean 0, std 1)
# torch.randint() : draws integers uniformly from a given range
#                   (default dtype is torch.int64 in recent PyTorch)
# torch.randperm(): returns a random permutation of the integers 0..n-1
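# A quick, illustrative check of the four generators described above
# (standard torch APIs; values will differ until the seed is fixed below):
print(torch.rand(3))               # values in [0, 1)
print(torch.randn(3))              # values scattered around 0
print(torch.randint(0, 10, (3,)))  # integers in [0, 10)
print(torch.randperm(5))           # a shuffle of 0..4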
'''
Fixing the seed of a random number generator makes every run of the program
produce the same sequence of random numbers, so the seeds used by PyTorch
and its companion libraries should all be pinned.
pytorch_lightning has a helper that seeds the PyTorch-related generators;
its implementation seeds Python's random module and NumPy alongside
PyTorch's own seed function.
'''
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
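# pytorch_lightning's seed_everything additionally calls
# torch.cuda.manual_seed_all; a minimal sketch of the same idea for GPU runs
# (recent torch.manual_seed already covers CUDA, so this is belt-and-braces):
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)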
'''
In benchmark mode, cuDNN profiles several convolution algorithms and picks
the fastest one for the current environment; that speeds things up, but if a
different algorithm gets selected on another run, the computed values can
differ slightly. Setting deterministic = True forces cuDNN to use
deterministic algorithms so results are reproducible, possibly at some cost
in speed.
'''
torch.backends.cudnn.deterministic = True
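# A common companion setting (a sketch, assuming reproducibility matters more
# than speed here): also disable the benchmark autotuner described above, so
# cuDNN cannot switch algorithms between runs.
torch.backends.cudnn.benchmark = False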
path = "./practic_torch/data/Rice_Image_Dataset/"
# https://brownbears.tistory.com/415 pathlib
data_dir = pathlib.Path(path)
df_labels = {
'Arborio' : 0,
'Basmati' : 1,
'Ipsala' : 2,
'Jasmine' : 3,
'Karacadag': 4
}
class_names = ['Karacadag', 'Basmati', 'Jasmine', 'Arborio', 'Ipsala']
num_class = len(class_names)
image_files = [
    [os.path.join(path, class_name, x) for x in os.listdir(os.path.join(path, class_name))]
    for class_name in class_names
]
# Flatten the per-class lists into one list of paths and shuffle it.
images_paths = [p for class_files in image_files for p in class_files]
random.shuffle(images_paths)
# Split the shuffled paths: 60,000 train / 10,000 test / the rest validation.
train_paths = images_paths[:60000]
test_paths = images_paths[60000:70000]
valid_paths = images_paths[70000:]
# Transforms: random flip/rotation are augmentations, so they belong in the
# training pipeline only; evaluation data gets a deterministic pipeline.
train_transformations = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(30),
    transforms.ToTensor(),
    # A single mean/std broadcasts across all three RGB channels.
    transforms.Normalize((0.5,), (0.5,))
])
eval_transformations = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])
class ImageDataset(Dataset):
    def __init__(self, df_labels, image_paths, transform=None):
        super().__init__()
        self.image_paths = image_paths  # list of image file paths
        self.df_labels = df_labels      # class name -> integer label
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        image_path = self.image_paths[index]
        image = Image.open(image_path).convert('RGB')
        # The parent directory name is the class name (works on any OS,
        # unlike splitting on '/').
        label_name = os.path.basename(os.path.dirname(image_path))
        label = self.df_labels[label_name]
        if self.transform is not None:
            image = self.transform(image)
        return image, label
train_data = ImageDataset(df_labels, train_paths, train_transformations)
test_data = ImageDataset(df_labels, test_paths, eval_transformations)
val_data = ImageDataset(df_labels, valid_paths, eval_transformations)
# Only the training loader needs shuffling; evaluation order is irrelevant.
train_ds = DataLoader(train_data, batch_size=64, shuffle=True)
test_ds = DataLoader(test_data, batch_size=64, shuffle=False)
valid_ds = DataLoader(val_data, batch_size=64, shuffle=False)
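# Illustrative sanity check: pull one batch and confirm the expected shapes
# (64 RGB images at 256x256, with one integer label per image).
_images, _labels = next(iter(train_ds))
print(_images.shape, _labels.shape)  # torch.Size([64, 3, 256, 256]) torch.Size([64])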
# Model Creation
class LeNet(nn.Module):
def __init__(self, output_dim):
super().__init__()
self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
self.fc_1 = nn.Linear(16*61*61, 120)
self.fc_2 = nn.Linear(120, 84)
self.fc_3 = nn.Linear(84, output_dim)
def forward(self, x):
#(3, 256, 256) ---> input
x = self.conv1(x)
#(6, 252, 252) ---> output
x = F.max_pool2d(x, kernel_size=2)
#(6, 126, 126)
x = F.relu(x)
x = self.conv2(x)
#(16, 122, 122)
x = F.max_pool2d(x, kernel_size=2)
#(16, 61, 61)
x = F.relu(x)
        x = x.view(x.shape[0], -1)  # flatten to (batch, 16*61*61)
        h = x  # keep the flattened features so the caller can inspect them
x = self.fc_1(x)
x = F.relu(x)
x = self.fc_2(x)
x = F.relu(x)
x = self.fc_3(x)
return x, h
OUTPUT_DIM = 5
model = LeNet(OUTPUT_DIM)
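# Illustrative shape check with a dummy batch: per the shape comments in
# forward(), two 3x256x256 inputs should yield logits of shape (2, 5) and
# flattened features of shape (2, 16*61*61).
_logits, _h = model(torch.randn(2, 3, 256, 256))
print(_logits.shape, _h.shape)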
def count_parameters(model):
    # torch.numel returns the total number of elements in a tensor.
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = Adam(model.parameters(), lr=0.0001)
loss_fun = F.cross_entropy
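# F.cross_entropy expects raw (unnormalized) logits plus integer class
# indices, which is why the model returns logits without a softmax.
# A tiny illustrative check with random inputs:
_demo_logits = torch.randn(4, OUTPUT_DIM, device=device)
_demo_labels = torch.randint(0, OUTPUT_DIM, (4,), device=device)
print(loss_fun(_demo_logits, _demo_labels))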
# Training & Evaluation
def train(model, epoch, train_loader):
    model.train()
    total_num = len(train_loader.dataset)
    train_loss = 0
    correct_num = 0
    for image, label in train_loader:
        image = image.to(device)
        label = label.to(device).to(torch.long)
        output, _ = model(image)
        loss = loss_fun(output, label)
        train_loss += loss.item() * label.size(0)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        predict = torch.argmax(output, dim=-1)
        correct_num += label.eq(predict).sum().item()
    train_loss = train_loss / total_num
    train_acc = correct_num / total_num
    print('epoch: {} --> train_loss: {:.6f} - train_acc: {:.6f} - '.format(
        epoch, train_loss, train_acc), end='')
def evaluate(model, eval_loader, mode='val'):
    model.eval()
    total_num = len(eval_loader.dataset)
    eval_loss = 0
    correct_num = 0
    with torch.no_grad():  # no gradients needed during evaluation
        for image, label in eval_loader:
            image = image.to(device)
            label = label.to(device).to(torch.long)
            output, _ = model(image)
            loss = loss_fun(output, label)
            eval_loss += loss.item() * label.size(0)
            predict = torch.argmax(output, dim=-1)
            correct_num += label.eq(predict).sum().item()
    eval_loss = eval_loss / total_num
    eval_acc = correct_num / total_num
    print('{}_loss: {:.6f} - {}_acc: {:.6f}'.format(
        mode, eval_loss, mode, eval_acc))
for epoch in range(5):
    train(model, epoch, train_ds)
    evaluate(model, valid_ds)  # valid_ds is the DataLoader; val_ds is the Dataset
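# After training, the held-out test split can be scored the same way
# (a sketch reusing the evaluate helper defined above):
evaluate(model, test_ds, mode='test')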