需求

利用卷积神经网络,训练相关数据,使得自己的系统能够实时识别箕斗是否卸煤,是否在位。需要输出三个状态即 箕斗未在位,箕斗在位,箕斗卸煤。
卸煤过程:

卸煤结束:

箕斗不在位:

分析

这是个简单的分类问题,把箕斗的状态分为三种状态,所以可以设置输出值为100,010,001。因为训练时不用输入整张图片进行训练,所以选择有关键特征的小图来进行运算。

数据准备

截取短视屏

视频大部分是一个多小时时长的,因为视频中大部分都是相同的箕斗不在位的状态,所以从中截取一小部分时间的视频来制作数据集。因为视屏的帧率是25,所以一分钟的视屏都有1500张图片。在这里我截取了两段视屏来制作数据集,第一段就是正常卸煤状态的视屏,第二段是箕斗右侧其它箕斗卸煤的视屏,在这里目标箕斗一直是处于不在位的状态,但是与之前一段不同的是这段视屏中有红色的光存在,如图所示:

视屏截取代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import cv2

def capture_video(video_path, result_video_path, video, result_video, start_time, end_time):
"""
功能:截取短视频
参数:
video_path:需要截取的视频路径
result_video_path:截取后的视频存放的路径
video:需要截取的视频的名称(不带后缀)
result_video:截取了的视频的名称(不带后缀)
start_time:截取开始时间(单位s)
end_time:截取结束时间(单位s)
"""

# 打印出起始和结束时间
print("The path of the video to be processed is:{}".format(video_path))
print("start_time={}".format(start_time))
print("end_time={}".format(end_time))

# 读取视屏
cap = cv2.VideoCapture(video_path + video)
print("The video load was successful!")

# 读取视屏帧率
fps_video = cap.get(cv2.CAP_PROP_FPS)
print("fps_video={}".format(fps_video))

# 设置写入视屏的编码格式
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
print("fourcc={}".format(fourcc))

# 获取视屏宽度和高度
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print("frame_width={},frame_height={}".format(frame_width, frame_height))

# 设置写视屏的对象
videoWriter = cv2.VideoWriter(result_video_path + result_video, fourcc, fps_video, (frame_width, frame_height))
print("The object for writing video is set.")
print("Will enter a loop to read frame images!")

# 初始化一个计数器
acount = 0

print("It goes into the cycle!")

while (cap.isOpened()):

# 读取视屏里的图片
ret, frame = cap.read()

# 如果视屏没有读取结束
if ret == True:

# 计数器加一
acount += 1

# 截取相应时间内的视频信息
if(acount > (start_time * fps_video) and acount <= (end_time * fps_video)):
# 将图片写入视屏
videoWriter.write(frame)

if(acount == (end_time * fps_video)):
print("The video was captured successfully!")
print("The processed video is stored:{}".format(result_video_path))
break

else:
# 写入视屏结束
videoWriter.release()
print("Start time or end time exceeds the video time!")
print("Video capture failed!")
break

if __name__ == '__main__':
video_path = "jidou_video/"
result_video_path = "video/"
video = "wskip2020-12-03-20.mp4"
result_video = "first_video.mp4"
start_time = 5 * 60 + 30
end_time = 6 * 60 + 30
capture_video(video_path, result_video_path, video, result_video, start_time, end_time)

获取图片

截取到了关键短视频后就是把短视频拆分成一帧一帧的图片。拆分视频的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import cv2
import os

def apart(video_path, video_name, image_path):
"""
功能:将视频拆分成图片
参数:
video_path:要拆分的视频路径
video_name:要拆分的视频名字(不带后缀)
image_path:拆分后图片的存放路径
"""

# 在这里把后缀接上
video = os.path.join(video_path, video_name + '.mp4')

# 提取视频的频率,每1帧提取一个
frameFrequency = 1

if not os.path.exists(image_path):
#如果文件目录不存在则创建目录
os.makedirs(image_path)

# 获取视屏
use_video = cv2.VideoCapture(video)

# 初始化计数器
acount = 0

# 开始循环抽取图片
print('Start extracting images!')
while True:
res, image = use_video.read()
acount += 1

# 如果提取完图片,则退出循环
if not res:
print('not res , not image')
break

# 将图片写入文件夹中
cv2.imwrite(image_path + str(acount) + '.jpg', image)
print(image_path + str(acount) + '.jpg')

print('End of image extraction!')
use_video.release()

if __name__ == '__main__':
video_path = 'video/'
video_name = 'first_video'
image_path = 'image/'
apart(video_path, video_name, image_path)

标定数据

