본문 바로가기
AI, 빅데이터/Medical

[Medical] Image Classification for Covid19 Dataset

by Foxy현 2024. 1. 17.
728x90
반응형

 

Kaggle의 Covid19 Dataset을 이용하여

Computer Vision 및 BioMedical 분야의 공부를 진행해보고자

프로젝트를 진행하였습니다.

 

데이터 출처 : Covid-19 Image Dataset (kaggle.com)

 

Covid-19 Image Dataset

3 Way Classification - COVID-19, Viral Pneumonia, Normal

www.kaggle.com

 


 

먼저 필요한 라이브러리를 불러옵니다.

저의 경우, 셀마다 필요할때 라이브러리를 호출하는 것이 아닌,

맨 위의 셀에 추가하는 방식으로 라이브러리를 import합니다.

(개취 존중..)

 

import os
import copy
import random

import cv2
import torch
import numpy as np
from torch import nn
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from ipywidgets import interact
from torchsummary import summary

random_seed = 2024

random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

 

 

상수로 선언해줄 것들을 명시합니다.

IMAGE_FORMAT의 경우는 데이터에 있는 이미지 확장자의 명을 나열하였으며,

CLASS_LIST의 경우, 저희가 분류할 3개의 클래스를 나열하였습니다.

 

IMAGE_FORMAT = ['jpeg', 'jpg', 'png']
CLASS_LIST = ['Normal', 'Covid', 'Viral_Pneumonia']

 

 

다음으로, 데이터의 경로를 지정해줍니다.

data_dir의 경우, 이미지 확인등의 역할로 사용합니다.

 

list_image_files 함수는 지정 폴더 안의 이미지를 꺼내서

지정한 파일 이름명으로 변경하여 추가하기 위한 함수입니다.

 

get_RGB_image는 원본 이미지의 BGR 픽셀을 RGB로 변경하는 함수입니다.

 

data_dir = 'data/Covid19-dataset/train'
train_data_dir = 'data/Covid19-dataset/train'
val_data_dir = 'data/Covid19-dataset/test'

def list_image_files(data_dir, sub_dir):
    image_format = ["jpeg", "jpg", "png"]
    
    image_files = []
    images_dir = os.path.join(data_dir, sub_dir)
    for file_path in os.listdir(images_dir):
        if file_path.split(".")[-1] in image_format:
            image_files.append(os.path.join(sub_dir, file_path))
    return image_files

def get_RGB_image(data_dir, file_name):
    image_file = os.path.join(data_dir, file_name)
    image = cv2.imread(image_file)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image

 

 

다음으로, 이 이미지들의 Normal, Covid, Viral 사진들은 무슨 차이가 있을까

확인하기 위해, 시각화를 진행합니다.

 

 

눈으로 봐서는 잘 모르겠네요

 

normals_list = list_image_files(data_dir, "Normal")
covids_list = list_image_files(data_dir, "Covid")
virals_list = list_image_files(data_dir, "Viral_Pneumonia")

test_normals_list = list_image_files(val_data_dir, "Normal")
test_covids_list = list_image_files(val_data_dir, "Covid")
test_pneumonias_list = list_image_files(val_data_dir, "Viral_Pneumonia")

min_num_files = min(len(normals_list), len(covids_list), len(virals_list)) # 70

@interact(index=(0, min_num_files-1))
def show_samples(index=0):
    normal_image = get_RGB_image(data_dir, normals_list[index])
    covid_image = get_RGB_image(data_dir, covids_list[index])
    viral_image = get_RGB_image(data_dir, virals_list[index])

    plt.figure(figsize=(12,8))

    plt.subplot(131)
    plt.title('Normal')
    plt.imshow(normal_image)

    plt.subplot(132)
    plt.title('Covid')
    plt.imshow(covid_image)

    plt.subplot(133)
    plt.title('Viral')
    plt.imshow(viral_image)

 

 

위 데이터들을 통합적으로 가져오고, 이미지를 전처리하는 기능을

Class로 사용해 편리하게 사용해보겠습니다.

 

class Chest_dataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        normals = list_image_files(data_dir, "Normal")
        covids = list_image_files(data_dir, "Covid")
        pneumonias = list_image_files(data_dir, "Viral_Pneumonia")
        
        self.files_path = normals + covids + pneumonias
        self.transform = transform
        
    def __len__(self):
        return len(self.files_path)
    
    def __getitem__(self, index):
        image_file = os.path.join(self.data_dir, self.files_path[index])
        image = cv2.imread(image_file)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        target = CLASS_LIST.index(self.files_path[index].split(os.sep)[-2])
         
        target = CLASS_LIST.index(self.files_path[index].split(os.sep)[0])
        
        if self.transform:
            image = self.transform(image)
            target = torch.Tensor([target]).long()
            
        return {"image":image, "target":target}

 

 

현재 이미지의 픽셀 크기는 일정하지 않습니다.

대략 1200 x 1200 픽셀인데,, 이러면 계산량이 어마어마 하겠죠

Pytorch에서는 이러한 연산량을 줄이기 위해 

픽셀값을 줄이고, numpy를 Tensor로 변경후, Normalize해줘야 합니다.

이 부분에서는 순서가 꽤나 중요합니다.

Resize -> ToTenor -> Normalize 순으로 진행해야 합니다.

 

TypeError: img should be PIL Image. Got <class 'torch.Tensor'>

 

 

