This blog post discusses how to calculate the Mahalanobis distance using TensorFlow. I consider the full-covariance approach, i.e., each cluster has its own general covariance matrix, so unlike the previous post I do not assume a common covariance across clusters. Computing the Mahalanobis distance is important for classification when each cluster has a different covariance structure. Let's take a look at this situation using toy data.
Bonus: This blog post also goes over how to use tf.while_loop.
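As a reminder, the Mahalanobis distance of a sample $x$ from the center $\mu_k$ of cluster $k$ with covariance matrix $\Sigma_k$ is

$$
d_k(x) = \sqrt{(x-\mu_k)^\top \Sigma_k^{-1} (x-\mu_k)},
$$

so each cluster's own covariance enters the distance. The Euclidean distance corresponds to the special case $\Sigma_k = I$ for every cluster.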
Example Data¶
In the following toy data, I generate 60 samples from a 2-d Gaussian mixture model with three components: 20 samples from each 2-d Gaussian. Notice that each Gaussian component has a different covariance matrix.
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
N_CLUSTER = 3
N_FEATURES = 2
_N = 20
N_SAMPLE = _N * N_CLUSTER
S1 = np.identity(N_FEATURES)
S1[0,1] = S1[1,0] = 0
S2 = np.identity(N_FEATURES)*0.1
S3 = np.identity(N_FEATURES)*3
S3[0,1] = S3[1,0] = -2.8
mu1 = np.array([1, 2])
mu2 = np.array([-2,-2])
mu3 = np.array([-3, 2])
npMEANS = np.array([mu1,mu2,mu3])
npSIGMAS = np.array([S1,S2,S3])
colors = np.array(["red","green","blue"])
npX = []
plt.figure(figsize=(5,5))
for icluster in range(N_CLUSTER):
    color = colors[icluster]
    npmean = npMEANS[icluster]
    s_cluster = np.random.multivariate_normal(npmean,npSIGMAS[icluster],_N)
    npX.extend(s_cluster)
    plt.plot(npmean[0],npmean[1],"X",color=color)
    plt.plot(s_cluster[:,0],s_cluster[:,1],"p",alpha=0.3,color=color,label="cluster={}".format(icluster))
plt.title("Sample distribution from {} clusters".format(N_CLUSTER))
plt.legend()
plt.show()
GT = [0 for _ in range(_N)] + [1 for _ in range(_N)] + [2 for _ in range(_N)]
npX = np.array(npX)
print("npX: Data Dimension = (N_SAMPLE,N_FEATURES) = {}".format(npX.shape))
print("npMEANS: Data Dimension = (N_CLUSTER,N_FEATURES) = {}".format(npMEANS.shape))
print("npSIGMAS: Data Dimension = (N_CLUSTER,N_FEATURES,N_FEATURES) = {}".format(npSIGMAS.shape))
for icluster in range(N_CLUSTER):
    print("\n***CLUSTER={}***".format(icluster))
    print(">>>MEAN")
    print(npMEANS[icluster])
    print(">>>SIGMA")
    print(npSIGMAS[icluster])
In the plot above, the samples are colored by their ground-truth cluster. Due to the differences in the covariance matrices, the shape of each cluster is different!
- red cluster: circular
- blue cluster: strong negative correlation
- green cluster: circular, small variance
If you use the Euclidean distance, the covariance matrix of each cluster is implicitly assumed to be the same identity matrix, so many blue points at the bottom of the blue cluster are classified into the red cluster.
The script below shows this issue.
Euclidean distance based classification (numpy)¶
The classification accuracy is 0.9.
from scipy.spatial import distance
import scipy
npEuclidean = [[0 for _ in range(npMEANS.shape[0])]
               for _ in range(npX.shape[0])]
for isample in range(npX.shape[0]):
    for icluster in range(npMEANS.shape[0]):
        npEuclidean[isample][icluster] = distance.euclidean(npX[isample],npMEANS[icluster])
npEuclidean= np.array(npEuclidean)
pred = npEuclidean.argmin(axis=1)
plt.figure(figsize=(5,5))
plt.scatter(npX[:,0],npX[:,1],c=colors[pred])
plt.title("Classification using Euclidean distance acc={}".format(np.mean(GT == pred)))
plt.show()
Mahalanobis distance based classification (numpy)¶
If you use the Mahalanobis distance to calculate the distance between samples and cluster centers, classification accuracy improves to 0.98.
from scipy.spatial import distance
import scipy
npMAHALANOBIS = [[0 for _ in range(npMEANS.shape[0])]
                 for _ in range(npX.shape[0])]
for isample in range(npX.shape[0]):
    for icluster in range(npMEANS.shape[0]):
        npMAHALANOBIS[isample][icluster] = distance.mahalanobis(npX[isample],npMEANS[icluster],
                                                                VI=scipy.linalg.pinv(npSIGMAS[icluster]))
npMAHALANOBIS = np.array(npMAHALANOBIS)
pred = npMAHALANOBIS.argmin(axis=1)
plt.figure(figsize=(5,5))
plt.scatter(npX[:,0],npX[:,1],c=colors[pred])
plt.title("Classification using Mahalanobis distance, acc={:3.2f}".format(np.mean(GT == pred)))
plt.show()
print("npMAHALANOBIS")
npMAHALANOBIS
I hope you are now convinced that the Mahalanobis distance is preferable, especially when the cluster shapes differ. Now, let's calculate the Mahalanobis distance with TensorFlow!
Tensorflow calculation¶
Previously, Calculate Mahalanobis distance with tensorflow 2.0 discussed how to use a Euclidean distance function to compute the Mahalanobis distance. The idea was to factor the inverse of the covariance matrix with a Cholesky decomposition and then standardize the samples. We will reuse the code from there.
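To see why this works, note that if $\Sigma^{-1} = L L^\top$ with $L$ the Cholesky factor, then $(x-\mu)^\top \Sigma^{-1} (x-\mu) = \lVert L^\top (x-\mu)\rVert^2$, i.e., the Mahalanobis distance equals the Euclidean distance between the standardized points $xL$ and $\mu L$. Here is a minimal numpy check of this fact (not part of the original code; it reuses npX, npMEANS, and npSIGMAS defined above):

# Sanity-check sketch: Mahalanobis distance == Euclidean distance after standardizing
# with the Cholesky factor of the inverse covariance matrix.
x, mu, S = npX[0], npMEANS[2], npSIGMAS[2]
Sinv = np.linalg.pinv(S)
L = np.linalg.cholesky(Sinv)               # lower-triangular factor, Sinv = L @ L.T
d_maha = distance.mahalanobis(x, mu, VI=Sinv)
d_eucl = np.linalg.norm(x @ L - mu @ L)    # Euclidean distance between standardized points
print(d_maha, d_eucl)                      # the two numbers should agree up to float error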
The following code calculates the pairwise Euclidean distances between two sets of points (every row of A against every row of B).
import tensorflow as tf
def Euclidean(A,B):
    # pairwise Euclidean distances between the rows of A (n_a, d) and the rows of B (n_b, d)
    v = tf.expand_dims(tf.reduce_sum(tf.square(A), 1), 1)
    p1 = tf.reshape(tf.reduce_sum(v,axis=1),(-1,1))
    v = tf.reshape(tf.reduce_sum(tf.square(B), 1), shape=[-1, 1])
    p2 = tf.transpose(tf.reshape(tf.reduce_sum(v,axis=1),(-1,1)))
    res = tf.sqrt(tf.add(p1, p2) - 2 * tf.matmul(A, B, transpose_b=True))
    return(res)
## Sanity check
npA = np.random.random((N_SAMPLE,N_FEATURES))
npB = np.random.random((N_CLUSTER,N_FEATURES))
tfA = tf.constant(npA,dtype="float32")
tfB = tf.constant(npB,dtype="float32")
with tf.Session() as sess:
    tfE = sess.run(Euclidean(tfA,tfB))
assert tfE.shape == (N_SAMPLE,N_CLUSTER)
from scipy.spatial.distance import cdist
print("Average MSE={}".format(np.mean((tfE-cdist(npA,npB,metric="euclidean"))**2)))
Convert numpy array to tensor objects¶
I will keep:
- tfSinv_half: the Cholesky factor of the inverse of the cluster covariance matrix
- tfMEANS: the standardized cluster means; standardization is done with the Cholesky factor of the inverse of the cluster covariance matrix
- tfX: the sample data
def get_mu1_Sinv_half(mu1,S1):
    Sinv = np.linalg.pinv(S1)
    #Sinv_half = np.linalg.cholesky(Sinv + 0.01*np.identity(Sinv.shape[0]))
    Sinv_half = np.linalg.cholesky(Sinv)
    mu1_stand = np.matmul(mu1,Sinv_half)
    return(mu1_stand,Sinv_half)
npMEANS_stand = []
npSinv_half = []
for icluster in range(N_CLUSTER):
    mu_stand,Sinv_half = get_mu1_Sinv_half(npMEANS[icluster],  # (N_FEATURES,)
                                           npSIGMAS[icluster]) # (N_FEATURES,N_FEATURES)
    npMEANS_stand.append(mu_stand)
    npSinv_half.append(Sinv_half)
npMEANS_stand = np.array(npMEANS_stand)
npSinv_half = np.array(npSinv_half)
tfSinv_half = tf.constant(npSinv_half,dtype="float32")
print("tfSinv_half:",tfSinv_half.get_shape())
tfMEANS = tf.constant(npMEANS_stand,dtype="float32")
print("tfMEANS:",tfMEANS.get_shape())
tfX = tf.constant(npX,dtype="float32")
print("tfX:",tfX.get_shape())
Example of tf.while_loop: Simple looping with indexing within the loop¶
I will use tf.while_loop to calculate the Mahalanobis distance. Here I show an example usage of tf.while_loop.
The following code simply slices a standardized cluster mean on each of the N_CLUSTER (= 3) iterations and returns the final standardized cluster mean.
def body(i,x):
    x = tfMEANS[i]
    i += 1
    return(i,x)
def condition(i,x):
    return i < N_CLUSTER
x = tf.ones(N_FEATURES,dtype="float32")
i = tf.constant(0,dtype="int32")
# while_loop must be instantiated AFTER all the global variables are defined
print("Looping N_CLUSTER={}".format(N_CLUSTER))
wl = tf.while_loop(condition,
                   body,
                   loop_vars=[i,x])
with tf.Session() as sess:
    print(sess.run(wl))
Looping with Euclidean distance function¶
You can modify the code above to compute the Mahalanobis distance between the first sample and each cluster center. The code below returns only the result of the last iteration, i.e., the Mahalanobis distance between the first sample and the final cluster center.
import sys
def body(i,out):
    mu_stand = tf.reshape(tfMEANS[i],(1,-1)) ## (1, n_feat)
    Sinv_half = tfSinv_half[i]               ## (n_feat, n_feat)
    X_stand = tf.matmul(tfXi,Sinv_half)
    out = Euclidean(X_stand,mu_stand)
    i += 1
    return(i,out)
def condition(i,x):
    return i < N_CLUSTER
INDEX_FIRST_SAMPLE = 0
tfXi = tf.reshape(tfX[INDEX_FIRST_SAMPLE],(1,-1))
print("the dimension of sample:",tfXi.get_shape())
x = tf.ones((1,1),dtype="float32")
i = tf.constant(0,dtype="int32")
# while_loop must be instantiated AFTER all the global variables are defined
wl = tf.while_loop(condition,
                   body,
                   loop_vars=[i,x])
with tf.Session() as sess:
    print(sess.run(wl))
print("Mahalanobis distance between the 3rd cluster center and the first cluster mean (numpy) {}".format(npMAHALANOBIS[0,-1]))
Mahalanobis distance calculation¶
Now modify the function above to obtain the Mahalanobis distances between ALL samples and ALL cluster centers.
Please see the examples in the tf.while_loop documentation to understand how to handle the shape invariants of the loop variables; a minimal illustration follows below.
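Before the full function, here is a minimal standalone sketch (not from the original post) of a loop variable whose shape changes between iterations; shape_invariants tells tf.while_loop to allow this:

# Minimal shape_invariants sketch: v starts empty and grows by one element per iteration.
i0 = tf.constant(0, dtype="int32")
v0 = tf.zeros((0,), dtype="float32")
def grow_body(i, v):
    v = tf.concat([v, tf.cast(tf.reshape(i, (1,)), "float32")], axis=0)
    return i + 1, v
_, v_final = tf.while_loop(lambda i, v: i < N_CLUSTER,
                           grow_body,
                           loop_vars=[i0, v0],
                           shape_invariants=[i0.get_shape(), tf.TensorShape([None])])
with tf.Session() as sess:
    print(sess.run(v_final))  # expected: [0. 1. 2.]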
def EfficientMahalanobis(tfX,tfMEANS,tfSinv_half):
    """
    tfX         : (N_SAMPLE, N_FEATURES)
    tfSinv_half : (N_CLUSTER, N_FEATURES, N_FEATURES)
    tfMEANS     : (N_CLUSTER, N_FEATURES)
    Global variables that need to be defined:
        N_CLUSTER
    OUTPUT:
        Mdist : (N_SAMPLE, N_CLUSTER)
    """
    def Euclidean(A,B):
        v = tf.expand_dims(tf.reduce_sum(tf.square(A), 1), 1)
        p1 = tf.reshape(tf.reduce_sum(v,axis=1),(-1,1))
        v = tf.reshape(tf.reduce_sum(tf.square(B), 1), shape=[-1, 1])
        p2 = tf.transpose(tf.reshape(tf.reduce_sum(v,axis=1),(-1,1)))
        res = tf.sqrt(tf.add(p1, p2) - 2 * tf.matmul(A, B, transpose_b=True))
        return(res)
    def body(i,out):
        mu_stand = tf.reshape(tfMEANS[i],(1,-1)) ## (1, n_feat)
        Sinv_half = tfSinv_half[i]               ## (n_feat, n_feat)
        X_stand = tf.matmul(tfX,Sinv_half)       ## standardize every sample with cluster i's factor
        euc = Euclidean(X_stand,mu_stand)        ## (N_SAMPLE, 1)
        out = tf.cond(tf.equal(i,0), lambda : euc, lambda : tf.concat([out,euc],axis=1))
        i += 1
        return(i,out)
    def condition(i,x):
        return i < N_CLUSTER
    i = tf.constant(0,dtype="int32")
    out = tf.ones((0,0),dtype="float32")
    print("out.get_shape()",out.get_shape())
    # while_loop must be instantiated AFTER all the global variables are defined
    _ncluster, Mdist = tf.while_loop(condition,
                                     body,
                                     loop_vars=[i,out],
                                     shape_invariants=[i.get_shape(),
                                                       tf.TensorShape([None,    # N_SAMPLE
                                                                       None])]) # N_CLUSTER
    print("_ncluster ",_ncluster)
    print("Mdist     ",Mdist)
    return(Mdist)
Mdist = EfficientMahalanobis(tfX,tfMEANS,tfSinv_half)
with tf.Session() as sess:
    tfMAHALANOBIS = sess.run(Mdist)
print("tfMAHALANOBIS.shape",tfMAHALANOBIS.shape)
tfMAHALANOBIS
print("MSE between the tensorflow and numpy Mahalanobis calculations",np.mean((tfMAHALANOBIS - npMAHALANOBIS)**2))
Check the classification performance¶
pred = tfMAHALANOBIS.argmin(axis=1)
plt.figure(figsize=(5,5))
plt.scatter(npX[:,0],npX[:,1],c=colors[pred])
plt.title("Classification using Mahalanobis distance, acc={:3.2f}".format(np.mean(GT == pred)))
plt.show()