// Patch notes (free-text preamble; the patch text starting at "diff --git" is unchanged).
//
// Purpose: split the audio pipeline in src/spek-pipeline.vala (namespace Spek, type Pipeline)
// into two threads. reader_func now only decodes and stores samples; the FFT and interval
// accounting that used to live in reader_func move to the new worker_func.
//
// Fields hunk:
//   - NFFT = 64 is added ("Number of FFTs to pre-fetch"); nfft remains the FFT size.
//   - input_size / input_pos, worker_thread, reader_mutex / reader_cond,
//     worker_mutex / worker_cond and the worker_done flag are added.
//
// Constructor hunk:
//   - input grows from nfft floats to input_size = nfft * (NFFT * 2 + 1) floats.
//     Both threads index it modulo input_size, i.e. as a circular buffer.
//
// start () hunk:
//   - input_pos is reset to 0 and fresh Mutex / Cond objects are created before the reader
//     thread is created.
//
// reader_func ():
//   - Creates the worker thread; returns null if Thread.create throws ThreadError.
//   - For each successful cx.read (), checks quit under lock (quit), then stores
//     average_input () of every block_size = cx.width * cx.channels / 8 bytes into input[pos].
//   - Calls reader_sync (pos) each time nfft * NFFT samples have been written since prev_pos.
//   - After the read loop: publishes any remaining samples, then publishes -1 so the worker
//     returns, and joins the worker thread.
//
// reader_sync (int pos):
//   - Waits on reader_cond until worker_done is true and clears it (reader_mutex held).
//   - Then stores pos into input_pos and signals worker_cond (worker_mutex held).
//
// worker_func ():
//   - At the top of every outer iteration sets worker_done = true and signals reader_cond,
//     then waits on worker_cond until input_pos differs from its local tail.
//     tail == -1 makes it return null.
//   - Advances head one sample at a time towards tail, counting frames. It runs an FFT over
//     the nfft samples ending at head when frames % nfft == 0, or when the interval is
//     complete (int_full / int_over, which depend on acc_error versus cx.error_base).
//   - When an interval completes: updates acc_error, divides output by num_fft, calls
//     cb (sample++, output), then zeroes output and resets frames and num_fft.
//
// NOTE(review): when head reaches tail it is rewound to prev_head (position of the last
//   FFT) but frames is not rewound. Confirm the samples between prev_head and tail are not
//   counted twice on the next wake-up.
// NOTE(review): worker_done is initialised only at its declaration; the visible part of
//   start () does not reset it. Confirm a second start () cannot observe a stale value.
// NOTE(review): "if (sample == samples) break;" appears to leave only the inner loop, so the
//   worker keeps handshaking with the reader until it receives -1. Confirm this is intended
//   (frames, num_fft and output are not reset on that path).
diff --git a/src/spek-pipeline.vala b/src/spek-pipeline.vala index b3f45cb..3f6f92a 100644 --- a/src/spek-pipeline.vala +++ b/src/spek-pipeline.vala @@ -38,11 +38,20 @@ namespace Spek { private uint8[] buffer; private Fft.Plan fft; - private int nfft; + private int nfft; // Size of the FFT transform. + private const int NFFT = 64; // Number of FFTs to pre-fetch. + private int input_size; + private int input_pos; private float[] input; private float[] output; private unowned Thread reader_thread = null; + private unowned Thread worker_thread; + private Mutex reader_mutex; + private Cond reader_cond; + private Mutex worker_mutex; + private Cond worker_cond; + private bool worker_done = false; private bool quit = false; public Pipeline (string file_name, int bands, int samples, int threshold, Callback cb) { @@ -83,7 +92,8 @@ namespace Spek { this.buffer = new uint8[cx.buffer_size]; this.nfft = 2 * bands - 2; this.fft = new Fft.Plan (nfft, threshold); - this.input = new float[nfft]; + this.input_size = nfft * (NFFT * 2 + 1); + this.input = new float[input_size]; this.output = new float[bands]; this.cx.start (samples); } @@ -95,6 +105,12 @@ namespace Spek { public void start () { stop (); + input_pos = 0; + reader_mutex = new Mutex (); + reader_cond = new Cond (); + worker_mutex = new Mutex (); + worker_cond = new Cond (); + try { reader_thread = Thread.create (reader_func, true); } catch (ThreadError e) { @@ -114,41 +130,101 @@ namespace Spek { } private void * reader_func () { - int pos = 0; + int pos = 0, prev_pos = 0; + int block_size = cx.width * cx.channels / 8; + int size; + + try { + worker_thread = Thread.create (worker_func, true); + } catch (ThreadError e) { + return null; + } + + while ((size = cx.read (this.buffer)) > 0) { + lock (quit) if (quit) break; + + uint8 *buffer = (uint8 *) this.buffer; + while (size >= block_size) { + input[pos] = average_input (buffer); + buffer += block_size; + size -= block_size; + pos = (pos + 1) % input_size; + + // Wake up 
the worker if we have enough data. + if ((pos > prev_pos ? pos : pos + input_size) - prev_pos == nfft * NFFT) { + reader_sync (prev_pos = pos); + } + } + assert (size == 0); + } + + if (pos != prev_pos) { + // Process the remaining data. + reader_sync (pos); + } + // Force the worker to quit. + reader_sync (-1); + worker_thread.join (); + + return null; + } + + private void reader_sync (int pos) { + reader_mutex.lock (); + while (!worker_done) reader_cond.wait (reader_mutex); + worker_done = false; + reader_mutex.unlock (); + + worker_mutex.lock (); + input_pos = pos; + worker_cond.signal (); + worker_mutex.unlock (); + } + + private void * worker_func () { int sample = 0; int64 frames = 0; int64 num_fft = 0; int64 acc_error = 0; float cf = 2f * (float) Math.PI / nfft; - int size; + int head = 0, tail = 0; + int prev_head = 0; Memory.set (output, 0, sizeof (float) * bands); - while ((size = cx.read (this.buffer)) > 0) { - lock (quit) { - if (quit) { - return null; - } + while (true) { + reader_mutex.lock (); + worker_done = true; + reader_cond.signal (); + reader_mutex.unlock (); + + worker_mutex.lock (); + while (tail == input_pos) worker_cond.wait (worker_mutex); + tail = input_pos; + worker_mutex.unlock (); + + if (tail == -1) { + return null; } - uint8 *buffer = (uint8 *) this.buffer; - var block_size = cx.width * cx.channels / 8; - while (size >= block_size) { - input[pos] = average_input (buffer); - buffer += block_size; - size -= block_size; - pos = (pos + 1) % nfft; + while (true) { + head = (head + 1) % input_size; + if (head == tail) { + head = prev_head; + break; + } frames++; // If we have enough frames for an FFT or we // have all frames required for the interval run // an FFT. In the last case we probably take the // FFT of frames that we already handled. 
- if (frames % nfft == 0 || - acc_error < cx.error_base && frames == cx.frames_per_interval || - acc_error >= cx.error_base && frames == 1 + cx.frames_per_interval) { + bool int_full = acc_error < cx.error_base && frames == cx.frames_per_interval; + bool int_over = acc_error >= cx.error_base && frames == 1 + cx.frames_per_interval; + if (frames % nfft == 0 || int_full || int_over) { + prev_head = head; for (int i = 0; i < nfft; i++) { - float val = input[(pos + i) % nfft]; + float val = input[(input_size + head - nfft + i) % input_size]; // Hamming window. val *= 0.53836f - 0.46164f * Math.cosf (cf * i); fft.input[i] = val; @@ -160,10 +236,8 @@ namespace Spek { } } // Do we have the FFTs for one interval? - if (acc_error < cx.error_base && frames == cx.frames_per_interval || - acc_error >= cx.error_base && frames == 1 + cx.frames_per_interval) { - - if (acc_error >= cx.error_base) { + if (int_full || int_over) { + if (int_over) { acc_error -= cx.error_base; } else { acc_error += cx.error_per_interval; @@ -173,18 +247,15 @@ namespace Spek { output[i] /= num_fft; } + if (sample == samples) break; cb (sample++, output); - if (sample == samples) { - return null; - } + Memory.set (output, 0, sizeof (float) * bands); frames = 0; num_fft = 0; } } - assert (size == 0); } - return null; } private float average_input (uint8 *buffer) {