contact

다중 카메라-다중 객체 추적 구현

쉽지 않았던 여정

계기

특정 공간에서 수많은 사람들 개개인이 다니는 이동 동선을 파악하여 공간의 방문 빈도를 알고, 공간의 효율을 높이는 솔루션의 초기 개발단계에 있었는데, 이러한 솔루션에서 기존의 CCTV 인프라로 사람들의 동선 데이터를 얻기 위해선 다중 카메라-다중 객체 추적(Multi Camera-Multi Object Tracking, MCMOT) 의 구현이 필수적이었다.

Object Detection은 YOLO 와 같이 성능 좋은 모델들이 이미 존재해 큰 문제가 되지 않았다. 또한 Single Camera Tracking도 DeepSORTByteTrack으로 쉽게 가능하다. 하지만 다중 카메라의 경우는 달랐다. Computer Vision 분야를 깊게 파고든 경력이 있는것도 아니라 생소하게 다가왔었고, 어떻게 구현해야할지 방법도 쉽게 떠오르지 않았다. 위대한 오픈소스 생태계에선 '과연 이런게 미리 구현되어있을까?' 하고 찾아보면 항상 기다린듯이 라이브러리나 맞춤형 모델이 이미 준비되어 있었고 쉽게 찾을 수 있었지만, 슬프게도 이번에는 그렇지 않았다.

그래서 기간 내에 개발이 가능할 지 검토해보기도 하는 등 제법 난이도 있는 과제로 여겨졌다.

한 문장의 단서만을 주고 사라진 영상

그러다 5개의 카메라에서 여러명의 사람을 성공적으로 구분하는 짧은 영상을 찾게 되었다.

Note that the detection happens separately on each camera and then unified into one since identification process.

영상의 캡션에는 위와 같은 한 마디 말만 남겨두고 있다. 일단 이것이 구현 가능하다는 점, 그리고 Detection이 먼저 각 카메라의 영상에서 이루어진 후에 한번에 Identification이 이루어진다는 점 정도는 알 수 있었다.

여러 카메라에서 객체 인식과 추적을 한번에 쉽게 해주는 모델이 있지는 않았지만, 많은 Paper가 존재하였고, 구현의 가능성이 점점 보였다.

그래서 도전해보기로 하였다.

시작 전 용어 정리

Detection

특정 객체를 탐지하고 구분하는 것. 단, 각 객체에 ID값을 부여하여 추적하려면 Tracking이 필요하다. 보통 어떤 물체인지에 대한 값과 물체의 좌표만 알려준다.

이미 YOLO 와 같은 모델들이 공개되어 훌륭한 성능을 보여주고 있다.

Tracking

개별 객체를 구분하는 것, 보통 ID값을 부여한다. 이 글에서는 단일 카메라-다중 객체 트래킹(Single Camera-Multi Object Tracking) 을 가리키는 용어로 쓴다.

DeepSORT,StrongSORT, ByteTrack 등의 모델들이 있다. ByteTrack 추천.

Identification

이 글에서는 바로 이 글의 주제인 다중 카메라-다중 객체 추적을 가리키는 용어로 쓰인다. Re-ID(Re-Identification) 이라고 많이 표현되지만 이 글에서는 Identification, Re-Identification 용어 둘 다 동일한 의미로 사용함.

글을 쓰는 시점 기준으로 Re-ID만을 위한 모델은 따로 존재하지 않는다.

구현

다중 카메라 환경에서의 해답 찾기

Multi-Camera Multi-Object Tracking에 관한 Paper들을 읽으면서 공통적으로 Re-Identification 이라는 용어가 등장하는 것을 보고 그에 관해 더 찾아보았다. 여러 개의 입력에서 동일한 객체를 알아내 다시 새롭게 Global ID를 부여하는 로직인데, 다중 카메라에서 동일 객체를 식별하는 나의 목표에 딱 맞는 것이었다.

Re-Identification을 구현하려면?