因为在训练时需要将训练的图片和对应的输出结果进行匹配,所以要对图片进行标定。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def image_tag(labels_path, start_number, end_number, skip_state):
"""
功能:对图片进行数据标定
参数:
labels_path:存放标签数据的路径
start_number:开始标记数据号码
end_number:结束标记号码
skip_state:箕斗状态(有1、2、3)三种状态
"""

# 开始检索标定
for i in range(end_number - start_number + 1):
with open(labels_path + '%s.txt' % str(start_number + i), 'w') as out_file:
if skip_state == 1:
out_file.write(" ".join(['1', '0', '0']))
elif skip_state == 2:
out_file.write(" ".join(['0', '1', '0']))
elif skip_state == 3:
out_file.write(" ".join(['0', '0', '1']))

if __name__ == '__main__':
labels_path = 'labels/'
start_number = 1501
end_number = 1675
skip_state = 1
image_tag(labels_path, start_number, end_number, skip_state)

因为训练时对应的图片的输出值是从txt文件里获取得到的,所以每张图片都制作一个txt文件来存放标定的数据。这里为了标定数据简单,就将图片和标定的数据的文件名按照数字的序号来写。

写训练集

这部主要是将需要训练的txt文件名(不带后缀)放入训练文档中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def write_data(file_path, file_type, number):
"""
功能:将需要训练的数据的文件名写入文档中
参数:
file_path:训练文件名集存放的路径
file_type:文件类型,主要有训练和测试两种
number:需要用到的文件个数
"""
with open(file_path + '%s.txt' % (file_type), 'w') as out_file:
for i in range(number):
out_file.write(str(i + 1) + '\n')

if __name__ == '__main__':
file_path = 'ImageSets/Main/'
file_type = 'train'
number = 1675
write_data(file_path, file_type, number)

设置训练数据

在数据准备阶段先设置需要训练的数据,主要目的是通过pytorch来对训练集进行封装。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import cv2
import numpy as np
from torch.utils.data import Dataset

class dataset(Dataset):
def __init__(self, is_train = True):
# 设置存储着文件名的列表
self.file_names = []

# 如果是要进行训练读取训练数据文档,否则读取验证数据文档
if is_train:

# 将训练数据的文档名存储到文件列表中,.strip()去除字符串前后的空格或换行符
with open("ImageSets/Main/train.txt", 'r') as f:
self.file_names = [x.strip() for x in f]
else:

# 将验证数据的文档名存储到文件列表中
with open("ImageSets/Main/val.txt", 'r') as f:
self.file_names = [x.strip() for x in f]

# 图片存储路径
self.img_path = "image/"

# label数据文档存储数据
self.label_path = "labels/"

def __len__(self):
'''
功能:返回文件名的个数
'''
# 返回文件名的个数
return len(self.file_names)

def __getitem__(self, item):
'''
功能:对图像数据进行规范化处理
参数:
——picture_index:图片在文件名当中的索引
'''
img = cv2.imread(self.img_path + self.file_names[item] + '.jpg')

# 将图片进行裁剪,h(420~670),w(495~745)
img = img[420:670, 495:745]

# 将原图片中的(h,w,c)转换成(c,w,h)
img = img.transpose(2, 1, 0)

# 将图片进行归一化
img = img / 255.

with open(self.label_path + self.file_names[item] + ".txt") as f:

# 将.txt文档中的内容放入列表中
line = [x.split() for x in f]
label = [float(x) for y in line for x in y]

labels = np.zeros((3))

labels[0:3] = np.array([label[0], label[1], label[2]])

return img, labels

设计网络

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import torch
import torch.nn as nn

class net(nn.Module):
def __init__(self):
super(net, self).__init__()

self.Conv_layers1 = nn.Sequential(
nn.Conv2d(3, 128, 12, 2),
nn.LeakyReLU(),
nn.MaxPool2d(2, 2)
)

self.Conv_layers2 = nn.Sequential(
nn.Conv2d(128, 256, 6, 2),
nn.LeakyReLU(),
nn.MaxPool2d(2, 2)
)

self.Conn_layers1 = nn.Sequential(
nn.Linear(14 * 14 * 256, 1000),
nn.LeakyReLU()
)

self.Conn_layers2 = nn.Sequential(
nn.Linear(1000, 3),
nn.Sigmoid()
)

def forward(self, input):
input = self.Conv_layers1(input)
input = self.Conv_layers2(input)
input = input.view(input.size()[0], -1)
input = self.Conn_layers1(input)
input = self.Conn_layers2(input)
output = input.reshape(-1, 3)

return output

损失函数

