#%%
from __future__ import absolute_import, division, print_function, unicode_literals
import pathlib
import os
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, losses
print(tf.__version__)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Download the Auto MPG (fuel efficiency) dataset
dataset_path = keras.utils.get_file("auto-mpg.data", "http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data")
# Fuel efficiency (miles per gallon), number of cylinders, displacement, horsepower,
# weight, acceleration, model year, origin
column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight',
                'Acceleration', 'Model Year', 'Origin']
raw_dataset = pd.read_csv(dataset_path, names=column_names,
                          na_values="?", comment='\t',
                          sep=" ", skipinitialspace=True)
dataset = raw_dataset.copy()
# Inspect a few rows of the data
dataset.tail()
dataset.head()
dataset
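# Note on the raw file (added comment): it is a whitespace-separated text file with no
# header row, and missing horsepower values are marked with "?", which is why
# na_values="?" and skipinitialspace=True are passed to pd.read_csv above.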
#%%
# Count the missing values, then drop those rows
dataset.isna().sum()
dataset = dataset.dropna()
dataset.isna().sum()
dataset
#%%
# Handle the categorical feature: the Origin column takes the values 1, 2 and 3,
# which stand for the USA, Europe and Japan respectively.
# Pop this column out
origin = dataset.pop('Origin')
# Write new one-hot columns based on the Origin values
dataset['USA'] = (origin == 1) * 1.0
dataset['Europe'] = (origin == 2) * 1.0
dataset['Japan'] = (origin == 3) * 1.0
dataset.tail()
# Split into training and test sets
train_dataset = dataset.sample(frac=0.8, random_state=0)
test_dataset = dataset.drop(train_dataset.index)
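# The manual one-hot encoding above could equivalently be written with pandas'
# get_dummies; an illustrative sketch only, not part of the original script:
# origin_onehot = pd.get_dummies(origin, prefix='Origin').astype('float64')
# # would yield columns Origin_1, Origin_2, Origin_3 instead of USA/Europe/Japan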
#%% Statistics of the data
sns.pairplot(train_dataset[["Cylinders", "Displacement", "Weight", "MPG"]],
             diag_kind="kde")
#%%
# Look at the statistics of the training-set inputs X
train_stats = train_dataset.describe()
train_stats.pop("MPG")
train_stats = train_stats.transpose()
train_stats
# Move the MPG (fuel efficiency) column out to serve as the ground-truth label Y
train_labels = train_dataset.pop('MPG')
test_labels = test_dataset.pop('MPG')
# Standardize the data using the training-set mean and standard deviation
def norm(x):
    return (x - train_stats['mean']) / train_stats['std']
normed_train_data = norm(train_dataset)
normed_test_data = norm(test_dataset)
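# norm() is z-score standardization, x_norm = (x - mean) / std, with the statistics
# taken from the training split for both datasets so no test information leaks in.
# A quick sanity check (added sketch, not in the original script):
# print(normed_train_data.mean().round(2))  # each column should be close to 0
# print(normed_train_data.std().round(2))   # each column should be close to 1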
#%%
print(normed_train_data.shape, train_labels.shape)
print(normed_test_data.shape, test_labels.shape)
#%%
class Network(keras.Model):
    # Regression network
    def __init__(self):
        super(Network, self).__init__()
        # Create 3 fully connected layers
        self.fc1 = layers.Dense(64, activation='relu')
        self.fc2 = layers.Dense(64, activation='relu')
        self.fc3 = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        # Pass through the 3 fully connected layers in turn
        x = self.fc1(inputs)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

model = Network()
# 9 input features: Cylinders, Displacement, Horsepower, Weight, Acceleration,
# Model Year and the 3 one-hot origin columns
model.build(input_shape=(None, 9))
model.summary()
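# The same architecture could also be expressed with the high-level Sequential API;
# an equivalent sketch with the same layer sizes (not the original code):
# model = keras.Sequential([
#     layers.Dense(64, activation='relu', input_shape=(9,)),
#     layers.Dense(64, activation='relu'),
#     layers.Dense(1),
# ])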
optimizer = tf.keras.optimizers.RMSprop(0.001)
# Build a tf.data pipeline over the standardized features and labels
train_db = tf.data.Dataset.from_tensor_slices((normed_train_data.values, train_labels.values))
train_db = train_db.shuffle(100).batch(32)
# # Test on a batch of examples before training
# example_batch = normed_train_data[:10]
# example_result = model.predict(example_batch)
# example_result
train_mae_losses = []
test_mae_losses = []
for epoch in range(200):
    for step, (x, y) in enumerate(train_db):
        with tf.GradientTape() as tape:
            out = model(x)
            # Squeeze the (batch, 1) output to (batch,) so it matches the shape of y;
            # otherwise MSE/MAE broadcast to a (batch, batch) matrix
            out = tf.squeeze(out, axis=1)
            loss = tf.reduce_mean(losses.MSE(y, out))
            mae_loss = tf.reduce_mean(losses.MAE(y, out))
        if step % 10 == 0:
            print(epoch, step, float(loss))
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
    # Record the last-batch train MAE and the full test-set MAE once per epoch
    train_mae_losses.append(float(mae_loss))
    out = tf.squeeze(model(tf.constant(normed_test_data.values)), axis=1)
    test_mae_losses.append(float(tf.reduce_mean(losses.MAE(test_labels, out))))
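# For reference, the same training could be driven by the high-level Keras API;
# a minimal sketch with the same hyperparameters (not the original training code):
# model.compile(optimizer=tf.keras.optimizers.RMSprop(0.001), loss='mse', metrics=['mae'])
# history = model.fit(normed_train_data.values, train_labels.values,
#                     epochs=200, batch_size=32,
#                     validation_data=(normed_test_data.values, test_labels.values))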
plt.figure()
plt.xlabel('Epoch')
plt.ylabel('MAE')
plt.plot(train_mae_losses, label='Train')
plt.plot(test_mae_losses, label='Test')
plt.legend()
# plt.ylim([0, 10])
plt.savefig('auto.svg')
plt.show()
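# A quick look at the trained model's predictions against the true labels;
# an added sketch, not part of the original script:
# preds = tf.squeeze(model(tf.constant(normed_test_data.values)), axis=1)
# for pred, true in list(zip(preds.numpy(), test_labels.values))[:5]:
#     print('predicted MPG: %.1f, true MPG: %.1f' % (pred, true))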
#%%
|