이 에러 때문에 꽤나 고생했거든요ㅠㅠ

역시 기초가 중요한 것 같아요

 

이후 인스턴스를 생성합니다.

 

transformer = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


train_dset = Chest_dataset(train_data_dir, transformer)

 

 

dataloader 함수를 생성합니다.

Chest_dataset 클래스를 불러와서 train, val 데이터를 각각 분리하는 역할을 합니다.

 

def build_dataloader(train_data_dir, val_data_dir):
    dataloaders = {}
    train_dset = Chest_dataset(train_data_dir, transformer)
    dataloaders["train"] = DataLoader(train_dset, batch_size=4, shuffle=True, drop_last=True)

    val_dset = Chest_dataset(val_data_dir, transformer)
    dataloaders["val"] = DataLoader(val_dset, batch_size=1, shuffle=False, drop_last=False)
    return dataloaders

 

 

build_vgg19_based_model 함수를 생성합니다.

저의 경우, vgg19 모델을 사용하였으며,

1000개의 클래스가 아닌, 3개(Normal, Covid, Pneumonia)를

분류해야 하므로, 출력층을 추가해주었습니다.

 

def build_vgg19_based_model(device_name='cpu'):
    device = torch.device(device_name)
    model = models.vgg19(pretrained=True)
    model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
    model.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Linear(256, len(CLASS_LIST)),
        nn.Softmax(dim=1)
    )
    return model.to(device)

 

 

 

마지막 분류층에서 3개로 분류하는 것을 확인할 수 있었습니다.

확인 코드는 아래와 같습니다.

저의 경우, GPU를 사용하므로 cuda라고 지정하였는데,

일반적으로 cpu로 지정하시면 됩니다.

 

model = build_vgg19_based_model(device_name='cuda')
summary(model, (3,224,224), batch_size=1, device='cuda')

 

 

훈련을 진행하기 위한 함수를 생성합니다.

phase를 train, val로 지정하여

train일 땐 모델을 학습하고,

val일 땐 모델을 평가하도록 설정합니다.

 

def train_one_epoch(dataloaders, model, optimizer, loss_func, device):
    losses = {}
    accuracies = {}
    for phase in ["train", "val"]:
        
        running_loss = 0.0
        running_correct = 0
        
        if phase == "train":
            model.train()
        else:
            model.eval()
        
        for index, batch in enumerate(dataloaders[phase]):
            image = batch["image"].to(device)
            target = batch["target"].squeeze(1).to(device)
            
            optimizer.zero_grad()

            with torch.set_grad_enabled(phase == "train"):
                prediction = model(image)
                loss = loss_func(prediction, target)
                
                if phase == "train":
                    loss.backward()
                    optimizer.step()
            
            running_loss += loss.item()
            running_correct += get_accuracy(image, target, model)
            
            if phase == "train":
                if index % 10 == 0:
                    print(f"{index}/{len(dataloaders[phase])} - Running Loss: {loss.item()}")

        losses[phase] = running_loss / len(dataloaders[phase])
        accuracies[phase] = running_correct / len(dataloaders[phase])
    return losses, accuracies

 

 

 

모델의 accuracy를 확인하기 위한 코드를 작성합니다.

또한 가장 좋은 state 일 때 모델을 저장하는 코드를 작성하였습니다.

 

 

@torch.no_grad()
def get_accuracy(image, target, model):
    batch_size = image.shape[0]
    prediction = model(image)
    _, pred_label = torch.max(prediction, dim=1)
    is_correct = (pred_label == target)
    return is_correct.cpu().numpy().sum() / batch_size

def save_best_model(model_state, model_name, save_dir="./trained_model"):
    os.makedirs(save_dir, exist_ok=True)
    torch.save(model_state, os.path.join(save_dir, model_name))

 

 

이제 진짜 모델을 학습할건데,

학습하기에 앞서 Loss Function과 Optimizer를 지정합니다.

 

loss_func = nn.CrossEntropyLoss(reduction='mean')
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

num_epochs = 10
best_acc = 0.0

train_loss, train_accuracy = [], []
val_loss, val_accuracy = [], []


for epoch in range(num_epochs):
    losses, accuracies = train_one_epoch(dataloaders, model, optimizer, loss_func, device)
    train_loss.append(losses["train"])
    val_loss.append(losses["val"])
    train_accuracy.append(accuracies["train"])
    val_accuracy.append(accuracies["val"])
    
    print(f"{epoch+1}/{num_epochs}-Train Loss: {losses['train']}, Val Loss: {losses['val']}")
    print(f"{epoch+1}/{num_epochs}-Train Acc: {accuracies['train']}, Val Acc: {accuracies['val']}")
    
    if (epoch > 3) and (accuracies["val"] > best_acc):
        best_acc = accuracies["val"]
        best_model = copy.deepcopy(model.state_dict())
        save_best_model(best_model, f"model_{epoch+1:02d}.pth")
        
print(f"Best Accuracy: {best_acc}")

 

 

결과는 이렇습니다.

Accuracy가 약 97퍼센트로 꽤 좋은 성능을 나타내는 것 같네요

 


 

이상 포스팅 마치겠습니다!

 

파이토치는 처음이라 어색한데,

Tensorflow와 다른 매력이 있네요

공부하면 할수록 재밌는 느낌..?

 

 

 

728x90
반응형