pytorch有自带的损失函数,所以也可以用框架内自带的损失函数。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import torch

def loss_function(predict, labels):
'''
功能:计算损失值
参数:
——predict:net的预测结果(batchsize,5),其中predict为tensor
——labels:样本数据(batchsize,5),其中labels为tensor
返回值:
——loss:平均损失
'''
# 设置初始损失值
loss = 0.

# 获取batchsize
batch = labels.size()[0]

# 计算每个批次的总损失
for batch_size in range(batch):
loss = loss + torch.sum((predict[batch_size, 0:3] - labels[batch_size, 0:3]) ** 2)

# 求平均损失
loss = loss / batch

return loss

训练网络

这里我采用的是GPU来训练网络,如果用CPU可以将.cuda()去掉。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import torch
import loss_function as LF
import dataset as D
import net as N
from torch.utils.data import Dataset, DataLoader

# 设置迭代次数
epoch = 10

# 设置批量的大小
batch_size = 10

# 设置学习率
lr = 0.01

# 加载数据
train_data = D.dataset(is_train = True)
data = DataLoader(train_data, batch_size = batch_size, shuffle = True)

# 初始化一个网络对象
net = N.net().cuda()

# 初始化优化器
optimizer = torch.optim.SGD(net.parameters(), lr = lr, momentum = 0.9, weight_decay = 0.0005)

# 进行迭代训练
for e in range(epoch):
for i, (inputs, labels) in enumerate(data):
inputs = inputs.float().cuda()
labels = labels.float().cuda()
predict = net(inputs)
loss = LF.loss_function(predict, labels)
print("epoch={},i={},loss={}".format(e, i, str(loss)))
optimizer.zero_grad()
loss.backward()
optimizer.step()

# 存储模型
torch.save(net, "weight/weight.pkl")

最后得到的损失如下:

从图中可以看出除了小部分数据有点跳动大部分的损失还是比较低的。

视频预测

将需要预测的视频放入网络中并将状态文本打在视频上。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# 利用训练好的权重导入到网络中并测试视屏效果
# 而且能够在视屏中实时用绿线表示出显示后的效果
import cv2
import torch
import numpy as np
import net as N

def predict_video(video_path, result_video_path):
"""
功能:对视频进行预测并输出
参数:
video_path:要进行预测的视频路径
result_video_path:预测结果的视频存放的路径
"""

# 设置批次数,为了能讲数据放入训练网络中
batch_size = 1

# 初始化文件名
video = "first_video.mp4"
result_video = "result_first_video_fast.mp4"

# 加载权重
net = torch.load("weight/weight.pkl")

# 读取视屏
cap = cv2.VideoCapture(video_path + video)

# 获取视屏帧率
fps_video = cap.get(cv2.CAP_PROP_FPS)
print("fps_video={}".format(fps_video))

# 设置写入视屏的编码格式
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
print("fourcc={}".format(fourcc))

# 获取视屏宽度和高度
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print("frame_width={},frame_height{}".format(frame_width, frame_height))

# 设置写视屏的对象
videoWriter = cv2.VideoWriter(result_video_path + result_video, fourcc, fps_video, (frame_width, frame_height))

while (cap.isOpened()):
ret, frame = cap.read()

# 如果视屏没有结束
if ret == True:
# 将图片进行剪切
inputs = frame[420:670, 495:745]
inputs = inputs.transpose(2, 1, 0)
inputs = inputs / 255.0

inputs = np.array([inputs])

# 进行输入数据类型转换
inputs = torch.from_numpy(inputs)
inputs = inputs.float().cuda()

# 得到预测数据
predict = net(inputs)

# 取最大值索引
predict = torch.argmax(predict)

# 判断预测的值并给文本变量赋予相应的状态
if predict == 0:
text = "The skip is not in position!"
elif predict == 1:
text = "The skip is discharging coal!"
elif predict == 2:
text = "The skip is in position!"

# 在图片上显示红色文本
cv2.putText(frame, text, (200, 100), cv2.FONT_HERSHEY_COMPLEX, 2.0, (0, 0, 255), 5)

# 在图片中显示出绿色的矩形框
cv2.rectangle(frame, (495, 420), (745, 670), (0, 255, 0), 2, 4)

# 显示视频
cv2.imshow('Frame',frame)

# 刷新视频
cv2.waitKey(10)

# 按q键退出
if cv2.waitKey(25) & 0xFF == ord('q'):
break

# 将图片写入视屏
videoWriter.write(frame)
else:
# 写入视屏结束
videoWriter.release()
break

最后处理好的结果如下所示:
箕斗不在位:

箕斗卸煤:

箕斗在位: