Coincidencia eficiente elemento a elemento de dos arrays en Python.
Estoy trabajando en un problema de coincidencia. Tengo dos matrices A y B del mismo tamaño (supongamos 1000×1000). Para cada elemento a_ij en A (i y j son los índices de fila y columna, respectivamente), necesito encontrar el elemento más cercano b_i? en la fila i de B. En este momento, he ideado tres soluciones:
- Bucle anidado (Tarda 5.47s)
- Bucle simple con transmisión (Tarda 6.40s)
- Bucle paralelo (Tarda 30.27s)
Creo que los tres métodos anteriores no son lo suficientemente eficientes en términos de tiempo, o al menos mi implementación no es lo suficientemente buena para lograr eso (¡el tercer método tarda más de lo que esperaba!). Sería genial si pudieras señalar cómo puedo mejorar mi código. ¡Gracias de antemano!
Mis soluciones actuales son las siguientes:
import numpy as np
import time
from joblib import Parallel, delayed
def metodo1(A,B): # Bucle anidado (como referencia)
output = np.zeros(A.shape)
for r in range(A.shape[0]):
rowB = B[r]
for c in range(A.shape[1]):
elementA = A[r,c]
diff_vector = np.abs(elementA – rowB)
output[r,c] = np.argmin(diff_vector)
return output
def metodo2(A,B): # Transmisión
output = np.zeros(A.shape)
for r in range(A.shape[0]):
diff_matrix = np.abs(A[r][:, np.newaxis] – B[r])
output[r] = np.argmin(diff_matrix, axis=1)
return output
def metodo3(A,B): # Bucle paralelo
def coincidente(r, A, B):
i = r//A.shape[1]
j = r % A.shape[1]
elementA = A[i, j]
rowB = B[i]
diff_vector = np.abs(elementA - rowB)
return np.argmin(diff_vector)
output = Parallel(n_jobs=4)(delayed(coincidente)(r, A, B) for r in range(A.shape[0]*A.shape[1]))
output = np.reshape(output, [A.shape[0], A.shape[1]])
return output
A = np.random.randn(1000,1000)
B = np.random.randn(1000,1000)
resultado1 = metodo1(A,B)
resultado2 = metodo2(A,B)
resultado3 = metodo3(A,B)
davy.ai
Hay varias maneras en las que se puede mejorar el código para hacerlo más eficiente en términos de tiempo. Aquí van algunas sugerencias:
argmin
para encontrar el índice del valor mínimo en el vector resultante de las diferencias.Usar la función de broadcasting de numpy: En la segunda solución, se utiliza un solo bucle para recorrer las filas de A y B y calcular la matriz de diferencias. Esto se puede hacer de manera mucho más eficiente utilizando la función de broadcasting de numpy. Se puede calcular la matriz de diferencias restando A de B a lo largo del eje de las columnas, lo que da como resultado un arreglo tridimensional donde cada elemento
(i,j,k)
corresponde a la diferencia entrea_ij
yb_ik
. Luego se puede utilizar la funciónargmin
para encontrar el índice del valor mínimo a lo largo del último eje, lo que resulta en un arreglo de índices que corresponden al elemento más cercano en B para cada elemento en A.Localidad de caché: En la tercera solución, se está paralelizando el bucle utilizando la biblioteca
joblib
. Si bien esto puede acelerar el cálculo, puede que no sea óptimo para los patrones de acceso a memoria. Cuando múltiples hilos acceden a las mismas ubicaciones de memoria, esto puede causar fallos de caché y reducir el rendimiento. Una manera de optimizar esto es dividir el espacio de iteración en trozos que sean procesados por cada hilo por separado, de modo que cada hilo opere en una sección contigua de memoria.Aquí hay una implementación que incorpora estas sugerencias:
Esta implementación aprovecha el broadcasting de numpy para calcular eficientemente la matriz de diferencias. También utiliza operaciones vectorizadas y patrones de acceso a memoria optimizados para acelerar el cálculo. Dependiendo del tamaño de los arreglos de entrada, esta implementación debería ser significativamente más rápida que las anteriores.