Re-Identification은 다음과 같은 순서로 이루어진다.

  1. 각 카메라마다 객체들에 대해 벡터를 추출한다.
  2. 서로 다른 카메라의 객체들끼리 벡터값을 비교하여 가장 흡사한 객체끼리 매칭한다. (매칭 알고리즘)

나의 구상을 살펴보자.

// 첫 생각

Detection => Identification?

Single Camera의 경우, Detection을 하고 나서 얻은 각 객체들의 좌표값에 대해 Tracking 연산을 하여 id값이 부여된다.

Multi Camera에서도 동일하게 각각 Detection과 Tracking이 가능하다.

Detecting

# Detector 클래스 내에서, YOLO 모델로 Detection 수행하는  메서드
def detect(self, img, cls=0):
    # 이미지 전처리 메서드 호출하여 전처리된 이미지 리턴
    # 리사이징, 색변환, Numpy Array -> Pytorch Tensor 변환 등 수행
    input_image, scale = self.preprocess_image(img, (640, 640))

    # YOLO model로 분석
    self.detections = self.model(input_image)

    # 각 개체의 왼쪽 위 x,y 좌표, 오른쪽 아래 x,y 좌표, confidence값(확률) 리턴
    self.detections = [
        [
            int(det.boxes.xyxy.cpu().numpy().tolist()[0][0] / scale[0]),
            int(det.boxes.xyxy.cpu().numpy().tolist()[0][1] / scale[1]),
            int(det.boxes.xyxy.cpu().numpy().tolist()[0][2] / scale[0]),
            int(det.boxes.xyxy.cpu().numpy().tolist()[0][3] / scale[1]),
            round(det.boxes.conf[0].item(), 2),
        ]
        for det in self.detections[0]
        if int(det.boxes.cls[0].item()) == cls
    ]

    # 후처리 메서드 호출
    # detections 배열을 ByteTrack의 매개변수로 사용하기 위해 Pytorch Tensor로 변환하여 리턴
    self.processed_detections = self.process_detections()

Tracking

Tracking 과정은 ByteTrack이 높은 정확도로 잘 수행한다.

특이한 점으로는, 이 프로젝트는 가만히 있는 직원과 감상하지 않고 지나쳐가는 행인 등 outliar들의 존재를 제거할 필요가 있었고, 그래서 거기에 활용하기 위해 이전 프레임과 비교해서 달라진 위치(즉, 이동거리)를 구하는 로직이 존재했다. 해당 로직을 함수가 있다 생각하고 축약했다. (잘 생각해보면 간단히 구현할 수 있는 로직이다.)

# Tracker 클래스에서 ByteTrack으로 Detection 수행하는 메서드
def track(self, detections, frame_size):
    # ByteTrack 수행
    tracks = self.tracker.update(detections, frame_size, frame_size)

    # 이전 프레임과 비교하여 이미지상의 이동거리를 구하는 로직이 있으나, 여기선 간단히 calculate_movement_distance() 함수가 있다고 생각하고 생략
    for track in tracks:
        self.track_move[track.track_id] += calculate_movement_distance(track)

    # id값, Bounding Box 만들기 좌표값 생성하여 리턴,
    return [
        (
            track.track_id,
            int(max(0, track.tlbr[0])),
            int(max(0, track.tlbr[1])),
            int(min(track.tlbr[2], frame_size[0])),
            int(min(track.tlbr[3], frame_size[1])),
        )
        for track in tracks
    ]

이상치 제거

아까 구한 이동거리를 각 방문객 객체의 대표 속성값으로 삼아, 일정 이동거리 이하의 방문객들은 이상치로 간주하고 제거하는 로직이다.

제거 여부를 판가름할 threshold 값이 얼마일진 나도 모른다.

각 개수별로 클러스터링을 해본 뒤에 각각 Silhouette Score 값을 계산하여 가장 높은 값을 찾으면 이상적인 클러스터의 개수를 찾을 수 있다.

