Chào mọi người, mình là SuNT, đến từ team AI, VTI VN!
Human Action Recognition (HAR) là quá trình sử dụng những cảnh quay trong video để nhận diện, phân loại các hành động khác nhau được thực hiện bởi người trong video đó. Nó được ứng dụng rất rộng rãi trong các lĩnh vực như giám sát, thể dục, thể thao, ...
Giả sử, bạn muốn tạo một ứng dụng dạy học Yoga trực tuyến. Trước tiên, bạn cần quay các video hướng dẫn để người học theo dõi và làm theo. Sau đó, mỗi người học tự tập và quay lại video của mình. Họ gửi các video đó lên ứng dụng của bạn. Dựa vào video nhận được, ứng dụng có thể đánh giá được mức độ chính xác trong mỗi động tác của người học. Từ đó đưa ra gợi ý cải thiện, ... Thật tuyệt vời phải không?
Source: learnopencv.com
Trong bài này, chúng ta sẽ cùng nhau tạo ra một model để nhận diện một số hành dộng của người, sử dụng pose estimation và mạng LSTM. Pytorch_Lightning được sử dụng trong bài này.
1. Tổng quan sơ đồ kiến trúc
Source: learnopencv.com
Để phân loại một hành động, trước tiên chúng ta cần xác định vị trí các bộ phận cơ thể khác nhau trong mọi khung hình, sau đó phân tích chuyển động của các bộ phận đó theo thời gian.
Bước đầu tiên đạt được bằng cách sử dụng Detectron2, nó xuất ra tư thế của cơ thể (17 Keypoints) sau khi quan sát một khung hình trong video.
Bước thứ hai là phân tích chuyển động của cơ thể theo thời gian và đưa ra dự đoán được thực hiện bằng mạng LSTM. Đầu vào là các Keypoints từ một chuỗi khung được, đầu ra là loại hành đồng được dự đoán.
2. Chuẩn bị dữ liệu
Đối với ứng dụng này, chúng ta chỉ cần huấn luyện mạng LSTM để phân loại các hành động, còn phần Pose Estimation thì chúng ta sẽ sử dụng pre-trained có sẵn cung cấp bởi Detectron2.
Bộ dữ liệu được dùng để huấn luyện mạng LSTM được tạo thành bằng cách sử dụng OpenPose trên các video của tập dữ liệu Berkeley Multimodal Human Action Database (MHAD). Sử dụng cách thức tương tự, chúng ta cũng có thể tạo ra bộ dữ liệu của riêng mình.
Download bộ dữ liệu tại đây. Nó bao gồm 6 hành động: JUMPING, JUMPING_JACKS, BOXING, WAVING_2HANDS, WAVING_1HAND, CLAPPING_HANDS.
Source: learnopencv.com
Training Data bao gồm các chuỗi 17 Keypoints kết hợp với một nhãn tương ứng. Mỗi Keypoint là một cặp tọa độ (x,y).
Source: learnopencv.com
Mỗi lần phân loại, chúng ta sẽ sử dụng 32 frames liên tiếp nhau. Như vậy thì kích thước dữ liệu của một Input Data sẽ là 32x34:
Source: learnopencv.com
!head -2 RNN-HAR-2D-Pose-database/X_train.txt
---
295.914,161.579,307.693,203.413,281.546,203.368,274.997,251.562,267.194,293.253,337.619,204.669,347.958,255.443,341.541,295.866,286.81,289.393,297.196,355.832,297.22,405.371,321.967,291.959,327.143,358.408,328.528,411.922,294.546,156.42,305.002,156.418,0,0,318.083,161.632
295.855,161.6,307.684,203.408,281.529,203.385,274.989,251.574,267.191,291.961,337.615,204.646,347.974,254.209,344.093,295.816,286.803,289.377,297.165,355.827,297.205,404.095,323.248,290.652,324.564,358.409,328.493,410.63,293.252,157.686,303.706,157.706,0,0,318.024,161.654
Bởi vì OpenPose trả về kết quả là 18 Keypoints, trong khi kết quả của Detectron2 chỉ là 17 Keypoints nên chúng ta sẽ phải thực hiện một bước chuyển đổi trước khi sử dụng bộ dữ liệu này.
WINDOW_SIZE = 32 # 32 continuous frames
class PoseDataset(Dataset):
def __init__(self, X, Y):
self.X = X
self.y = Y
def __len__(self):
return len(self.y)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
openpose_to_detectron_mapping = [0, 1, 28, 29, 26, 27, 32, 33, 30, 31, 8, 9, 2, 3, 10, 11, 4, 5, 12, 13, 6, 7, 20, 21, 14, 15, 22, 23, 16, 17, 24, 25, 18, 19]
class PoseDataModule(pl.LightningDataModule):
def __init__(self, data_root, batch_size):
super().__init__()
self.data_root = data_root
self.batch_size = batch_size
self.X_train_path = self.data_root + "X_train.txt"
self.X_test_path = self.data_root + "X_test.txt"
self.y_train_path = self.data_root + "Y_train.txt"
self.y_test_path = self.data_root + "Y_test.txt"
# Detectron2 produces only 17 key points while OpenPose produces 18 (or more) key points.
def convert_to_detectron_format(self, row):
row = row.split(',')
# filtering out coordinate of neck joint from the training/validation set originally generated using OpenPose.
temp = row[:2] + row[4:]
# change to Detectron2 order of key points
temp = [temp[i] for i in openpose_to_detectron_mapping]
return temp
def load_X(self, X_path):
file = open(X_path, 'r')
X = np.array(
[elem for elem in [
self.convert_to_detectron_format(row) for row in file
]],
dtype=np.float32
)
file.close()
blocks = int(len(X) / WINDOW_SIZE)
X_ = np.array(np.split(X, blocks))
return X_
# Load the networks outputs
def load_y(self, y_path):
file = open(y_path, 'r')
y = np.array(
[elem for elem in [
row.replace(' ', ' ').strip().split(' ') for row in file
]],
dtype=np.int32
)
file.close()
# for 0-based indexing
return y - 1
def prepare_data(self):
pass
def setup(self, stage=None):
X_train = self.load_X(self.X_train_path)
X_test = self.load_X(self.X_test_path)
y_train = self.load_y(self.y_train_path)
y_test = self.load_y(self.y_test_path)
self.train_dataset = PoseDataset(X_train, y_train)
self.val_dataset = PoseDataset(X_test, y_test)
def train_dataloader(self):
# train loader
train_loader = torch.utils.data.DataLoader(
self.train_dataset,
batch_size=self.batch_size,
shuffle=True
)
return train_loader
def val_dataloader(self):
# validation loader
val_loader = torch.utils.data.DataLoader(
self.val_dataset,
batch_size=self.batch_size,
shuffle=False
)
return val_loader
3. Xây dựng mô hình
3.1 Human Pose Estimation model
Phần này, chúng ta sử dụng pre-trained R50-FPN model từ Detectron2 Model Zoo.
# obtain detectron2's default config
cfg = get_cfg()
# load the pre trained model from Detectron2 model zoo
cfg.merge_from_file(model_zoo.get_config_file("COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x.yaml"))
# set confidence threshold for this model
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
# load model weights
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x.yaml")
# create the predictor for pose estimation using the config
pose_detector = DefaultPredictor(cfg)
3.2 Định nghĩa LSTM model
LSTM model sẽ được khởi tạo với hidden_dim = 50, optimizer là Adam và sử dụng ReduceLROnPlateau scheduler để giảm learning_rate. Ở đây, mình chỉ sử dụng 1 LSTM layer, bạn có thể thí nghiệm với nhiều LSTM layers hơn.
# We have 6 output action classes.
TOT_ACTION_CLASSES = 6
#lstm classifier definition
class ActionClassificationLSTM(pl.LightningModule):
# initialise method
def __init__(self, input_features, hidden_dim, learning_rate=0.001):
super().__init__()
# save hyperparameters
self.save_hyperparameters()
# The LSTM takes word embeddings as inputs, and outputs hidden states
# with dimensionality hidden_dim.
self.lstm = nn.LSTM(input_features, hidden_dim, num_layers=2, batch_first=True)
# The linear layer that maps from hidden state space to classes
self.linear = nn.Linear(hidden_dim, TOT_ACTION_CLASSES)
def forward(self, x):
# invoke lstm layer
lstm_out, (ht, ct) = self.lstm(x)
# invoke linear layer
return self.linear(ht[-1])
def training_step(self, batch, batch_idx):
# get data and labels from batch
x, y = batch
# reduce dimension
y = torch.squeeze(y)
# convert to long
y = y.long()
# get prediction
y_pred = self(x)
# calculate loss
loss = F.cross_entropy(y_pred, y)
# get probability score using softmax
prob = F.softmax(y_pred, dim=1)
# get the index of the max probability
pred = prob.data.max(dim=1)[1]
# calculate accuracy
acc = torchmetrics.functional.accuracy(pred, y)
dic = {
'batch_train_loss': loss,
'batch_train_acc': acc
}
# log the metrics for pytorch lightning progress bar or any other operations
self.log('batch_train_loss', loss, prog_bar=True)
self.log('batch_train_acc', acc, prog_bar=True)
#return loss and dict
return {'loss': loss, 'result': dic}
def training_epoch_end(self, training_step_outputs):
# calculate average training loss end of the epoch
avg_train_loss = torch.tensor([x['result']['batch_train_loss'] for x in training_step_outputs]).mean()
# calculate average training accuracy end of the epoch
avg_train_acc = torch.tensor([x['result']['batch_train_acc'] for x in training_step_outputs]).mean()
# log the metrics for pytorch lightning progress bar and any further processing
self.log('train_loss', avg_train_loss, prog_bar=True)
self.log('train_acc', avg_train_acc, prog_bar=True)
def validation_step(self, batch, batch_idx):
# get data and labels from batch
x, y = batch
# reduce dimension
y = torch.squeeze(y)
# convert to long
y = y.long()
# get prediction
y_pred = self(x)
# calculate loss
loss = F.cross_entropy(y_pred, y)
# get probability score using softmax
prob = F.softmax(y_pred, dim=1)
# get the index of the max probability
pred = prob.data.max(dim=1)[1]
# calculate accuracy
acc = torchmetrics.functional.accuracy(pred, y)
dic = {
'batch_val_loss': loss,
'batch_val_acc': acc
}
# log the metrics for pytorch lightning progress bar and any further processing
self.log('batch_val_loss', loss, prog_bar=True)
self.log('batch_val_acc', acc, prog_bar=True)
#return dict
return dic
def validation_epoch_end(self, validation_step_outputs):
# calculate average validation loss end of the epoch
avg_val_loss = torch.tensor([x['batch_val_loss']
for x in validation_step_outputs]).mean()
# calculate average validation accuracy end of the epoch
avg_val_acc = torch.tensor([x['batch_val_acc']
for x in validation_step_outputs]).mean()
# log the metrics for pytorch lightning progress bar and any further processing
self.log('val_loss', avg_val_loss, prog_bar=True)
self.log('val_acc', avg_val_acc, prog_bar=True)
def configure_optimizers(self):
# adam optimiser
optimizer = optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
# learning rate reducer scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, min_lr=1e-15, verbose=True)
# scheduler reduces learning rate based on the value of val_loss metric
return {"optimizer": optimizer,
"lr_scheduler": {"scheduler": scheduler, "interval": "epoch", "frequency": 1, "monitor": "val_loss"}}
4. Huấn luyện mô hình
Sử dụng ModelCheckpoint callback và LearningRateMonitor, chúng ta sẽ huấn luyện mạng LSTM như sau:
def do_training_validation():
pl.seed_everything(21)
parser = ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
parser = configuration_parser(parser)
# args = parser.parse_args()
args, unknown = parser.parse_known_args()
# init model
hidden_dim = 50
WINDOW_SIZE = 32
model = ActionClassificationLSTM(WINDOW_SIZE, hidden_dim, learning_rate=args.learning_rate)
data_module = PoseDataModule(data_root=args.data_root,
batch_size=args.batch_size)
#save only the top 1 model based on val_loss
checkpoint_callback = ModelCheckpoint(save_top_k=1, monitor='val_loss')
lr_monitor = LearningRateMonitor(logging_interval='step')
#trainer
trainer = pl.Trainer.from_argparse_args(args,
# fast_dev_run=True,
max_epochs=args.epochs,
deterministic=True,
gpus=1,
progress_bar_refresh_rate=1,
callbacks=[EarlyStopping(monitor='train_loss', patience=15), checkpoint_callback, lr_monitor])
trainer.fit(model, data_module)
return model
Kết quả huấn luyện model:
-
Train Accuracy:
Hình 5 - Training Accuracy Chart. -
Train Loss:
Hình 6 - Training Loss Chart. -
Validation Accuracy:
Hình 7 - Validation Accuracy Chart. -
Validation Loss:
Hình 8 - Validation Loss Chart.
5. Thực hiên Inference
Sau khi đã train xong model, ta sẽ thử sử dụng nó để thực hiện nhận diện các hành động như trong tập huấn luyện.
# how many frames to skip while inferencing
# configuring a higher value will result in better FPS (frames per rate), but accuracy might get impacted
SKIP_FRAME_COUNT = 1
# analyse the video
def analyse_video(pose_detector, lstm_classifier, video_path):
# open the video
cap = cv2.VideoCapture(video_path)
# width of image frame
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# height of image frame
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# frames per second of the input video
fps = int(cap.get(cv2.CAP_PROP_FPS))
# total number of frames in the video
tot_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# video output codec
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# extract the file name from video path
file_name = ntpath.basename(video_path)
# video writer
vid_writer = cv2.VideoWriter('res_{}'.format(
file_name), fourcc, 30, (width, height))
# counter
counter = 0
# buffer to keep the output of detectron2 pose estimation
buffer_window = []
# start time
start = time.time()
label = None
# iterate through the video
while True:
# read the frame
ret, frame = cap.read()
# return if end of the video
if ret == False:
break
# make a copy of the frame
img = frame.copy()
if(counter % (SKIP_FRAME_COUNT+1) == 0):
# predict pose estimation on the frame
outputs = pose_detector(frame)
# filter the outputs with a good confidence score
persons, pIndicies = filter_persons(outputs)
if len(persons) >= 1:
# pick only pose estimation results of the first person.
# actually, we expect only one person to be present in the video.
p = persons[0]
# draw the body joints on the person body
draw_keypoints(p, img)
# input feature array for lstm
features = []
# add pose estimate results to the feature array
for i, row in enumerate(p):
features.append(row[0])
features.append(row[1])
# append the feature array into the buffer
# not that max buffer size is 32 and buffer_window operates in a sliding window fashion
if len(buffer_window) < WINDOW_SIZE:
buffer_window.append(features)
else:
# convert input to tensor
model_input = torch.Tensor(np.array(buffer_window, dtype=np.float32))
# add extra dimension
model_input = torch.unsqueeze(model_input, dim=0)
# predict the action class using lstm
y_pred = lstm_classifier(model_input)
prob = F.softmax(y_pred, dim=1)
# get the index of the max probability
pred_index = prob.data.max(dim=1)[1]
# pop the first value from buffer_window and add the new entry in FIFO fashion, to have a sliding window of size 32.
buffer_window.pop(0)
buffer_window.append(features)
label = LABELS[pred_index.numpy()[0]]
#print("Label detected ", label)
# add predicted label into the frame
if label is not None:
cv2.putText(img, 'Action: {}'.format(label),
(int(width-400), height-50), cv2.FONT_HERSHEY_COMPLEX, 0.9, (102, 255, 255), 2)
# increment counter
counter += 1
# write the frame into the result video
vid_writer.write(img)
# compute the completion percentage
percentage = int(counter*100/tot_frames)
# return the completion percentage
# yield "data:" + str(percentage) + "\n\n"
# show video results
cv2.imshow("image", img)
# Press Q on keyboard to exit
if cv2.waitKey(25) & 0xFF == ord('q'):
break
Kết quả thực hiện trên video:
6. Kết luận
Trong bài này, chúng ta đã cùng nhau thực hành xây dựng một mô hình để nhận diện hành động của người trong video bằng cách sử dụng kết hợp Detectron2 cho Pose Estimation và LSTM cho phân loại. Có rất nhiều thứ có thể cải tiến để có được kết quả tốt hơn mà bạn có thể thử nếu áp dụng vào bài toán thực tế:
- Tăng FPS để có thể chạy được realtime: Tối ưu hóa model (pruning, quantization), loại bỏ bớt frame khi nhận diện, sử dụng multi-threading, ...
- Sử dụng các Pose Estimation model khác như AlphaPose, OpenPose, ...
Toàn bộ source code của bài này, các bạn xem tại đây
Leave a Reply