2019 年 4 月

第 34 卷,第 4 期

[測試回合]

使用 PyTorch 的神經異常偵測

藉由James McCaffrey

James McCaffrey異常偵測,也稱為 [極端值偵測是尋找極少數的項目資料集內的程序。範例包括用來識別伺服器記錄檔中的惡意事件,以及尋找詐騙的線上廣告。

請參閱這篇文章的走向的好方法是要看看的示範程式**[圖 1**。示範會分析已知的修改 National Institute of Standards and technology 和技術 (MNIST) 資料集的 1,000 項目子集。每個資料項目是從 0 到 9 個手寫數字的 28 x 28 灰階映像 (784 像素為單位)。完整的 MNIST 資料集有 60,000 訓練影像和 10,000 測試映像。

使用 Keras MNSIT 映像的異常偵測
[圖 1 MNSIT 映像異常偵測使用 Keras

示範程式會建立並定型 784-100-50-100-784 深度類神經 autoencoder 使用 PyTorch 程式碼程式庫。Autoencoder 是學習來預測其輸入的類神經網路。在訓練之後示範掃描所有 1,000 個映像,並尋找是最異常,其中一個映像最異常表示最高重構錯誤。最異常的數字是看起來可能是八改為三。

本文假設您有中繼或更佳的程式設計技能,使用 C 系列語言和機器學習服務基本的認識,但不會假設您知道任何關於 autoencoders。在這篇文章會示範程式碼。程式碼和資料也會是可在隨附的下載項目。已移除所有一般錯誤檢查要保留的主要概念盡可能清楚。

安裝 PyTorch

PyTorch 是相對較低層級的程式碼程式庫來建立類神經網路。很差相彷彿就功能而言 TensorFlow 與 CNTK。PyTorch 以 c + + 撰寫,但有 Python 語言 API 更容易的程式設計。

安裝 PyTorch 包含兩個主要步驟。首先,您會安裝 Python 和數個必要輔助的套件,例如 NumPy 和 SciPy。然後您可以安裝 PyTorch Python 附加元件套件的形式。雖然您可安裝 Python,而且執行 PyTorch 所需的封裝分開,最好是許多安裝 Python 散發套件,這是集合,包含的基底的 Python 解譯器和與每個相容的其他套件其他。我們的示範中,我安裝 Anaconda3 5.2.0 分佈,其中包含 Python 3.6.5。

安裝 Anaconda 之後, 我去pytorch.org Web 站台,並選取 Windows OS,Pip 安裝程式、 Python 3.6 和沒有 CUDA GPU 版本的選項。此舉可讓我的 URL,指向對應的 (唸成"wheel") 的.whl 檔案下載到我的本機電腦。如果您還不熟悉 Python 生態系統,您可以將 Python.whl 檔案為有點類似 Windows.msi 檔案。在本例中,我可以下載 PyTorch 1.0.0 版。我開啟命令殼層,瀏覽至保存.whl 檔的目錄,並輸入命令:

pip install torch-1.0.0-cp36-cp36m-win_amd64.whl

示範程式

完整的示範程式,以節省空間,少數稍加編輯所示**[圖 2**。縮排兩個空格,而不是一般的四個空格,以節省空間。請注意,Python 會使用"\"字元的行接續符號。我可以使用 [記事本] 來編輯我的程式。我的同事大多偏好較複雜的編輯器中,不過我想殘忍簡單的 「 記事本 」。

[圖 2 異常偵測示範程式

# auto_anom_mnist.py
# PyTorch 1.0.0 Anaconda3 5.2.0 (Python 3.6.5)
# autoencoder anomaly detection on MNIST
import numpy as np
import torch as T
import matplotlib.pyplot as plt
# -----------------------------------------------------------
def display(raw_data_x, raw_data_y, idx):
  label = raw_data_y[idx]  # like '5'
  print("digit/label = ", str(label), "\n")
  pixels = np.array(raw_data_x[idx])  # target row of pixels
  pixels = pixels.reshape((28,28))
  plt.rcParams['toolbar'] = 'None'
  plt.imshow(pixels, cmap=plt.get_cmap('gray_r'))
  plt.show() 
# -----------------------------------------------------------
class Batcher:
  def __init__(self, num_items, batch_size, seed=0):
    self.indices = np.arange(num_items)
    self.num_items = num_items
    self.batch_size = batch_size
    self.rnd = np.random.RandomState(seed)
    self.rnd.shuffle(self.indices)
    self.ptr = 0
  def __iter__(self):
    return self
  def __next__(self):
    if self.ptr + self.batch_size > self.num_items:
      self.rnd.shuffle(self.indices)
      self.ptr = 0
      raise StopIteration  # ugh.
    else:
      result = self.indices[self.ptr:self.ptr+self.batch_size]
      self.ptr += self.batch_size
      return result
# -----------------------------------------------------------
class Net(T.nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.layer1 = T.nn.Linear(784, 100)  # hidden 1
    self.layer2 = T.nn.Linear(100, 50)
    self.layer3 = T.nn.Linear(50,100)
    self.layer4 = T.nn.Linear(100, 784)
    T.nn.init.xavier_uniform_(self.layer1.weight)  # glorot
    T.nn.init.zeros_(self.layer1.bias)
    T.nn.init.xavier_uniform_(self.layer2.weight) 
    T.nn.init.zeros_(self.layer2.bias)
    T.nn.init.xavier_uniform_(self.layer3.weight) 
    T.nn.init.zeros_(self.layer3.bias)
    T.nn.init.xavier_uniform_(self.layer4.weight) 
    T.nn.init.zeros_(self.layer4.bias)
  def forward(self, x):
    z = T.tanh(self.layer1(x))
    z = T.tanh(self.layer2(z))
    z = T.tanh(self.layer3(z))
    z = T.tanh(self.layer4(z))  # consider none or sigmoid
    return z
# -----------------------------------------------------------
def main():
  # 0. get started
  print("Begin autoencoder for MNIST anomaly detection")
  T.manual_seed(1)
  np.random.seed(1)
  # 1. load data
  print("Loading MNIST subset data into memory ")
  data_file = ".\\Data\\mnist_pytorch_1000.txt"
  data_x = np.loadtxt(data_file, delimiter=" ",
    usecols=range(2,786), dtype=np.float32)
  labels = np.loadtxt(data_file, delimiter=" ",
    usecols=[0], dtype=np.float32)
  norm_x = data_x / 255
  # 2. create autoencoder model
  net = Net()
  # 3. train autoencoder model
  net = net.train()  # explicitly set
  bat_size = 40
  loss_func = T.nn.MSELoss()
  optimizer = T.optim.Adam(net.parameters(), lr=0.01)
  batcher = Batcher(num_items=len(norm_x),
    batch_size=bat_size, seed=1)
  max_epochs = 100
  print("Starting training")
  for epoch in range(0, max_epochs):
    if epoch > 0 and epoch % (max_epochs/10) == 0:
      print("epoch = %6d" % epoch, end="")
      print("  prev batch loss = %7.4f" % loss_obj.item())
    for curr_bat in batcher:
      X = T.Tensor(norm_x[curr_bat])
      optimizer.zero_grad()
      oupt = net(X)
      loss_obj = loss_func(oupt, X)  # note X not Y
      loss_obj.backward()
      optimizer.step()
  print("Training complete")
  # 4. analyze - find item(s) with large(st) error
  net = net.eval()  # not needed - no dropout
  X = T.Tensor(norm_x)  # all input item as Tensors
  Y = net(X)            # all outputs as Tensors
  N = len(data_x)
  max_se = 0.0; max_ix = 0
  for i in range(N):
    curr_se = T.sum((X[i]-Y[i])*(X[i]-Y[i]))
    if curr_se.item() > max_se:
      max_se = curr_se.item()
      max_ix = i
  raw_data_x = data_x.astype(np.int)
  raw_data_y = labels.astype(np.int)
  print("Highest reconstruction error is index ", max_ix)
  display(raw_data_x, raw_data_y, max_ix)
  print("End autoencoder anomaly detection demo ")
# -----------------------------------------------------------
if __name__ == "__main__":
  main()

示範程式一開始會匯入 NumPy、 PyTorch 和 Matplotlib 封裝。Matplotlib 封裝來以視覺化方式顯示 [最異常模型所找到的數字。匯入整個 PyTorch 套件的替代方式是匯入只是必要的模組,例如,匯入 torch.optim 為選擇加入。

將資料載入記憶體

使用原始的 MNIST 資料是相當困難因為它會儲存在專屬的二進位格式。我撰寫了公用程式來擷取 60,000 訓練項目中的前 1,000 個項目。我會將資料儲存成 mnist_pytorch_1000.txt 資料子目錄中。

產生的資料看起來像這樣:

7 = 0 255 67 . . 123
2 = 113 28 0 . . 206
...
9 = 0 21 110 . . 254

每一行代表一個數字。在每一行的第一個值是數字。第二個值是任意的等號字元只以提高可讀性。下一步] 28 x 28 = 784 值是介於 0 到 255 的灰階像素值。所有的值會以單一的空白空間字元分隔。[圖 3資料處的項目索引 [30] 會顯示在資料檔案中,也就是典型"的 3"的數字。

典型的 MNIST 數字
圖 3 一般 MNIST 數字

資料集載入至記憶體,這些陳述式:

data_file = ".\\Data\\mnist_pytorch_1000.txt"
data_x = np.loadtxt(data_file, delimiter=" ",
  usecols=range(2,786), dtype=np.float32)
labels = np.loadtxt(data_file, delimiter=" ",
  usecols=[0], dtype=np.float32)
norm_x = data_x / 255

請注意,數字/標籤是在資料行零和 784 的像素值位於兩個到 785 的資料行。畢竟 1,000 個映像會載入到記憶體,由每個像素值除以 255,使相應的像素值介於 0.0 到 1.0 之間的所有資料的正規化的版本。

定義 Autoencoder 模型

示範程式定義 784-100-50-100-784 autoencoder。輸入和輸出層 (784) 中的節點數目取決於資料,但隱藏層的數目和每個圖層中的節點數目是必須由試驗和錯誤的超參數。

示範程式會使用程式定義類別,Net,來定義層架構和 autoencoder 的輸入-輸出機制。替代方式是建立 autoencoder 直接使用順序函式,例如:

net = T.nn.Sequential(
  T.nn.Linear(784,100), T.nn.Tanh(),
  T.nn.Linear(100,50), T.nn.Tanh(),
  T.nn.Linear(50,100), T.nn.Tanh(),
  T.nn.Linear(100,784), T.nn.Tanh())

權數初始化演算法 (Glorot 統一),隱藏的層啟用函數 (tanh) 和輸出層啟用函數 (tanh) 是超參數。因為所有的輸入和輸出值是介於 0.0 到 1.0,此問題,羅吉斯 sigmoid 是很好的替代方案,輸出啟用探索。

定型和評估 Autoencoder 模型

示範程式會準備訓練,搭配這些陳述式:

net = net.train()  # explicitly set
bat_size = 40
loss_func = T.nn.MSELoss()
optimizer = T.optim.Adam(net.parameters(), lr=0.01)
batcher = Batcher(num_items=len(norm_x),
  batch_size=bat_size, seed=1)
max_epochs = 100

因為示範 autoencoder 未使用中輟或批次的正規化,不需要明確地將網路設定到定型模式,但在我看來很好的樣式,若要這樣做。最佳化演算法 (Adam)、 初始學習率 (0.01) 和 epoch (100) 的最大數目定型集的批次大小 (40),是所有的超參數。如果您還不熟悉類神經的機器學習服務,您可能會想 「 類神經網路確定有超參數,很多 」,您就會正確。

一次直到已經處理所有的 1,000 個項目 (一個 epoch) 40 隨機的資料項目的索引提供的程式定義的批次處理程式物件。另一個方法是使用 torch.utils.data 模組中的內建的資料集和 DataLoader 物件。

定型程序的結構是:

for epoch in range(0, max_epochs):
  # print loss every 10 epochs
  for curr_bat in batcher:
    X = T.Tensor(norm_x[curr_bat])
    optimizer.zero_grad()
    oupt = net(X)
    loss_obj = loss_func(oupt, X)
    loss_obj.backward()
    optimizer.step()

每個批次項目會建立使用預設的資料類型可做為 torch.float32 Tensor 建構函式。請注意 loss_func 函式會比較計算的輸出的輸入,其中包含定型以預測其輸入的值網路的影響。

定型之後, 您通常會想要儲存在模型中,但這有點超出本文的範圍。PyTorch 文件有良好的範例,示範如何將定型的模型儲存在數個不同的方式。

當使用 autoencoders,在大部分情況下 (包括此範例中) 沒有任何固有定義的模型精確度。您必須決定如何關閉計算的輸出值必須為相關聯的輸入值才會計為正確的預測,並再撰寫程式定義的函數來計算精確度計量。

使用 Autoencoder 模型來尋找異常資料

Autoencoder 模型定型之後,其概念是尋找資料很難正確預測的項目或者同樣地,很難進行重新建構的項目。示範程式碼會掃描所有 1,000 個資料的所有項目,並計算正規化的輸入的值,然後像這樣的計算的輸出值的平方的差異:

net = net.eval()  # not needed - no dropout
X = T.Tensor(norm_x)  # all input item as Tensors
Y = net(X)            # all outputs as Tensors
N = len(data_x)
max_se = 0.0; max_ix = 0
for i in range(N):
  curr_se = T.sum((X[i]-Y[i])*(X[i]-Y[i]))
  if curr_se.item() > max_se:
    max_se = curr_se.item()
    max_ix = i

最大值的平方的誤差 (max_se) 會計算並儲存相關聯的映像 (max_ix) 的索引。尋找具有最大的重構錯誤的單一項目是儲存所有的平方的誤差、 加以排序,並傳回前 n 個項目,n 值取決於特定的問題您的替代項目正在調查。

在找到單一-大多數-異常資料項目之後,它會顯示使用程式定義顯示函數:

raw_data_x = data_x.astype(np.int)
raw_data_y = labels.astype(np.int)
print("Highest reconstruction error is index ", max_ix)
display(raw_data_x, raw_data_y, max_ix)

像素和標籤值會從轉換 float32 類型為 int 大部分的原則視為因為 Matplotlib imshow 函式內程式定義顯示函數可以接受兩種資料類型。

總結

在本文中所述,使用深度類神經的 autoencoder,異常偵測不完善調查的技巧。使用類神經 autoencoder,相較於大部分標準叢集技術的一大優點是類神經的技術,可以編碼該資料來處理非數值資料。大部分叢集技術而定的數字的量值,例如歐幾里得距離,這表示來源資料必須是完全是數字。

異常偵測的相關,但也少探索技術是建立資料集進行調查 autoencoder。然後,而不是使用重構錯誤來尋找異常的資料,您可以叢集化資料使用標準的演算法,例如的 k-means,因為最內層的隱藏的層節點保留每個資料項目的嚴格數值表示法。後叢集,您可以尋找叢集,有極少資料的項目,或尋找最遠距從其叢集距心的群集內的資料項目。這種方法都有類似於類神經的字組內嵌,其中字會轉換成數值的向量,然後可以用來計算文字之間的距離量值的特性。


Dr。James McCaffrey適用於在美國華盛頓州雷德蒙的 Microsoft Research他參與開發數種主要的 Microsoft 產品,包括 Azure、 Bing。Dr。McCaffrey 要聯絡jamccaff@microsoft.com

感謝下列 Microsoft 技術專家檢閱這篇文章:Chris Lee, Ricky Loynd