Silhouette Score를 통해 각 Cluster가 얼마나 잘 응집되어 있는지, 다른 Cluster랑은 떨어져있는지 알 수 있다. -1에 가까울수록 안좋고, 1에 가까울수록 좋다.

from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score


def filter_ids_with_low_appear(self):
    # 각 ID의 등장 횟수를 카운트
    moves = np.array(list(self.track_move.values())).reshape(-1, 1)

    # Silhouette Score를 기반으로 최적의 클러스터 수 결정
    # 1개부터 min([사람 수],10)개의 클러스터까지 시도
    silhouette_scores = []
    K = range(2, min(len(np.unique(moves)), 10))

    for k in K:
        kmeans = KMeans(n_clusters=k, random_state=0).fit(moves) # K-means Clustering 수행
        score = silhouette_score(moves, kmeans.labels_) # Silhouette Score 계산
        silhouette_scores.append(score) # 스코어 추가

    # Silhouette Score를 확인하면 최적의 클러스터 수를 알 수 있다
    optimal_k = K[silhouette_scores.index(max(silhouette_scores))]
    # 최적의 클러스터 수로 K-means Clustering 수행
    kmeans = KMeans(n_clusters=optimal_k, random_state=0).fit(moves)

    # 각 클러스터의 최소값을 임계값으로 사용하고 정렬
    thresholds = sorted([min(moves[kmeans.labels_ == i]) for i in range(optimal_k)])

    # 선택된 임계값 (두 번째로 작은 값, 클러스터가 하나인 경우를 대비해 len 체크)
    if len(thresholds) > 1:
        selected_threshold = thresholds[1]
    else:
        # 클러스터가 하나만 있는 경우 첫 번째 값을 사용
        selected_threshold = thresholds[0]


    # 임계값 이상 이동한 객체의 ID만 포함하는 리스트 생성
    filtered_ids = [
        id
        for id, move in self.track_move.items()
        if move >= min(selected_threshold)
    ]

    # 결과 출력
    print("Original moves:", self.track_move)
    print("Optimal number of clusters:", optimal_k)
    print("Thresholds per cluster:", thresholds)
    print("Threshold:", selected_threshold)
    print("Filtered moves (dynamic thresholds):", filtered_ids)

    self.filtered_ids = filtered_ids

이제 Detection, Tracking, 이상치 제거까지 완료했다. Identification을 할 차례.

Identifying

이제 각 카메라 각도별로 id값이 부여되었으며, 이미지에서 객체의 특성을 직접 추출해야한다.

resnet50을 선택했다. CNN모델 중 하나이다. 이미지 구분을 위한 Training을 마치면 아래와 같은 용도로 쓸 수 있는 모델이다.

  • Object Detection
  • Image Classification으로 의자, 자동차, 강아지 등 사물 구별
  • 객체의 영역을 구분하는 Image Segmentation

그러나 우리는 벡터값, 즉 이미지 속성을 추출해 서로 다른 각도에서 비교해 동일인물을 찾아내는데에 쓸 것이다. 정확히는 매 프레임마다 벡터값을 추출하여, 마지막에 평균값을 구해 인물의 특징을 구할 예정이다.

resnet50 모델 정보

- 데이터셋: msmt17
- 입력 이미지 크기: 256x128(h,w)
- Epoch 수: 150
- Learning Rate: 0.0015
- Batch 크기: 64

내가 사용한 모델은 여기서 다운로드 가능하다.

이제 이미지를 resnet50 모델에 입력시켜보도록 하자.

아래는 내가 만든 Identifier 클래스이다. 라이브러리들과 직접 만든 util 함수들도 import되어 포함되어있지만 이해하기 어렵지 않을 것이다.

import torch
import torchvision.transforms as transforms
import torchreid
import cv2
import numpy as np
from analyzer.device import get_device
from PIL import Image


