#%%
from __future__ import absolute_import, division, print_function, unicode_literals

import pathlib
import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # silence TF C++ info/warning logs

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, losses

print(tf.__version__)

# Download the Auto MPG fuel-efficiency dataset.
dataset_path = keras.utils.get_file(
    "auto-mpg.data",
    "http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data")

# Columns: miles-per-gallon (target), cylinders, displacement, horsepower,
# weight, acceleration, model year, origin.
column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight',
                'Acceleration', 'Model Year', 'Origin']
raw_dataset = pd.read_csv(dataset_path, names=column_names,
                          na_values="?", comment='\t',
                          sep=" ", skipinitialspace=True)

dataset = raw_dataset.copy()
# Peek at the data (meaningful output only in an interactive/notebook session).
dataset.tail()
dataset.head()
dataset

#%%
# Count missing values, then drop incomplete rows.
dataset.isna().sum()
dataset = dataset.dropna()
dataset.isna().sum()
dataset

#%%
# 'Origin' is categorical (1 = USA, 2 = Europe, 3 = Japan): pop the column
# and one-hot encode it into three indicator columns.
origin = dataset.pop('Origin')
dataset['USA'] = (origin == 1) * 1.0
dataset['Europe'] = (origin == 2) * 1.0
dataset['Japan'] = (origin == 3) * 1.0
dataset.tail()

# 80/20 train/test split with a fixed seed for reproducibility.
train_dataset = dataset.sample(frac=0.8, random_state=0)
test_dataset = dataset.drop(train_dataset.index)

#%% pairwise feature statistics of the training set
sns.pairplot(train_dataset[["Cylinders", "Displacement", "Weight", "MPG"]],
             diag_kind="kde")

#%%
# Per-feature statistics of the training inputs (MPG excluded: it is the label).
train_stats = train_dataset.describe()
train_stats.pop("MPG")
train_stats = train_stats.transpose()
train_stats

# Pop the MPG column as the regression target y.
train_labels = train_dataset.pop('MPG')
test_labels = test_dataset.pop('MPG')


def norm(x):
    """Standardize features using the *training* set's mean/std.

    Test data must be normalized with training statistics to avoid leakage.
    """
    return (x - train_stats['mean']) / train_stats['std']


normed_train_data = norm(train_dataset)
normed_test_data = norm(test_dataset)

#%%
print(normed_train_data.shape, train_labels.shape)
print(normed_test_data.shape, test_labels.shape)


#%%
class Network(keras.Model):
    """Regression network: 9 input features -> 64 -> 64 -> 1 predicted MPG."""

    def __init__(self):
        super(Network, self).__init__()
        # Three fully-connected layers; linear output for regression.
        self.fc1 = layers.Dense(64, activation='relu')
        self.fc2 = layers.Dense(64, activation='relu')
        self.fc3 = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        # Forward pass through the three dense layers in sequence.
        x = self.fc1(inputs)
        x = self.fc2(x)
        x = self.fc3(x)
        return x


model = Network()
# 9 features: 6 numeric columns + 3 one-hot origin indicators.
model.build(input_shape=(None, 9))
model.summary()
optimizer = tf.keras.optimizers.RMSprop(0.001)
train_db = tf.data.Dataset.from_tensor_slices((normed_train_data.values,
                                               train_labels.values))
train_db = train_db.shuffle(100).batch(32)

# # Sanity check before training
# example_batch = normed_train_data[:10]
# example_result = model.predict(example_batch)
# example_result

train_mae_losses = []
test_mae_losses = []
for epoch in range(200):
    epoch_mae_sum = 0.0
    epoch_batches = 0
    for step, (x, y) in enumerate(train_db):
        with tf.GradientTape() as tape:
            # BUG FIX: the model outputs shape (batch, 1) while y is (batch,).
            # Without squeezing, y - out broadcasts to (batch, batch) and the
            # MSE/MAE are reduced over the wrong axis. Squeeze to (batch,).
            out = tf.squeeze(model(x), axis=1)
            loss = tf.reduce_mean(losses.MSE(y, out))
        # MAE is only tracked for reporting; no need to record it on the tape.
        mae_loss = tf.reduce_mean(losses.MAE(y, out))
        if step % 10 == 0:
            print(epoch, step, float(loss))
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        epoch_mae_sum += float(mae_loss)
        epoch_batches += 1

    # Record the mean train MAE over the whole epoch (the original kept only
    # the final batch's MAE, which made the training curve noisy/misleading).
    train_mae_losses.append(epoch_mae_sum / max(epoch_batches, 1))
    # Evaluate on the full test set once per epoch (same squeeze fix).
    out = tf.squeeze(model(tf.constant(normed_test_data.values)), axis=1)
    test_mae_losses.append(float(tf.reduce_mean(losses.MAE(test_labels, out))))

plt.figure()
plt.xlabel('Epoch')
plt.ylabel('MAE')
plt.plot(train_mae_losses, label='Train')
plt.plot(test_mae_losses, label='Test')
plt.legend()
# plt.ylim([0,10])
plt.savefig('auto.svg')
plt.show()
#%%