Source code for timeWarpOB.timeWarpOB

# Using numpy for matrix manipulations
import numpy as np

# Check if the numba module is installed (compiled, high-speed functions)
import importlib.util
foundNumba = importlib.util.find_spec("numba") is not None

if foundNumba:
	#Numba is installed, import @jit decorator
	from numba import jit
else:
	# Numba not installed, warn the user and make a blank @jit decorator
	def jit(f):
		'''Numba was not found in the current python environment
		'''
		return f


[docs]def timeWarp(a,b,method='DTW',window=0,retMat=True,**kwargs): '''This function is the main time warping interface, and acts as a convenient wrapper to the other functions. Parameters ---------- a : list or numpy 1D-array First time series, which will be compared against time series b b : list or numpy 1D-array Second time series (reference) method : str Time warping method ``{'DTW','ERP'}`` - see below window : int Time warping window constraint (default = 0) retMat : bool Whether to include the cost matrices in the returned object ERPg : int g-value (for ``method = 'ERP'`` only) Returns ------- warpObj : dict A timeWarpOB warp object, containing the following items: warpObj.backTraceCost : float The sum cost of following the backtrace through the cost matrix warpObj.backTracePath : list List or numpy array of pairs of coordinates in time-space describing the backtrace through the cost matrix warpObj.cost : float Bottom left value on the cost matrix. With no warping window, this should equal warpObj.backTraceCost warpObj.costMat : list A matrix (list of lists) or numpy array describing the cost matrix between the two time series. For a series of length n, this matrix will be of size n x n. Only output if ``retMat = True`` in the input parameters warpObj.distMat : list A matrix (list of lists) or numpy array describing the L1-distance matrix between the two time series. For a series of length n, this matrix will be of size n x n. Only output if ``retMat = True`` in the input parameters warpObj.warpWindow : int Returning the warp window parameter used (used by plotting functions) warpObj.warpStats : dict Warp statistics object, containing: warpObj.warpStats.timeAhead : int The number of periods that time series a was in sync with time series b. warpObj.warpStats.timeAhead : int The number of periods that time series a was ahead of time series b. warpObj.warpStats.timeBehind : int The number of periods that time series a was behind of time series b. warpObj.warpStats.amountAhead : int The total sum of the number of periods that time series a was leading time series b by. warpObj.warpStats.amountBehind : int The total sum of the number of periods that time series a was lagging time series b by. warpObj.warpStats.avgAhead : int Average amount of periods that a was ahead of a by, i.e. amountAhead divided by timeAhead warpObj.warpStats.avgAhead : int Average amount of periods that a was behind b by, i.e. amountBehind divided by timeBehind warpObj.warpStats.avgWarp : int Average amount of periods that a ahead or behind b by, i.e. ``(amountAhead - amountBehind) / (timeAhead + timeBehind + timeSync)`` This will give positive values if a is on average ahead of b and negative values is a is on average behind b. Notes ----- * If lists are supplied, the output matrices will be in list form. If numpy arrays are supplied, the output will be as numpy arrays. * Numpy arrays will calculate faster, as no internal type conversion is required. * Time series should be of equal length. If they are not, the longer will be clipped from the end. * ERP and DTW methods are available. For information on how they work, see the module documentation. * Cost matricies should be returned for use by the plotting functions ''' # Check if a list has been passed or numpy object usingList = False if type(a) == list: usingList = True a = np.array(a) if type(b) == list: usingList = True b = np.array(b) # Create a warp object to return warpObj = {} # Clip any longer time series minLen = min(len(a),len(b)) a = a[0:minLen] b = b[0:minLen] # Get distance matrix dist = L1distances(a,b) # Get the cost matrix if method == 'DTW': costMat = DTWwarp(dist,a,b,w=window) elif method == 'ERP': ERPg = 0 if 'ERPg' in kwargs: ERPg = kwargs['ERPg'] costMat = ERPwarp(dist,a,b,w=window,g=ERPg) else: print("timeWarp error - incorrect warp method specified:", method) return -1 # Extract the cost cIndex = len(costMat[0]) - 1 cost = costMat[cIndex,cIndex] # Backtrace the warp path path, backTraceCost, warpStats = backTrace(costMat,dist) # Return the results if retMat == True: if usingList: warpObj["costMat"] = costMat.tolist() warpObj["distMat"] = dist.tolist() else: warpObj["costMat"] = costMat warpObj["distMat"] = dist warpObj["cost"] = cost warpObj["backTraceCost"] = backTraceCost warpObj["warpStats"] = warpStats if usingList: warpObj["backTracePath"] = path.tolist() else: warpObj["backTracePath"] = path if window == 0: warpObj["warpWindow"] = len(a) else: warpObj["warpWindow"] = window return warpObj
@jit
[docs]def L1distances(a,b): '''Calcluates the L1 distance matrix between two time series. Parameters ---------- a : list First time series, which will be compared against time series b b : list Second time series (reference) Returns ------- distance : list A matrix (list of lists) describing the L1-distance matrix between the two time series. For a series of length n, this matrix will be of size n x n. ''' n = len(a) m = len(b) distance = np.zeros((n,m)) for i in range(n): for j in range(m): distance[i,j] = abs(a[i] - b[j]) return distance
@jit
[docs]def ERPwarp(dist,x,y,w=0,g=0): '''Calcluates the ERP cost matrix between two time series. Parameters ---------- dist : list A matrix (list of lists) describing the L1-distance matrix between two time series. For a time series of length n, this matrix must be of size n x n. x : list First time series, which will be compared against time series y y : list Second time series (reference) w : int Time warping window constraint (default = 0) g : int ERP g-value (deafult = 0) Returns ------- costMat : list A matrix (list of lists) describing the ERP cost matrix between the two time series. For a series of length n, this matrix will be of size n x n. ''' n = len(x) m = len(y) costMat = np.zeros((n,m)) costMat[0,0] = dist[0,0] # Fill the edges for i in range(1, n): costMat[0,i] = costMat[0,i-1] + abs(dist[0,i] - g) for i in range(1, n): costMat[i,0] = costMat[i-1,0] + abs(dist[i,0] - g) # Fill the rest of the matrix for i in range(1, n): for j in range(1, n): OpMatch = costMat[i-1,j-1] + dist[i,j] OpIns = costMat[i-1,j] + abs(x[i] - g) OpDel = costMat[i,j-1] + abs(y[i] - g) costMat[i,j] = min(OpMatch,OpDel,OpIns) # Apply warp window if w != 0: for i in range(0, n): for j in range(0, n): if abs(i-j) >= w: costMat[i,j] = np.inf return costMat
@jit
[docs]def DTWwarp(dist,x,y,w=0): '''Calcluates the ERP cost matrix between two time series. Parameters ---------- dist : list A matrix (list of lists) describing the L1-distance matrix between two time series. For a time series of length n, this matrix must be of size n x n. x : list First time series, which will be compared against time series y y : list Second time series (reference) w : int Time warping window constraint (default = 0) Returns ------- costMat : list A matrix (list of lists) describing the DTW cost matrix between the two time series. For a series of length n, this matrix will be of size n x n. ''' n = len(x) m = len(y) costMat = np.zeros((n,m)) costMat[0,0] = dist[0,0] # Fill the edges for i in range(1, n): costMat[0,i] = dist[0,i] + costMat[0,i-1] for i in range(1, n): costMat[i,0] = dist[i,0] + costMat[i-1,0] # Fill the rest of the matrix for i in range(1, n): for j in range(1, n): minMove = min(costMat[i-1,j-1], costMat[i-1,j], costMat[i,j-1]) costMat[i,j] = minMove + dist[i,j] # Apply warp window if w != 0: for i in range(0, n): for j in range(0, n): if abs(i-j) >= w: costMat[i,j] = np.inf return costMat
[docs]def backTrace(costMat,dist): '''Finds the optimal warping path by backtracking through the cost matrix. Parameters ---------- dist : list A matrix (list of lists) describing the L1-distance matrix between two time series. For a time series of length n, this matrix must be of size n x n. costMat : list A matrix (list of lists) describing the time-warped cost matrix between two time series. For a time series of length n, this matrix must be of size n x n. Returns ------- path : list List of pairs of coordinates in time-space describing the backtrace through the cost matrix backTraceCost : float The sum cost of following the backtrace through the cost matrix warpStats : dict Warp statistics object, containing: warpStats.timeAhead : int The number of periods that time series a was in sync with time series b. warpStats.timeAhead : int The number of periods that time series a was ahead of time series b. warpStats.timeBehind : int The number of periods that time series a was behind of time series b. warpStats.amountAhead : int The total sum of the number of periods that time series a was leading time series b by. warpStats.amountBehind : int The total sum of the number of periods that time series a was lagging time series b by. warpStats.avgAhead : int Average amount of periods that a was ahead of a by, i.e. amountAhead divided by timeAhead warpStats.avgAhead : int Average amount of periods that a was behind b by, i.e. amountBehind divided by timeBehind warpStats.avgWarp : int Average amount of periods that a ahead or behind b by, i.e. ``(amountAhead - amountBehind) / (timeAhead + timeBehind + timeSync)`` This will give positive values if a is on average ahead of b and negative values is a is on average behind b. ''' timeAhead = 0 timeBehind = 0 timeSync = 0 amountAhead = 0 amountBehind = 0 i = len(costMat) - 1 j = len(costMat[0]) - 1 path = np.array([i,j]) backTraceCost = dist[i,j] while i>0 or j>0: if i==0: # Edge condition (only one direction) j = j - 1 elif j==0: # Edge condition (only one direction) i = i - 1 else: minMove = min(costMat[i-1,j-1], costMat[i-1,j], costMat[i,j-1]) if costMat[i-1,j] == minMove: i = i - 1 elif costMat[i,j-1] == minMove: j = j - 1 else: i = i - 1 j = j - 1 backTraceCost += dist[i,j] path = np.vstack([path,[i,j]]) if j > i: timeAhead += 1 amountAhead += j - i elif i > j: timeBehind += 1 amountBehind += i - j else: timeSync +=1 warpStats = {} warpStats["timeAhead"] = timeAhead warpStats["timeBehind"] = timeBehind warpStats["timeSync"] = timeSync warpStats["amountAhead"] = amountAhead warpStats["amountBehind"] = amountBehind warpStats["avgAhead"] = 0 warpStats["avgBehind"] = 0 if timeAhead > 0: warpStats["avgAhead"] = amountAhead / timeAhead if timeBehind > 0: warpStats["avgBehind"] = amountBehind / timeBehind warpStats["avgWarp"] = (amountBehind - amountAhead) / (timeAhead + timeBehind + timeSync) return path, backTraceCost, warpStats