class Identifier:
    # 클래스 인스턴스 선언 함수
    def __init__(self) -> None:
        self.device = get_device() # 디바이스명
        self.features_by_id = {}
        self.average_features_by_id = {}

        model = torchreid.models.resnet50(num_classes=4101, pretrained=True)

        # 가중치 불러오기
        checkpoint = torch.load(
            "models/resnet50_msmt17_combineall_256x128_amsgrad_ep150_stp60_lr0.0015_b64_fb10_softmax_labelsmooth_flip_jitter.pth",
            map_location=self.device,
        )

        new_state_dict = {}  # checkpoint의 state_dict를 처리하여 저장할 딕셔너리

        key_map = {
            "fc.weight": "classifier.weight",
            "fc.bias": "classifier.bias",
        }  # 키 매핑

        # 체크포인트의 각 키에 대해 "model." 접두어 제거
        for key, value in checkpoint.items():
            key = key.replace("model.", "")  # "model." 문자열 제거
            if key in key_map:
                new_state_dict[key_map[key]] = value
            else:
                new_state_dict[key] = value

        model.load_state_dict(new_state_dict, strict=False)  # 수정된 state dict 로드

        model = model.to(self.device)  # 디바이스 설정

        model.eval()  # 평가 모드로 설정

        self.model = model # 모델 설정

    # 이미지 전처리 함수
    def preprocess_image(self, img):
        img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        transform = transforms.Compose(
            [
                transforms.Resize((256, 128)),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )
        img = transform(img).unsqueeze(0).to(self.device)
        return img

    # 이미지 벡터 추출
    def extract_feature(self, img):
        img = self.preprocess_image(img)
        with torch.no_grad():
            feature = self.model(img)
        return feature.cpu().numpy()

    # 추출한 이미지 벡터 저장
    def save_feature(self, id, feature):
        if id not in self.features_by_id:
            self.features_by_id[id] = list()
        self.features_by_id[id].append(feature)

    # 이미지 벡터 평균값을 구하는 메서드
    # 가장 마지막에 사용
    def calculate_average_feature(self):
        self.average_features_by_id = {
            id: np.mean(np.stack(features), axis=0)
            for id, features in self.features_by_id.items()
        }
        return self.average_features_by_id

동일인물 찾기

Identifierclass의 메서드들을 잘 활용한다면, 각 카메라별로 백터 추출을 하고, 각 영상의 객체별 벡터속성 평균값(즉 대표값)을 구할 수 있을 것이다.

이제 영상별로 유사한 벡터를 그룹화(=영상별 동일인물 추정 객체끼리 모으기)해야할 차례이다. 이는 크게 어렵지 않다. 벡터에 대해 Clustering 해주면 된다. 다양한 Clustering 방법이 많으니 하나 골라서 쓰면 된다. 이 프로젝트에선 K-means Clustering을 사용했다. 코드는 생략.

각 영상마다 동일인물 n명이 있고, [(ByteTrack id값),(추출한 벡터값)] 형태의 크기 n인 배열이 영상 갯수만큼 있다고 할 때, Global ID값(0,1,2..n)에 대해 각 영상별 ByteTrack id값을 매칭해주면 끝이 난다.

결과

이제 동일 인물끼리 매칭이 되었으므로, 간단한 시각화만 하면 Identification까지 마쳐 Global ID가 부여된 사람들의 모습을 볼 수 있을 것이다.

서로 마주보는 두 개의 카메라에서 동일 인물에 대해 같은 id값이 부여된 모습

Detection과 Tracking 과정에서 100%의 정확도를 달성하지 않는 이상 중간에 놓치는 부분이 생긴다는 점은 아쉬운 부분이다. 하지만 개별 영상에서 Tracking한 id값은 쭉 유지되므로 잘 활용할 수 있을 것이다.

정리된 글로 볼 땐 금방 끝났지만, 수많은 시행착오를 겪으며 익히고, 그걸 다시 지식으로 정립하는 과정의 반복이었다. 내겐 꽤나 뜻깊은 프로젝트.

궁금한 사항이 있으시면 여기로 메일 주세요. 가능한 도와드리겠습니다.

← blog