DL - TensorFlow Basics in Practice

Introduction

The content of this article is a summary of the hands-on work I did while getting started with TensorFlow. It mainly covers the following practical projects:

  • Linear model in practice
  • Forward propagation in practice
  • AUTO-MPG fuel consumption prediction
  • Backpropagation in practice (binary classification)
  • MNIST handwritten digit classification

Linear Model in Practice

The goal of this project is to understand the gradient descent algorithm used to optimize w and b. The data is sampled directly from a known linear model with w = 1.477 and b = 0.089, that is, y = 1.477 * x + 0.089.
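The update implemented in step_gradient below follows directly from differentiating the mean squared error with respect to w and b:

L(w, b) = \frac{1}{M} \sum_{i=1}^{M} (w x_i + b - y_i)^2

\frac{\partial L}{\partial w} = \frac{2}{M} \sum_{i=1}^{M} x_i (w x_i + b - y_i), \qquad \frac{\partial L}{\partial b} = \frac{2}{M} \sum_{i=1}^{M} (w x_i + b - y_i)

Each iteration then updates w \leftarrow w - lr \cdot \partial L / \partial w and b \leftarrow b - lr \cdot \partial L / \partial b.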

import numpy as np

# The noise is sampled from a Gaussian with mean 0 and standard deviation 0.01:
# y = 1.477 * x + 0.089 + eps
def load_data():
    data = []
    for i in range(100):  # sample 100 points
        x = np.random.uniform(-10., 10.)
        eps = np.random.normal(0., 0.01)  # Gaussian noise
        y = 1.477 * x + 0.089 + eps
        data.append([x, y])
    data = np.array(data)
    return data

# Loop over every point [x, y], accumulate the squared difference between
# prediction and ground truth, and average to get the MSE on the training set
def mse(w, b, points):
    totalError = 0
    for i in range(0, len(points)):
        x = points[i, 0]
        y = points[i, 1]
        totalError += (y - (w * x + b)) ** 2
    return totalError / float(len(points))

# Compute the gradients and take one gradient-descent step
def step_gradient(w_curr, b_curr, points, lr):
    b_gradient = 0
    w_gradient = 0
    M = float(len(points))
    for i in range(0, len(points)):
        x = points[i, 0]
        y = points[i, 1]
        b_gradient += (2/M) * ((w_curr * x + b_curr) - y)
        w_gradient += (2/M) * x * ((w_curr * x + b_curr) - y)
    new_w = w_curr - (lr * w_gradient)
    new_b = b_curr - (lr * b_gradient)
    return [new_w, new_b]

# After computing the gradients of the loss w.r.t. w and b,
# repeatedly update w and b over the whole training set
def gradient_descent(points, starting_w, starting_b, lr, num_iter):
    w = starting_w
    b = starting_b
    for step in range(num_iter):
        w, b = step_gradient(w, b, np.array(points), lr)
        loss = mse(w, b, points)
        if step % 50 == 0:
            print(step, loss, w, b)
    return [w, b]

def main():
    lr = 0.01  # learning rate
    init_b = 0
    init_w = 0
    num_iterations = 1000
    # 1. Sample the data
    data = load_data()

    # 2. Train for 1000 iterations and get the optimal w and b
    [w, b] = gradient_descent(data, init_w, init_b, lr, num_iterations)

    # 3. Compute the MSE of the optimal w and b
    loss = mse(w, b, data)
    print('final', loss, w, b)

if __name__ == '__main__':
    main()

From the printed output we can see that the final w and b have converged very close to the parameters of the model the data was sampled from.
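If you also want to see the fit visually, a minimal matplotlib sketch such as the one below (not part of the original script; it simply reuses load_data and gradient_descent from above) plots the sampled points against the fitted line:

import matplotlib.pyplot as plt

# Illustrative only: visualize the samples and the fitted line
data = load_data()
w, b = gradient_descent(data, 0, 0, 0.01, 1000)
xs = np.linspace(-10, 10, 100)
plt.scatter(data[:, 0], data[:, 1], s=10, label='samples')
plt.plot(xs, w * xs + b, 'r', label='fit: y = %.3f x + %.3f' % (w, b))
plt.legend()
plt.show()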


Forward Propagation in Practice

The goal of this section is to use TensorFlow's basic data structures to tackle the MNIST handwritten-digit classification problem, following the same computation steps as a neural network. The implementation idea is shown below:

# The dataset is MNIST handwritten digits. The input has 784 features; the first
# layer outputs 256 nodes, the second layer 128, and the third layer 10, i.e. the
# probabilities of the sample belonging to each of the 10 classes.

# Parameters of the first layer
w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1))
b1 = tf.Variable(tf.zeros([256]))
# Parameters of the second layer
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
b2 = tf.Variable(tf.zeros([128]))
# Parameters of the third layer
w3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
b3 = tf.Variable(tf.zeros([10]))

# Before the forward pass, reshape the input tensor from [b, 28, 28] to [b, 784]
x = tf.reshape(x, [-1, 28*28])
y_onehot = tf.one_hot(y, depth=10)

with tf.GradientTape() as tape:
    # First layer
    h1 = x@w1 + tf.broadcast_to(b1, [x.shape[0], 256])
    # Apply the activation function
    h1 = tf.nn.relu(h1)
    # Second layer
    h2 = h1@w2 + b2
    h2 = tf.nn.relu(h2)

    # Output layer
    out = h2@w3 + b3

    # The scalar labels y were converted to one-hot above; compute the MSE against out
    loss = tf.square(y_onehot - out)
    # Reduce to a scalar loss
    loss = tf.reduce_mean(loss)

# tape.gradient() returns the gradients of the loss w.r.t. the network parameters;
# the results are collected in the grads list
grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
w1.assign_sub(lr * grads[0])
b1.assign_sub(lr * grads[1])
w2.assign_sub(lr * grads[2])
b2.assign_sub(lr * grads[3])
w3.assign_sub(lr * grads[4])
b3.assign_sub(lr * grads[5])  # assign_sub() subtracts the given value in place, updating the parameter
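The listing above shows a single update step and assumes that x, y and lr already exist. One minimal way to obtain them, sketched below under the assumption that MNIST is loaded with keras.datasets (the same pattern used in the full MNIST example later in this article), is to batch the training set and take one batch:

import tensorflow as tf
from tensorflow.keras import datasets

lr = 1e-3  # learning rate

# Sketch: load MNIST, scale pixels to [0, 1] and take one batch of 128 samples
(x_raw, y_raw), _ = datasets.mnist.load_data()
x_raw = tf.convert_to_tensor(x_raw, dtype=tf.float32) / 255.  # [60k, 28, 28]
y_raw = tf.convert_to_tensor(y_raw, dtype=tf.int32)           # [60k]
train_db = tf.data.Dataset.from_tensor_slices((x_raw, y_raw)).batch(128)
x, y = next(iter(train_db))  # x: [128, 28, 28], y: [128]

In a real training run, the forward pass, loss and assign_sub updates above would be repeated for every batch over multiple epochs, exactly as the MNIST section below does.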

AUTO-MPG Fuel Consumption Prediction

This section uses a fully connected network to train a model that predicts a car's efficiency metric MPG (Mile Per Gallon, the number of miles travelled per gallon of fuel).

Imports

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, optimizers, losses
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

Load the Data

# 1. Read the data from http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data
column_names = ['MPG', 'Cylinders', 'Displacement', 'horsepower', 'Weight', 'Acceleration', 'Model Year', 'Origin']
raw_dataset = pd.read_csv('auto-mpg.data', names=column_names, na_values='?', comment='\t', sep=' ', skipinitialspace=True)
dataset = raw_dataset.copy()
dataset.head()


Data Preprocessing

# 2. Drop rows with missing values
dataset = dataset.dropna()

# 3. Preprocess the data: expand the categorical Origin column into one-hot columns
origin = dataset.pop('Origin')
dataset['USA'] = (origin == 1) * 1.0
dataset['Europe'] = (origin == 2) * 1.0
dataset['Japan'] = (origin == 3) * 1.0
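The three assignments above are a manual one-hot encoding of the Origin column. An equivalent alternative (my own sketch, not in the original notes; use it instead of, not in addition to, the three lines above) is to let pandas build the dummy columns:

# Illustrative alternative: one-hot encode Origin with pd.get_dummies
origin_names = origin.map({1: 'USA', 2: 'Europe', 3: 'Japan'})
dataset = dataset.join(pd.get_dummies(origin_names) * 1.0)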

Prepare the Model Data

# 4. Split into training and test sets
train_data = dataset.sample(frac=0.8, random_state=0)
test_data = dataset.drop(train_data.index)
train_label = train_data.pop('MPG')
test_label = test_data.pop('MPG')

# 5. Standardize the data with the training-set statistics
train_stats = train_data.describe().transpose()  # per-feature mean/std of the training set
def norm(x):
    return (x - train_stats['mean']) / train_stats['std']
norm_train_data = norm(train_data)
norm_test_data = norm(test_data)

# 6. Build the training and test Dataset objects
train_db = tf.data.Dataset.from_tensor_slices((norm_train_data.values, train_label.values))
train_db = train_db.shuffle(100).batch(32)
test_db = tf.data.Dataset.from_tensor_slices((norm_test_data.values, test_label.values))
test_db = test_db.shuffle(100).batch(32)

Network Model

# 7. Create the network
class Network(keras.Model):
    # Regression network
    def __init__(self):
        super(Network, self).__init__()
        self.fc1 = layers.Dense(64, activation=tf.nn.relu)
        self.fc2 = layers.Dense(64, activation=tf.nn.relu)
        self.fc3 = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        x = self.fc1(inputs)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

# 8. Build the model
model = Network()
model.build(input_shape=(32, 9))
model.summary()  # print the network summary
optimizer = optimizers.RMSprop(0.001)  # create the optimizer with its learning rate

# 9. Train the model
mse_loss_list = []
mae_loss_list = []
for epoch in range(200):
    for step, (x, y) in enumerate(train_db):
        with tf.GradientTape() as tape:
            out = model(x)
            loss = tf.reduce_mean(losses.MSE(y, out))
            mae_loss = tf.reduce_mean(losses.MAE(y, out))
        if step % 10 == 0:
            mse_loss_list.append(loss.numpy())
            mae_loss_list.append(mae_loss.numpy())
        if step % 100 == 0:
            print(epoch, step, float(loss), float(mae_loss))
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
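The loop above only tracks the training loss. A quick way to check generalization (my own addition, not in the original notes; it assumes the model and test_db defined above) is to compute the mean absolute error over the test set after training:

# Illustrative: evaluate the trained model on the test set
test_mae_sum, test_count = 0.0, 0
for x, y in test_db:
    out = tf.squeeze(model(x), axis=1)  # [b, 1] => [b]
    test_mae_sum += float(tf.reduce_sum(tf.abs(out - tf.cast(y, out.dtype))))
    test_count += int(x.shape[0])
print('test MAE:', test_mae_sum / test_count)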


Performance Metrics

# 10. Plot the training loss curves
data = pd.DataFrame()
data['mse_loss'] = mse_loss_list
data['mae_loss'] = mae_loss_list
plt.ylim([0, 100])  # limit the y-axis range
sns.lineplot(data=data)



Backpropagation in Practice

The error backpropagation algorithm (Backpropagation, BP) is one of the core algorithms of neural networks.

Using the gradient formulas derived for a multi-layer fully connected network, we compute the gradient of every layer directly in a loop and update the parameters by hand with gradient descent.
The gradient formulas used here assume a multi-layer fully connected network whose only activation function is Sigmoid and whose loss function is the mean squared error.

We will implement a four-layer fully connected network to solve a binary classification task. The network has 2 input nodes, hidden layers with 25, 50 and 25 nodes, and an output layer with two nodes representing the probabilities of class 1 and class 2.
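For this sigmoid-plus-MSE setup, the quantities computed in the Layer and NeuralNetwork classes below correspond to the usual delta rule (written here in my own notation, with o^{(l)} the output of layer l, \eta the learning rate, and W^{(l+1)} of shape (n_l, n_{l+1}) as in the code):

\delta^{(L)} = \bigl(y - o^{(L)}\bigr) \odot o^{(L)}\bigl(1 - o^{(L)}\bigr)

\delta^{(l)} = \bigl(W^{(l+1)} \delta^{(l+1)}\bigr) \odot o^{(l)}\bigl(1 - o^{(l)}\bigr)

W^{(l)} \leftarrow W^{(l)} + \eta \, \bigl(o^{(l-1)}\bigr)^{\mathsf T} \delta^{(l)}

Because the error is written as y - o^{(L)} rather than o^{(L)} - y, the weight update adds the correction term, which is exactly what layer.weights += ... does in the code.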


Imports

import tensorflow as tf
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

Dataset

Here we use the make_moons utility from sklearn to generate 3,000 points of a linearly non-separable two-class dataset; each sample has 2 features. The sampled distribution is plotted below.

N_SAMPLES = 3000  # number of sampled points
x, y = make_moons(n_samples=N_SAMPLES, noise=0.2, random_state=200)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42)  # 7:3 train/test split
print(x.shape, y.shape)

# Plot the distribution of the dataset
def make_plot(x, y, plot_name):
    plt.figure(figsize=(16, 12))
    axes = plt.gca()
    plt.title(plot_name, fontsize=20)
    axes.set(xlabel='x', ylabel='y')
    plt.scatter(x[:, 0], x[:, 1], c=y.ravel(), s=40)

make_plot(x, y, 'data-set')
plt.show()


Network Layer

# A single fully connected layer
class Layer:
    def __init__(self, n_input, n_neurons, activation=None, weights=None, bias=None):
        self.weights = weights if weights is not None else np.random.randn(n_input, n_neurons) * np.sqrt(1 / n_neurons)
        self.bias = bias if bias is not None else np.random.rand(n_neurons) * 0.1
        self.activation = activation
        self.last_activation = None
        self.error = None
        self.delta = None

    def activate(self, x):
        r = np.dot(x, self.weights) + self.bias
        self.last_activation = self._apply_activation(r)
        return self.last_activation

    def _apply_activation(self, r):
        if self.activation is None:
            return r
        elif self.activation == 'relu':
            return np.maximum(r, 0)
        elif self.activation == 'tanh':
            return np.tanh(r)
        elif self.activation == 'sigmoid':
            return 1 / (1 + np.exp(-r))
        return r

    # Note: r is the layer output (the already-activated value), so the
    # derivatives below are expressed in terms of the activation's output
    def apply_activation_derivative(self, r):
        if self.activation is None:
            return np.ones_like(r)
        # Derivative of ReLU
        elif self.activation == 'relu':
            grad = np.array(r, copy=True)
            grad[r > 0] = 1.
            grad[r <= 0] = 0.
            return grad
        # Derivative of tanh
        elif self.activation == 'tanh':
            return 1 - r ** 2
        # Derivative of Sigmoid: sigma' = sigma * (1 - sigma)
        elif self.activation == 'sigmoid':
            return r - pow(r, 2)
        return r
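A single Layer can be exercised on its own; the minimal sketch below (illustrative only) forwards a random batch of 4 two-dimensional samples through one sigmoid layer:

# Illustrative: forward a random batch through a single layer
layer = Layer(n_input=2, n_neurons=25, activation='sigmoid')
out = layer.activate(np.random.randn(4, 2))
print(out.shape)  # (4, 25), every value in (0, 1)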

Network Model

# Network model
class NeuralNetwork:
    def __init__(self):
        self._layers = []

    def add_layer(self, layer):
        self._layers.append(layer)

    def feed_forward(self, x):
        for layer in self._layers:
            x = layer.activate(x)
        return x

    # Backpropagation
    def backpropagation(self, x, y, learning_rate):
        output = self.feed_forward(x)
        for i in reversed(range(len(self._layers))):
            layer = self._layers[i]
            if layer == self._layers[-1]:
                # Output layer: the error is the difference to the target
                layer.error = y - output
                layer.delta = layer.error * layer.apply_activation_derivative(output)
            else:
                # Hidden layer: propagate the next layer's error backwards
                next_layer = self._layers[i+1]
                layer.error = np.dot(next_layer.weights, next_layer.delta)
                layer.delta = layer.error * layer.apply_activation_derivative(layer.last_activation)

        # Update the weights of every layer
        for i in range(len(self._layers)):
            layer = self._layers[i]
            o_i = np.atleast_2d(x if i == 0 else self._layers[i-1].last_activation)
            layer.weights += layer.delta * o_i.T * learning_rate

    # Train the network
    def train(self, x_train, y_train, x_test, y_test, learning_rate, max_epochs):
        y_onehot = np.zeros((y_train.shape[0], 2))
        y_onehot[np.arange(y_train.shape[0]), y_train] = 1

        mses = []
        accs = []
        for i in range(max_epochs):
            for j in range(len(x_train)):
                self.backpropagation(x_train[j], y_onehot[j], learning_rate)

            if i % 10 == 0:
                mse = np.mean(np.square(y_onehot - self.feed_forward(x_train)))
                mses.append(mse)
                print(i, float(mse))
                acc = self.accuracy(self.predict(x_test), y_test.flatten()) * 100
                accs.append(acc)
                print('acc', acc)
        return mses, accs

    def accuracy(self, y_output, y_test):
        return np.mean(np.argmax(y_output, axis=1) == y_test)

    def predict(self, x_test):
        return self.feed_forward(x_test)

# Build the fully connected network
nn = NeuralNetwork()
nn.add_layer(Layer(2, 25, 'sigmoid'))
nn.add_layer(Layer(25, 50, 'sigmoid'))
nn.add_layer(Layer(50, 25, 'sigmoid'))
nn.add_layer(Layer(25, 2, 'sigmoid'))

Train the Network

# Train the model with learning rate 0.001 for 300 epochs
mses, accs = nn.train(x_train, y_train, x_test, y_test, 0.001, 300)
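To see what the classifier has learned, you can evaluate nn.predict on a grid over the input plane and plot the resulting decision regions. The sketch below is one way to do it (my own addition; the grid step and margins are arbitrary choices):

# Illustrative: plot the decision boundary of the trained network
def plot_decision_boundary(model, x, y, step=0.02):
    x_min, x_max = x[:, 0].min() - 0.5, x[:, 0].max() + 0.5
    y_min, y_max = x[:, 1].min() - 0.5, x[:, 1].max() + 0.5
    xx, yy = np.meshgrid(np.arange(x_min, x_max, step), np.arange(y_min, y_max, step))
    grid = np.c_[xx.ravel(), yy.ravel()]
    # The predicted class is the arg-max over the two output nodes
    zz = np.argmax(model.predict(grid), axis=1).reshape(xx.shape)
    plt.contourf(xx, yy, zz, alpha=0.3)
    plt.scatter(x[:, 0], x[:, 1], c=y.ravel(), s=40)
    plt.show()

plot_decision_boundary(nn, x_test, y_test)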

Network Performance

# Plot the MSE curve
plt.figure()
plt.plot(mses)
plt.title('mse')
plt.show()

# Plot the accuracy curve
plt.figure()
plt.plot(accs)
plt.ylim(0, 100)
plt.title('acc')
plt.show()



MNIST Dataset

A classic entry-level machine learning exercise.

Imports

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets

Load the Data

(x, y), (x_test, y_test) = datasets.mnist.load_data() # x: [60k, 28, 28], x_test: [10k, 28, 28], y: [60k], y_test: [10k]
x = tf.convert_to_tensor(x, dtype=tf.float32) / 255. # x: [0~255] => [0~1.]
y = tf.convert_to_tensor(y, dtype=tf.int32)
x_test = tf.convert_to_tensor(x_test, dtype=tf.float32) / 255.
y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)

print(x.shape, y.shape, x.dtype, y.dtype)
print(tf.reduce_min(x), tf.reduce_max(x))
print(tf.reduce_min(y), tf.reduce_max(y))


Prepare the Data

train_db = tf.data.Dataset.from_tensor_slices((x,y)).batch(128)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test)).batch(128)
train_iter = iter(train_db)
sample = next(train_iter)
print('batch:', sample[0].shape, sample[1].shape)


Build the Model

# [b, 784] => [b, 256] => [b, 128] => [b, 10]
w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1)) # [dim_in, dim_out], [dim_out]
b1 = tf.Variable(tf.zeros([256]))
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
b2 = tf.Variable(tf.zeros([128]))
w3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
b3 = tf.Variable(tf.zeros([10]))

lr = 1e-3 # learning rate

Train the Model

for epoch in range(100):  # iterate over the dataset for 100 epochs
    for step, (x, y) in enumerate(train_db):  # for every batch
        # [b, 28, 28] => [b, 28*28]
        x = tf.reshape(x, [-1, 28*28])

        with tf.GradientTape() as tape:  # records operations on tf.Variable objects
            # x: [b, 28*28]
            # [b, 784]@[784, 256] + [256] => [b, 256] + [256] => [b, 256] + [b, 256]
            h1 = x@w1 + tf.broadcast_to(b1, [x.shape[0], 256])
            h1 = tf.nn.relu(h1)
            # [b, 256] => [b, 128]
            h2 = h1@w2 + b2
            h2 = tf.nn.relu(h2)
            # [b, 128] => [b, 10]
            out = h2@w3 + b3

            # compute loss
            y_onehot = tf.one_hot(y, depth=10)  # [b] => [b, 10]

            # mse = mean(sum((y-out)^2))
            loss = tf.square(y_onehot - out)  # [b, 10]
            loss = tf.reduce_mean(loss)  # mean: scalar

        # compute gradients
        grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
        # w1 = w1 - lr * w1_grad
        w1.assign_sub(lr * grads[0])
        b1.assign_sub(lr * grads[1])
        w2.assign_sub(lr * grads[2])
        b2.assign_sub(lr * grads[3])
        w3.assign_sub(lr * grads[4])
        b3.assign_sub(lr * grads[5])

        if step % 100 == 0:
            print(epoch, step, 'loss:', float(loss))

    # test/evaluation with the current [w1, b1, w2, b2, w3, b3]
    total_correct, total_num = 0, 0
    for step, (x, y) in enumerate(test_db):

        # [b, 28, 28] => [b, 28*28]
        x = tf.reshape(x, [-1, 28*28])

        # [b, 784] => [b, 256] => [b, 128] => [b, 10]
        h1 = tf.nn.relu(x@w1 + b1)
        h2 = tf.nn.relu(h1@w2 + b2)
        out = h2@w3 + b3

        # out: [b, 10] ~ R
        # prob: [b, 10] ~ [0, 1]
        prob = tf.nn.softmax(out, axis=1)
        pred = tf.argmax(prob, axis=1)  # [b, 10] => [b]
        pred = tf.cast(pred, dtype=tf.int32)
        correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)  # y: [b], int32
        correct = tf.reduce_sum(correct)

        total_correct += int(correct)
        total_num += x.shape[0]

    acc = total_correct / total_num
    print('test acc:', acc)


Model Validation

# Check a few samples from the test set
not_equal_count = 0
start = 10
stop = 20
for i in range(start, stop):
    x = x_test[i]
    y = y_test[i]
    x = tf.reshape(x, [-1, 28*28])

    h1 = tf.nn.relu(x@w1 + b1)
    h2 = tf.nn.relu(h1@w2 + b2)
    out = h2@w3 + b3

    prob = tf.nn.softmax(out, axis=1)
    pred = tf.argmax(prob, axis=1)  # [b, 10] => [b]
    print(i, y.numpy(), pred[0].numpy())
print('-'*30)

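To eyeball individual predictions, the digit images can be displayed next to their labels. A minimal sketch (my own addition; it imports matplotlib, which is not part of the imports listed for this section):

# Illustrative: display a few test digits with true and predicted labels
import matplotlib.pyplot as plt

for i in range(10, 15):
    x = tf.reshape(x_test[i], [-1, 28*28])
    h1 = tf.nn.relu(x@w1 + b1)
    h2 = tf.nn.relu(h1@w2 + b2)
    out = h2@w3 + b3
    pred = int(tf.argmax(out, axis=1)[0])

    plt.imshow(x_test[i].numpy(), cmap='gray')
    plt.title('label: %d, pred: %d' % (int(y_test[i]), pred))
    plt.show()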


Many concepts appeared for the first time in the examples above; they are the kinds of network layers that also appear in convolutional neural networks (CNN). Don't worry: a follow-up article dedicated to convolutional neural networks is planned, so stay tuned!


Summary

As the saying goes, "practice is the sole criterion for testing truth", and for machine learning that is all the more true: practice as much as you can!


Personal Note

This post consists of the notes I took while studying 《TensorFlow深度学习》; please contact me for removal in case of any infringement.
If you reuse this content for other purposes, please credit the source!