Adaptive Echo Cancellation¶
In this notebook we will use the adaptive LMS filter to estimate the impulse response of a reverberating room.
A typical use case is handsfree telephony: since the received signal, amplified and played by the speaker, is picked up by the microphone, we need to eliminate it from the signal that is transmitted back in order to avoid a feedback loop. The signal picked up by the microphone, however, has been "processed" by the speaker and by the room and therefore simple signal subtraction is not sufficient. We need to first estimate the reverberation introduced by the room and pre-filter the received signal with the resulting impulse response before subtracting it from the transmitted signal.
The same setup can be used to combat the effects of a a communication channel that introduces multiples echos.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sp
import IPython
from scipy.io import wavfile
plt.rcParams["figure.figsize"] = (14,4)
1. The echo model¶
Let's begin by implementing an echo filter that simulates the reverbereation in a room. We will use a recursive model in which each reflection introduces a 180-degree phase shift (i.e. a change in sign) together with attenuation and lowpass filtering.
In the above block diagram, $M$ is the echo's delay, $-1 < \alpha < 0$ is the attenuation factor for each repetition and $H(z) = (1-\lambda)/(1 - \lambda z^{-1})$ is a simple leaky integrator with $\lambda$ relatively small in order to just attenuate more and more the signal with each reflection.
The CCDE governing the system turns out to be
$$ y[n] = x[n] ā \lambda x[n ā 1] + \lambda y[n ā 1] + \alpha (1 ā \lambda)y[n ā M] $$
which is easily implemented as follows:
def add_echo(x, M, lmb=0.6, alpha=-0.8):
# if the first argument is a scalar, assume the input is a delta sequence of length x
# in this case, the function returns the truncated impulse response of the room.
if np.isscalar(x):
x = np.zeros(int(x))
x[0] = 1
y = np.zeros(len(x))
for n in range(0, len(x)):
if n >= M:
y[n] = x[n] - lmb * x[n-1] + lmb * y[n-1] + alpha * (1 - lmb) * y[n - M]
elif n > 0:
y[n] = x[n] - lmb * x[n-1] + lmb * y[n-1]
else:
y[n] = x[n]
return y
The resulting echo system is an IIR filter. Let's look a portion of its impulse response for $M$ small; we can observe the periodic pattern of the reflections, together with a decaying magnitude:
plt.plot(add_echo(1000, 100));
How does it sound?¶
Let's load a brief speech sample that we will use in the rest of the notebook.
Fs, speech = wavfile.read('speech2.wav')
speech = speech / 32767.0 # scale the signal to floats in [-1, 1]
print('sampling rate:', Fs, 'Hz, data length:', len(speech), 'samples')
IPython.display.Audio(speech, rate=Fs)
sampling rate: 8000 Hz, data length: 19063 samples
For audio signals, that's how the reverberation sounds for a delay to about 20ms
speech_echo = add_echo(speech, int(0.020 * Fs))
IPython.display.Audio(speech_echo, rate=Fs)
and we can easily convince ourselves that simple subtraction does not work; here we play the signal with echo first to compare volume levels.
IPython.display.Audio(np.r_[speech_echo, speech_echo - speech], rate=Fs)
2. The LMS filter¶
Let's now implement the standard LMS filter algorithm. Given the original signal $x[n]$ and its echo-corrupted version $d[n] = h[n] \ast x[n]$, the LMS algorithm will estimate $h[n]$ iteratively as
\begin{align} e[n] &= d[n] - \mathbf{h}_{n}^{T} \mathbf{x}_n \\ \mathbf{h}_{n+1} &= \mathbf{h}_n + \alpha_n\, e[n] \, \mathbf{x}_n \end{align}
where $\mathbf{h}_n$ is the set of estimated filter coefficients at iteration $n$: $$\mathbf{h}_n = \begin{bmatrix} h_n[0] & h_n[1] & h_n[2] & \ldots & h_n[N-1] \end{bmatrix}$$ and where $$\mathbf{x}_n = \begin{bmatrix} x[n] & x[n-1] & x[n-2] & \ldots & x[n - N + 1] \end{bmatrix}$$
def lms(x, d, N, step_size=0.001):
# Run the LMS adaptation using x as the input signal, d as the desired output signal and a as the step size
# Will return an N-tap FIR filter
#
# initial guess for the filter is a delta
h = np.zeros(N)
h[0] = 1
# number of iterations
L = min(len(x), len(d))
# let's store the error at each iteration to plot the MSE
e = np.zeros(L)
# run the adaptation
for n in range(N, L):
e[n] = d[n] - np.dot(h, x[n:n-N:-1])
h = h + step_size * e[n] * x[n:n-N:-1]
return h, e[N:]
Convergence tests with WGN¶
Let's now test the LMS filter using unit-variance white Gaussian noise as the input signal; with this maximally decorrelated input the convergence is faster. First, let's verify that the filter converges to a good approximation of the echo's impulse response:
# create the signals
wgn = np.random.randn(10000)
DELAY = 160
wgn_echo = add_echo(wgn, DELAY)
TAPS = 500
h_orig = add_echo(TAPS, DELAY)
def plot_lms_test(N, step_size=0.0008):
h_est, _ = lms(wgn[:N], wgn_echo[:N], TAPS, step_size)
plt.plot(h_orig, 'g', label='original impulse response');
plt.plot(h_est, 'r', label=f'estimated impulse response ({N} iterations)');
plt.legend();
return h_est
plot_lms_test(1000);
plot_lms_test(2000);
h_est = plot_lms_test(5000);
The approximation obtained with the highest number of iterations is actually quite good, as we can see by plotting the difference between the original and the estimated impulse response:
plt.plot(h_orig - h_est);
Clearly the precision depends on the number of steps in the adaptation. We can try and play with the value of the step size, for instance, and see how it affects the convergence. In the following example we see that increasing the learning factor actually decreases the precision of the estimation
h_est = plot_lms_test(5000, step_size=0.002);
plt.plot(h_orig - h_est);
Increasing the learning factor even more prevents convergence:
plot_lms_test(5000, step_size=0.005);
Eror decay¶
To have a quantitative description of the convergence process we can average the value of the MSE at each instant over a set of independent experiments.
def wgn_mse(L, step_size=0.0008):
wgn = np.random.randn(L)
wgn_echo = add_echo(wgn, DELAY)
_, err = lms(wgn, wgn_echo, TAPS, step_size)
return np.square(err)
TRIALS = 200 # number of independent experiments
L = 6000
mse = wgn_mse(L)
for n in range(1, TRIALS):
mse = mse + wgn_mse(L)
mse = mse / TRIALS
plt.plot(mse);
As you can see, with these parameters the error stops decreasing after about 3000 iterations. Here too you can play with the step size and see how it affects the convergence rate.
3. The LMS echo canceler¶
Let's now run the LMS adaptation using a voice signal as the input. Since the voice signal is very correlated, the convergenge will be slower, but we can use a much larger step size. Since our sound snippet is short, we will use it multiple times in the adaptation.
# let's build the echo signal with a 20ms delay
speech_echo_delay = int(0.020 * Fs)
audio = np.tile(speech, 5)
# now let's estimate the first 1000 taps of the echo impulse response using the speech signal
speech_echo_taps = 1500
h_est, err = lms(audio, add_echo(audio, speech_echo_delay), speech_echo_taps, step_size=0.021)
If we plot the difference between the ideal and estimated impulse response we can see that the match is not perfect. This is because the speech signal, as opposed to white noise, does not drive the adaptation as effectively since it doesn't "hit" all of the frequencies. You can try and use more copies of the audio in sequence to improve the adaptation; in a normal use case the LMS filter would be running all the time, using much more data to converge.
h_orig = add_echo(len(h_est), speech_echo_delay)
plt.plot(h_orig, 'g', label='original impulse response');
plt.plot(h_est, 'r', label=f'estimated impulse response ({len(audio)} iterations)');
plt.legend();
plt.plot(h_orig - h_est);
We can now listen to the effectiveness of the echo canceler; listen in sequence to the reverberated sound, the cancellation performed by simple subtraction and the cancellation after filtering with the coefficients produced by the LMS adaptation. The results should speak for themselves!
speech_echo = add_echo(speech, speech_echo_delay)
speech_echo_estimated = sp.lfilter(h_est, 1, speech)
audio_sequence = np.r_[speech_echo, speech_echo - speech, speech_echo - speech_echo_estimated]
IPython.display.Audio(audio_sequence, rate=Fs)
plt.plot(audio_sequence);