summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordeva <deva>2005-10-04 21:39:53 +0000
committerdeva <deva>2005-10-04 21:39:53 +0000
commit111b16802de661228ef414eb6ec7484e2fa186ed (patch)
treed6d1ce9e8b857371414f33ff460266258c462275 /src
parent9640339f2e9dc126406f6b6f8a091b924898b4f5 (diff)
*** empty log message ***
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am8
-rw-r--r--src/audio_encoder.cc125
-rw-r--r--src/audio_encoder.h19
-rw-r--r--src/frame.h19
-rw-r--r--src/mov_encoder.cc161
-rw-r--r--src/mov_encoder.h30
-rw-r--r--src/mov_encoder_thread.cc76
-rw-r--r--src/mov_encoder_thread.h25
-rw-r--r--src/mov_encoder_writer.cc22
-rw-r--r--src/mov_encoder_writer.h25
-rw-r--r--src/multicast.cc109
-rw-r--r--src/multicast.h47
-rw-r--r--src/multiplexer.cc52
-rw-r--r--src/multiplexer.h14
-rw-r--r--src/threadsafe_queue.cc44
-rw-r--r--src/threadsafe_queue.h56
-rw-r--r--src/threadsafe_queue_fifo.cc83
-rw-r--r--src/threadsafe_queue_fifo.h82
-rw-r--r--src/threadsafe_queue_priority.cc93
-rw-r--r--src/threadsafe_queue_priority.h64
20 files changed, 921 insertions, 233 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 3adaf25..b0ed05f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -34,6 +34,7 @@ miav_SOURCES = $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \
mov_encoder.cc \
mov_encoder_thread.cc \
mov_encoder_writer.cc \
+ multicast.cc \
multiplexer.cc \
network.cc \
player.cc \
@@ -41,6 +42,9 @@ miav_SOURCES = $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \
server_status.cc \
socket.cc \
thread.cc \
+ threadsafe_queue.cc \
+ threadsafe_queue_fifo.cc \
+ threadsafe_queue_priority.cc \
util.cc \
videowidget.cc \
yuv_draw.cc
@@ -81,6 +85,7 @@ EXTRA_DIST = \
mov_encoder.h \
mov_encoder_thread.h \
mov_encoder_writer.h \
+ multicast.h \
multiplexer.h \
network.h \
package.h \
@@ -90,6 +95,9 @@ EXTRA_DIST = \
server_status.h \
socket.h \
thread.h \
+ threadsafe_queue.h \
+ threadsafe_queue_fifo.h \
+ threadsafe_queue_priority.h \
util.h \
videowidget.h \
yuv_draw.h
diff --git a/src/audio_encoder.cc b/src/audio_encoder.cc
index 8972e9a..97c6084 100644
--- a/src/audio_encoder.cc
+++ b/src/audio_encoder.cc
@@ -28,8 +28,10 @@
#include "audio_encoder.h"
#include "util.h"
-AudioEncoder::AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, sem_t *in_sem,
- FramePriorityQueue *out, pthread_mutex_t *out_mutex, sem_t *out_sem,
+#include "liblame_wrapper.h"
+
+AudioEncoder::AudioEncoder(ThreadSafeQueuePriority *audio_input_queue,
+ ThreadSafeQueuePriority *audio_output_queue,
Info *i)
{
info = i;
@@ -37,18 +39,8 @@ AudioEncoder::AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, se
running = true;
- // Queues
- inputqueue = in;
- outputqueue = out;
-
- // Queue mutexes
- input_mutex = in_mutex;
- output_mutex = out_mutex;
-
- input_sem = in_sem;
- output_sem = out_sem;
-
- frame_number = 0;
+ input_queue = audio_input_queue;
+ output_queue = audio_output_queue;
}
AudioEncoder::~AudioEncoder()
@@ -59,18 +51,109 @@ void AudioEncoder::thread_main()
{
info->info("AudioEncoder::run");
- unsigned int queuesize = 0;
-
// Run with slightly lower priority than MovEncoderWriter
// nice(2);
Frame *in_frame = NULL;
Frame *out_frame = NULL;
+
+ LibLAMEWrapper lame(info);
+
+ while(running) {
+ in_frame = input_queue->pop();
+
+ if(in_frame == NULL) info->error("AudioEncoder: in_frame == NULL!");
+
+ // Check for end of stream
+ if(in_frame->endOfFrameStream == true) {
+ info->info("endOfFrameStream in AudioEncoder");
+ running = false;
+ out_frame = lame.close();
+ } else {
+ // Encode audio
+ out_frame = lame.encode(in_frame);
+ }
+ out_frame->number = in_frame->number;
+ out_frame->endOfFrameStream = in_frame->endOfFrameStream;
+
+ //delete in_frame;
+ in_frame = NULL;
+
+ output_queue->push(out_frame);
+ }
+
+ info->info("AudioEncoder::stop");
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+/*
+
+void AudioEncoder::thread_main()
+{
+ info->info("AudioEncoder::run");
+
+#ifndef NEW_QUEUE
+ unsigned int queuesize = 0;
Frame *tmpframe;
+#endif
+
+ // Run with slightly lower priority than MovEncoderWriter
+ nice(2);
+
+ Frame *in_frame = NULL;
+ Frame *out_frame = NULL;
LibLAMEWrapper lame(info);
while(running) {
+ info->info("fisk");
+#ifdef NEW_QUEUE
+ in_frame = input_queue->pop();
+#else
sem_wait(input_sem);
// If no frame is in the buffer, get one from the queue
@@ -94,7 +177,8 @@ void AudioEncoder::thread_main()
sleep_0_2_frame();
}
-
+#endif
+
// Check for end of stream
if(in_frame->endOfFrameStream == true) {
info->info("endOfFrameStream in AudioEncoder");
@@ -110,6 +194,9 @@ void AudioEncoder::thread_main()
delete in_frame;
in_frame = NULL;
+#ifdef NEW_QUEUE
+ output_queue->push(out_frame);
+#else
// Lock output mutex
pthread_mutex_lock(output_mutex);
outputqueue->push(out_frame);
@@ -118,10 +205,14 @@ void AudioEncoder::thread_main()
// Kick multiplexer (audio)
sem_post(output_sem);
+#endif
}
+#ifndef NEW_QUEUE
// Kick multiplexer (audio)
sem_post(output_sem);
+#endif
info->info("AudioEncoder::stop");
}
+*/
diff --git a/src/audio_encoder.h b/src/audio_encoder.h
index b15ce45..9d86178 100644
--- a/src/audio_encoder.h
+++ b/src/audio_encoder.h
@@ -36,12 +36,12 @@
#include "info.h"
-#include "liblame_wrapper.h"
+#include "threadsafe_queue_priority.h"
class AudioEncoder : public Thread {
public:
- AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, sem_t *in_sem,
- FramePriorityQueue *out, pthread_mutex_t *out_mutex, sem_t *out_sem,
+ AudioEncoder(ThreadSafeQueuePriority *audio_input_queue,
+ ThreadSafeQueuePriority *audio_output_queue,
Info *info);
~AudioEncoder();
@@ -50,19 +50,10 @@ public:
volatile bool running;
private:
- unsigned int frame_number;
-
Info *info;
- // Input/Output queues
- FramePriorityQueue *inputqueue;
- FramePriorityQueue *outputqueue;
- pthread_mutex_t *input_mutex;
- pthread_mutex_t *output_mutex;
-
- //thread stuff
- sem_t *input_sem;
- sem_t *output_sem;
+ ThreadSafeQueuePriority *input_queue;
+ ThreadSafeQueuePriority *output_queue;
};
diff --git a/src/frame.h b/src/frame.h
index 2b8d9f5..6859116 100644
--- a/src/frame.h
+++ b/src/frame.h
@@ -31,9 +31,6 @@
// Definition of vector
#include <vector>
-// Definition of priority_queue
-#include <queue>
-
class Frame {
public:
Frame(unsigned char *d, int sz);
@@ -54,22 +51,6 @@ public:
bool endOfFrameStream;
};
-#include <functional>
-
-// Method for use, when comparing Frames in priority queue.
-template <typename T>
-struct frame_priority : std::binary_function<T, T, bool> {
- bool operator() (const T& a, const T& b) const {
- return ((Frame*)a)->number > ((Frame*)b)->number;
- }
-};
-
-// Additional helper types.
typedef std::vector< Frame* > FrameVector;
-typedef std::queue< FrameVector* > FrameVectorQueue;
-typedef std::priority_queue< Frame*,
- std::vector<Frame*>,
- frame_priority<Frame*> > FramePriorityQueue;
-
#endif/*__FRAME_H__*/
diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc
index cf45ae0..a455f42 100644
--- a/src/mov_encoder.cc
+++ b/src/mov_encoder.cc
@@ -42,11 +42,12 @@
#include "miav_config.h"
#include "debug.h"
+#include "libfame_wrapper.h"
MovEncoder::MovEncoder(volatile bool *r, sem_t *r_sem,
- FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex,
- FramePriorityQueue *v_out, pthread_mutex_t *v_out_mutex, sem_t *v_out_sem,
- FramePriorityQueue *a_out, pthread_mutex_t *a_out_mutex, sem_t *a_out_sem,
+ ThreadSafeQueueFIFO<FrameVector*> *in,
+ ThreadSafeQueuePriority *video_out,
+ ThreadSafeQueuePriority *audio_out,
Info *i)
{
info = i;
@@ -56,18 +57,9 @@ MovEncoder::MovEncoder(volatile bool *r, sem_t *r_sem,
// Queues
inputqueue = in;
- video_outputqueue = v_out;
- audio_outputqueue = a_out;
+ video_output_queue = video_out;
+ audio_output_queue = audio_out;
- // Queue mutexes
- input_mutex = in_mutex;
- video_output_mutex = v_out_mutex;
- audio_output_mutex = a_out_mutex;
-
- input_sem = in_sem;
- video_output_sem = v_out_sem;
- audio_output_sem = a_out_sem;
-
read_sem = r_sem;
}
@@ -82,12 +74,131 @@ void MovEncoder::thread_main()
{
info->info("MovEncoder::run");
// static volatile int test = 0;
+ // int insize = 0;
+
+ // Run with slightly lower priority than MovEncoderWriter AND AudioEncoder
+ // nice(3);
+
+ FrameVector *item;
+ Frame *in_frame;
+ Frame *out_v_frame;
+ Frame *out_a_frame;
+
+ LibFAMEWrapper fame(info);
+
+ // Process until running == false and the queue is empty
+ while(*running) {
+
+ item = inputqueue->pop();
+
+ if(item) {
+ for(unsigned int cnt = 0; cnt < item->size(); cnt++) {
+ in_frame = item->at(cnt);
+
+ // Check for end of stream
+ if(in_frame->endOfFrameStream == true) {
+ info->info("endOfFrameStream in MovEncoder");
+
+ // Signal to stop running
+ *running = false;
+
+ // Kick them sleepy ones so they get the message.
+ int threads = config->readInt("encoding_threads");
+ for(int cnt = 0; cnt < threads; cnt++) {/*sem_post(input_sem);*/} // FIXME: Kick the other encoders
+ }
+
+ // Encode video
+ out_v_frame = fame.encode(in_frame);
+ out_v_frame->number = in_frame->number;
+ out_v_frame->endOfFrameStream = in_frame->endOfFrameStream;
+
+ // Create audio frame
+ out_a_frame = in_frame;
+
+ video_output_queue->push(out_v_frame);
+ audio_output_queue->push(out_a_frame);
+ }
+
+ delete item;
+
+ item = NULL;
+
+ // Kick reader
+ sem_post(read_sem);
+ }
+ }
+
+ info->info("MovEncoder::stop");
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+/*
+
+// this runs in a thread
+void MovEncoder::thread_main()
+{
+ info->info("MovEncoder::run");
+ // static volatile int test = 0;
+#ifndef NEW_QUEUE
int v_outsize = 0;
int a_outsize = 0;
+#endif
int insize = 0;
// Run with slightly lower priority than MovEncoderWriter AND AudioEncoder
- //nice(3);
+ nice(3);
FrameVector *item;
Frame *in_frame;
@@ -131,16 +242,20 @@ void MovEncoder::thread_main()
// Create audio frame
out_a_frame = in_frame;
+#ifdef NEW_QUEUE
+ video_output_queue->push(out_v_frame);
+ audio_output_queue->push(out_a_frame);
+#else
// Lock output mutex
pthread_mutex_lock(video_output_mutex);
video_outputqueue->push(out_v_frame);
v_outsize = video_outputqueue->size();
pthread_mutex_unlock(video_output_mutex);
// Unlock output mutex
-
+
// Kick multiplexer (video)
sem_post(video_output_sem);
-
+
// Lock output mutex
pthread_mutex_lock(audio_output_mutex);
audio_outputqueue->push(out_a_frame);
@@ -150,6 +265,7 @@ void MovEncoder::thread_main()
// Kick audio encoder
sem_post(audio_output_sem);
+#endif
}
delete item;
@@ -159,16 +275,19 @@ void MovEncoder::thread_main()
sem_post(read_sem);
}
}
- /*
- info->info("Input pool size: %d, video output pool size: %d, audio output pool size: %d",
- insize, v_outsize, a_outsize);
- */
+
+ //info->info("Input pool size: %d, video output pool size: %d, audio output pool size: %d",
+ // insize, v_outsize, a_outsize);
+
+#ifndef NEW_QUEUE
// Kick audio encoder
sem_post(audio_output_sem);
// Kick multiplexer (video)
sem_post(video_output_sem);
+#endif
info->info("MovEncoder::stop");
}
+*/
diff --git a/src/mov_encoder.h b/src/mov_encoder.h
index 8488008..ace016c 100644
--- a/src/mov_encoder.h
+++ b/src/mov_encoder.h
@@ -48,15 +48,15 @@ using namespace std;
#include "info.h"
-#include "libfame_wrapper.h"
-//#include "liblame_wrapper.h"
+#include "threadsafe_queue_priority.h"
+#include "threadsafe_queue_fifo.h"
class MovEncoder : public Thread {
public:
MovEncoder(volatile bool *r, sem_t *r_sem,
- FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex,
- FramePriorityQueue *v_out, pthread_mutex_t *v_out_mutex, sem_t *v_out_sem,
- FramePriorityQueue *a_out, pthread_mutex_t *a_out_mutex, sem_t *a_out_sem,
+ ThreadSafeQueueFIFO< FrameVector*> *in,
+ ThreadSafeQueuePriority *video_out,
+ ThreadSafeQueuePriority *audio_out,
Info *info);
~MovEncoder();
@@ -65,24 +65,16 @@ public:
volatile bool *running;
private:
- // LibFAMEWrapper *fame;
- // LibLAMEWrapper *lame;
-
Info *info;
- // Input/Output queues
- FrameVectorQueue *inputqueue;
- FramePriorityQueue *video_outputqueue;
- FramePriorityQueue *audio_outputqueue;
- pthread_mutex_t *input_mutex;
- pthread_mutex_t *video_output_mutex;
- pthread_mutex_t *audio_output_mutex;
+ // Input queue
+ ThreadSafeQueueFIFO< FrameVector* > *inputqueue;
- //thread stuff
- sem_t *input_sem;
- sem_t *video_output_sem;
- sem_t *audio_output_sem;
+ // Output queues
+ ThreadSafeQueuePriority *video_output_queue;
+ ThreadSafeQueuePriority *audio_output_queue;
+ // Reader (mov_encoder_thread.cc) semaphore
sem_t *read_sem;
};
diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc
index dab308d..7c7f5d9 100644
--- a/src/mov_encoder_thread.cc
+++ b/src/mov_encoder_thread.cc
@@ -34,17 +34,19 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i)
info = i;
info->info("MovEncoderThread");
- // Queues
- inputqueue = new FrameVectorQueue();
- video_outputqueue = new FramePriorityQueue();
- audio_inputqueue = new FramePriorityQueue();
- audio_outputqueue = new FramePriorityQueue();
-
- // Queue mutexes
- pthread_mutex_init (&input_mutex, NULL);
- pthread_mutex_init (&video_output_mutex, NULL);
- pthread_mutex_init (&audio_input_mutex, NULL);
- pthread_mutex_init (&audio_output_mutex, NULL);
+ // Queue
+ inputqueue = new ThreadSafeQueueFIFO<FrameVector*>();
+
+ // Initialize read semaphore
+ sem_init(&read_sem, 0, 0);
+
+ video_output_queue = new ThreadSafeQueuePriority(info);
+ audio_input_queue = new ThreadSafeQueuePriority(info);
+ audio_output_queue = new ThreadSafeQueuePriority(info);
+
+ info->info("video_output_queue: 0x%x", video_output_queue);
+ info->info("audio_input_queue: 0x%x", audio_input_queue);
+ info->info("audio_output_queue: 0x%x", audio_output_queue);
block = new FrameVector();
@@ -53,13 +55,6 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i)
info->info("Frame sequence length %d", num_frames_in_block);
threads = config->readInt("encoding_threads");
-
- // Thread stuff
- sem_init(&in_sem, 0, 0);
- sem_init(&video_out_sem, 0, 0);
- sem_init(&audio_in_sem, 0, 0);
- sem_init(&audio_out_sem, 0, 0);
- sem_init(&read_sem, 0, 0);
movencodersrunning = true;
@@ -67,26 +62,25 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i)
// Create the video encoders
for(int cnt = 0; cnt < threads; cnt++) {
- MovEncoder *movenc =
- new MovEncoder(&movencodersrunning, &read_sem,
- inputqueue, &in_sem, &input_mutex,
- video_outputqueue, &video_output_mutex, &video_out_sem,
- audio_inputqueue, &audio_input_mutex, &audio_in_sem,
- info);
+ MovEncoder *movenc = new MovEncoder(&movencodersrunning, &read_sem,
+ inputqueue,
+ video_output_queue,
+ audio_input_queue,
+ info);
movenc->run();
encs.push_back(movenc);
}
// Create the audio encoder
- audioenc = new AudioEncoder(audio_inputqueue, &audio_input_mutex, &audio_in_sem,
- audio_outputqueue, &audio_output_mutex, &audio_out_sem,
+ audioenc = new AudioEncoder(audio_input_queue,
+ audio_output_queue,
info);
audioenc->run();
// Create the multiplexer
writer = new MovEncoderWriter(cpr,
- video_outputqueue, &video_output_mutex, &video_out_sem,
- audio_outputqueue, &audio_output_mutex, &audio_out_sem,
+ video_output_queue,
+ audio_output_queue,
info);
writer->run();
@@ -99,7 +93,7 @@ MovEncoderThread::~MovEncoderThread()
info->info("~MovEncoderThread");
// First we destroy the movie encoders
- for(int cnt = 0; cnt < threads; cnt++) sem_post(&in_sem); // Kick them
+ // for(int cnt = 0; cnt < threads; cnt++) sem_post(&in_sem); // Kick them
for(int cnt = 0; cnt < threads; cnt++) {
encs[cnt]->wait_stop(); // Wait for it to stop
delete encs[cnt]; // Delete it
@@ -108,7 +102,6 @@ MovEncoderThread::~MovEncoderThread()
info->info("Deleted the movie encoders");
// Then we destroy the audio encoder
- sem_post(&audio_in_sem); // Kick it
audioenc->wait_stop(); // Wait for it to stop.
delete audioenc; // delete the audio encoder
@@ -116,18 +109,15 @@ MovEncoderThread::~MovEncoderThread()
// Finally we destroy the writer.
writer->running = false;
- sem_post(&video_out_sem); // Kick it to make it stop.
- sem_post(&audio_out_sem); // Kick it to make it stop.
+
+ // FIXME: Post writer
writer->wait_stop(); // Wait for it to stop.
delete writer; // delete the writer (end thereby close the file)
info->info("Deleted the writer");
// Destroy the semaphores.
- sem_destroy(&in_sem);
- sem_destroy(&video_out_sem);
- sem_destroy(&audio_in_sem);
- sem_destroy(&audio_out_sem);
+ // sem_destroy(&in_sem);
sem_destroy(&read_sem);
info->info("~MovEncoderThread::done");
@@ -139,12 +129,11 @@ void MovEncoderThread::encode(Frame* frame)
if(output % 250 == 0) // 25 * 24
info->info("inputqueue: %d\tvideo_outputqueue: %d\taudio_inputqueue: %d\taudio_outputqueue: %d.",
inputqueue->size(),
- video_outputqueue->size(),
- audio_inputqueue->size(),
- audio_outputqueue->size());
+ video_output_queue->size(),
+ audio_input_queue->size(),
+ audio_output_queue->size());
output++;
-
if(frame == NULL) {
info->info("MovEncoderThread::encode - NULL frame detected.");
// Terminate
@@ -159,14 +148,7 @@ void MovEncoderThread::encode(Frame* frame)
// Wait until a free encoder.
sem_wait(&read_sem);
- // Lock input mutex
- pthread_mutex_lock(&input_mutex);
inputqueue->push(block);
- pthread_mutex_unlock(&input_mutex);
- // Unlock input mutex
-
- // Kick encoders
- sem_post(&in_sem);
// Start new block
block = new FrameVector;
diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h
index e3fba27..8cc24f8 100644
--- a/src/mov_encoder_thread.h
+++ b/src/mov_encoder_thread.h
@@ -36,6 +36,11 @@
#include <vector>
using namespace std;
+#include "frame.h"
+
+#include "threadsafe_queue_priority.h"
+#include "threadsafe_queue_fifo.h"
+
#include "mov_encoder.h"
#include "audio_encoder.h"
#include "mov_encoder_writer.h"
@@ -52,24 +57,16 @@ public:
private:
Info *info;
- FrameVectorQueue *inputqueue;
- FramePriorityQueue *video_outputqueue;
- FramePriorityQueue *audio_inputqueue;
- FramePriorityQueue *audio_outputqueue;
+ // FrameVectorQueue *inputqueue;
+ ThreadSafeQueueFIFO< FrameVector* > *inputqueue;
FrameVector *block;
//thread stuff
- sem_t in_sem;
- sem_t video_out_sem;
- sem_t audio_in_sem;
- sem_t audio_out_sem;
-
sem_t read_sem;
- pthread_mutex_t input_mutex;
- pthread_mutex_t video_output_mutex;
- pthread_mutex_t audio_input_mutex;
- pthread_mutex_t audio_output_mutex;
+ ThreadSafeQueuePriority *video_output_queue;
+ ThreadSafeQueuePriority *audio_input_queue;
+ ThreadSafeQueuePriority *audio_output_queue;
volatile bool movencodersrunning;
@@ -79,8 +76,6 @@ private:
unsigned int num_frames_in_block;
MovEncoderWriter *writer;
- // pthread_t* writer_tid;
-
AudioEncoder* audioenc;
int threads;
diff --git a/src/mov_encoder_writer.cc b/src/mov_encoder_writer.cc
index 717998a..732f9ba 100644
--- a/src/mov_encoder_writer.cc
+++ b/src/mov_encoder_writer.cc
@@ -47,8 +47,8 @@ using namespace std;
#include "multiplexer.h"
MovEncoderWriter::MovEncoderWriter(const char* cpr,
- FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s,
- FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s,
+ ThreadSafeQueuePriority *video_q,
+ ThreadSafeQueuePriority *audio_q,
Info *i)
{
info = i;
@@ -74,23 +74,15 @@ MovEncoderWriter::MovEncoderWriter(const char* cpr,
ltime = localtime(&t);
sprintf(date, "%.4d%.2d%.2d",
ltime->tm_year + 1900,
- ltime->tm_mon,
+ ltime->tm_mon + 1, // Ranging from 0 to 11
ltime->tm_mday);
sprintf(fname, "%s/%s/%s/%s-%s-", server_root->c_str(), birthmonth, cpr, cpr, date);
file = new File(fname, "mpg", info);
- video_queue = v_q;
- video_sem = v_s;
- video_mutex = v_m;
-
- audio_queue = a_q;
- audio_sem = a_s;
- audio_mutex = a_m;
-
- video_frame_number = 0;
- audio_frame_number = 0;
+ video_queue = video_q;
+ audio_queue = audio_q;
running = true;
}
@@ -107,8 +99,8 @@ void MovEncoderWriter::thread_main()
info->info("MovEncoderWriter::run");
Multiplexer multiplexer(file, info, &running,
- video_queue, video_mutex, video_sem,
- audio_queue, audio_mutex, audio_sem);
+ video_queue,
+ audio_queue);
multiplexer.multiplex();
info->info("MovEncoderWriter::stop");
diff --git a/src/mov_encoder_writer.h b/src/mov_encoder_writer.h
index e653223..3146bf8 100644
--- a/src/mov_encoder_writer.h
+++ b/src/mov_encoder_writer.h
@@ -33,6 +33,8 @@
#include "file.h"
#include "info.h"
+#include "threadsafe_queue_priority.h"
+
#include <string>
using namespace std;
@@ -42,8 +44,8 @@ using namespace std;
class MovEncoderWriter : public Thread {
public:
MovEncoderWriter(const char* cpr,
- FramePriorityQueue *video_queue, pthread_mutex_t *video_mutex, sem_t *video_sem,
- FramePriorityQueue *audio_queue, pthread_mutex_t *audio_mutex, sem_t *audio_sem,
+ ThreadSafeQueuePriority *video_queue,
+ ThreadSafeQueuePriority *audio_queue,
Info *info);
~MovEncoderWriter();
@@ -56,23 +58,8 @@ private:
File *file;
- FramePriorityQueue *video_queue;
- FramePriorityQueue *audio_queue;
- pthread_mutex_t *video_mutex;
- pthread_mutex_t *audio_mutex;
- sem_t *video_sem;
- sem_t *audio_sem;
-
- unsigned int video_frame_number;
- unsigned int audio_frame_number;
-
- // Timecode_struc SCR;
- // double timestamp;
-
-// void write_audio_header(unsigned short int packetsize);
-// void write_video_header(unsigned short int packetsize);
-// void write_system_header(unsigned int audio_size, unsigned int video_size);
-// void write_packet_header(unsigned int audio_size, unsigned int video_size);
+ ThreadSafeQueuePriority *video_queue;
+ ThreadSafeQueuePriority *audio_queue;
};
diff --git a/src/multicast.cc b/src/multicast.cc
new file mode 100644
index 0000000..0cf1b87
--- /dev/null
+++ b/src/multicast.cc
@@ -0,0 +1,109 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * multicast.cc
+ *
+ * Mon Sep 26 12:25:22 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#include "multicast.h"
+
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/param.h>
+#include <arpa/inet.h>
+
+Multicast::Multicast(Info *i)
+{
+ char addr[] = "192.168.0.10";
+ int port = 666;
+
+ info = i;
+ if(!UDPOpen(addr, port)) info->error("Error creating socket %s:%d", addr, port);
+}
+
+Multicast::~Multicast()
+{
+}
+
+void Multicast::Write(char* buf, int size)
+{
+ if(write(sock, buf, size) != size) info->error("Error Writing to socket.");
+}
+
+
+/*
+ * open UDP socket
+ */
+bool Multicast::UDPOpen(char * address, int port)
+{
+ int enable = 1L;
+ struct sockaddr_in stAddr;
+ struct sockaddr_in stLclAddr;
+ struct hostent * host;
+ // int sock;
+
+ stAddr.sin_family = AF_INET;
+ stAddr.sin_port = htons(port);
+ if((host = gethostbyname(address)) == NULL) return false;
+ stAddr.sin_addr = *((struct in_addr *) host->h_addr_list[0]);
+
+ /* Create a UDP socket */
+ if((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+ return false;
+
+ /* Allow multiple instance of the client to share the same address and port */ if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(unsigned long int)) < 0) return false;
+
+#ifdef USE_MULTICAST
+ /* If the address is multicast, register to the multicast group */
+ if(is_address_multicast(stAddr.sin_addr.s_addr))
+ {
+ struct ip_mreq stMreq;
+
+ /* Bind the socket to port */
+ stLclAddr.sin_family = AF_INET;
+ stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ stLclAddr.sin_port = stAddr.sin_port;
+ if(bind(sock, (struct sockaddr*) & stLclAddr, sizeof(stLclAddr)) < 0) return false;
+
+ /* Register to a multicast address */
+ stMreq.imr_multiaddr.s_addr = stAddr.sin_addr.s_addr;
+ stMreq.imr_interface.s_addr = INADDR_ANY;
+ if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) & stMreq, sizeof(stMreq)) < 0)
+ return false;
+ }
+ else
+#endif
+ {
+ /* Bind the socket to port */
+ stLclAddr.sin_family = AF_INET;
+ stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ stLclAddr.sin_port = htons(0);
+ if(bind(sock, (struct sockaddr*) & stLclAddr, sizeof(stLclAddr)) < 0)
+ return false;
+ }
+
+ connect(sock, (struct sockaddr*) & stAddr, sizeof(stAddr));
+
+ return true;
+}
diff --git a/src/multicast.h b/src/multicast.h
new file mode 100644
index 0000000..f0a4979
--- /dev/null
+++ b/src/multicast.h
@@ -0,0 +1,47 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * multicast.h
+ *
+ * Mon Sep 26 12:25:22 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#ifndef __MIAV_MULTICAST_H__
+#define __MIAV_MULTICAST_H__
+
+#include "info.h"
+
+class Multicast {
+public:
+ Multicast(Info *info);
+ ~Multicast();
+
+ void Write(char* buf, int size);
+
+private:
+ Info *info;
+
+ bool UDPOpen(char * address, int port);
+ int sock;
+};
+
+#endif/*__MIAV_MULTICAST_H__*/
diff --git a/src/multiplexer.cc b/src/multiplexer.cc
index d2aecfc..0b54bf8 100644
--- a/src/multiplexer.cc
+++ b/src/multiplexer.cc
@@ -68,37 +68,27 @@ static double picture_rate_index[16] = {
RESERVED, RESERVED, RESERVED, RESERVED, RESERVED, RESERVED, RESERVED
};
*/
-
Multiplexer::Multiplexer(File *f, Info *i, volatile bool *r,
- FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s,
- FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s)
+ ThreadSafeQueuePriority *video_q,
+ ThreadSafeQueuePriority *audio_q)
{
running = r;
file = f;
info = i;
- queue[TYPE_VIDEO] = v_q;
- queue[TYPE_AUDIO] = a_q;
-
- sem[TYPE_VIDEO] = v_s;
- sem[TYPE_AUDIO] = a_s;
-
- mutex[TYPE_VIDEO] = v_m;
- mutex[TYPE_AUDIO] = a_m;
-
frame[TYPE_VIDEO] = NULL;
- frame[TYPE_AUDIO] = NULL;
+ written[TYPE_VIDEO] = 0.0;
- frame_number[TYPE_VIDEO] = 0;
- frame_number[TYPE_AUDIO] = 0;
+ frame[TYPE_AUDIO] = NULL;
+ written[TYPE_AUDIO] = 0.0;
- write_system_header = 0;
write_audio_packet = 0;
+ write_system_header = 0;
audio_header_read = false;
- written[TYPE_VIDEO] = 0.0;
- written[TYPE_AUDIO] = 0.0;
+ queue[TYPE_VIDEO] = video_q;
+ queue[TYPE_AUDIO] = audio_q;
SCR = 3904;//0x40010003LL;//0x1E80;
@@ -111,29 +101,11 @@ Multiplexer::~Multiplexer()
Frame *Multiplexer::getFrame(StreamType type)
{
- Frame *tmpframe;
- Frame *frame = NULL;
-
- sem_wait(sem[type]);
-
- while( frame == NULL ) {
- // Lock output mutex
- pthread_mutex_lock( mutex[type] );
- tmpframe = queue[type]->top();
-
- if(tmpframe && tmpframe->number == frame_number[type] ) {
- queue[type]->pop();
- frame = tmpframe;
- frame_number[type]++;
- read[type] = 0;
- }
-
- pthread_mutex_unlock( mutex[type] );
- // Unlock output mutex
+ info->info("Get %s Frame", type==TYPE_AUDIO?"Audio\0":"Video\0");
+
+ read[type] = 0;
- sleep_0_2_frame();
- }
- return frame;
+ return queue[type]->pop();
}
int Multiplexer::read_stream(char *buf, unsigned int size, StreamType type)
diff --git a/src/multiplexer.h b/src/multiplexer.h
index 8d67766..2604ddc 100644
--- a/src/multiplexer.h
+++ b/src/multiplexer.h
@@ -36,6 +36,8 @@
#include "info.h"
#include "frame.h"
+#include "threadsafe_queue_priority.h"
+
/**
* Multiplexer configuration
*/
@@ -67,8 +69,8 @@ typedef enum {
class Multiplexer {
public:
Multiplexer(File *file, Info *info, volatile bool *running,
- FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s,
- FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s);
+ ThreadSafeQueuePriority *video_queue,
+ ThreadSafeQueuePriority *audio_queue);
~Multiplexer();
void multiplex();
@@ -102,10 +104,6 @@ private:
Frame *getFrame(StreamType type);
int read_stream(char *buf, unsigned int size, StreamType type);
- FramePriorityQueue *queue[NUM_TYPES];
- pthread_mutex_t *mutex[NUM_TYPES];
- sem_t *sem[NUM_TYPES];
-
Frame *frame[NUM_TYPES];
unsigned int frame_number[NUM_TYPES];
unsigned int read[NUM_TYPES];
@@ -115,7 +113,9 @@ private:
volatile bool *running;
// Audio Header
- bool audio_header_read;
+ bool audio_header_read;
+
+ ThreadSafeQueuePriority *queue[NUM_TYPES];
};
#endif/*__MIAV_MULTIPLEXER_H__*/
diff --git a/src/threadsafe_queue.cc b/src/threadsafe_queue.cc
new file mode 100644
index 0000000..89f2d6a
--- /dev/null
+++ b/src/threadsafe_queue.cc
@@ -0,0 +1,44 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * threadsafe_queue.cc
+ *
+ * Tue Sep 27 14:43:45 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#include "threadsafe_queue.h"
+/*
+template <typename T>
+ThreadSafeQueue<T>::ThreadSafeQueue()
+{
+ pthread_mutex_init (&mutex, NULL);
+ sem_init(&semaphore, 0, 0);
+}
+
+template <typename T>
+ThreadSafeQueue<T>::~ThreadSafeQueue()
+{
+ pthread_mutex_destroy(&mutex);
+ sem_destroy(&semaphore);
+}
+
+*/
diff --git a/src/threadsafe_queue.h b/src/threadsafe_queue.h
new file mode 100644
index 0000000..616e81e
--- /dev/null
+++ b/src/threadsafe_queue.h
@@ -0,0 +1,56 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * threadsafe_queue.h
+ *
+ * Tue Sep 27 14:01:01 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#ifndef __MIAV_THREADSAFE_QUEUE_H__
+#define __MIAV_THREADSAFE_QUEUE_H__
+
+#include <pthread.h>
+#include <semaphore.h>
+
+template <typename T>
+class ThreadSafeQueue {
+public:
+ ThreadSafeQueue() {
+ pthread_mutex_init (&mutex, NULL);
+ sem_init(&semaphore, 0, 0);
+ }
+
+ virtual ~ThreadSafeQueue() {
+ pthread_mutex_destroy(&mutex);
+ sem_destroy(&semaphore);
+ }
+
+ virtual void push(T t) = 0;
+ virtual T pop() = 0;
+ virtual int size() = 0;
+
+ //protected:
+ pthread_mutex_t mutex;
+ sem_t semaphore;
+};
+
+#endif/*__MIAV_THREADSAFE_QUEUE_H__*/
diff --git a/src/threadsafe_queue_fifo.cc b/src/threadsafe_queue_fifo.cc
new file mode 100644
index 0000000..633cb58
--- /dev/null
+++ b/src/threadsafe_queue_fifo.cc
@@ -0,0 +1,83 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * threadsafe_queue_fifo.cc
+ *
+ * Tue Sep 27 14:01:10 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#include "threadsafe_queue_fifo.h"
+/*
+template <typename T>
+ThreadSafeQueueFIFO<T>::ThreadSafeQueueFIFO()
+ // : ThreadSafeQueue<T>()
+{
+}
+
+template <typename T>
+ThreadSafeQueueFIFO<T>::~ThreadSafeQueueFIFO()
+{
+}
+
+template <typename T>
+void ThreadSafeQueueFIFO<T>::push(T t)
+{
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ queue.push(t);
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ sem_post(&semaphore);
+}
+
+template <typename T>
+T ThreadSafeQueueFIFO<T>::pop()
+{
+ sem_wait(&semaphore);
+
+ T t;
+
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ t = queue.front();
+ queue.pop();
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ return t;
+}
+
+template <typename T>
+int ThreadSafeQueueFIFO<T>::size()
+{
+ int sz;
+
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ sz = queue.size();
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ return sz;
+}
+*/
diff --git a/src/threadsafe_queue_fifo.h b/src/threadsafe_queue_fifo.h
new file mode 100644
index 0000000..84b2fbb
--- /dev/null
+++ b/src/threadsafe_queue_fifo.h
@@ -0,0 +1,82 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * threadsafe_queue_fifo.h
+ *
+ * Tue Sep 27 14:01:10 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#ifndef __MIAV_THREADSAFE_QUEUE_FIFO_H__
+#define __MIAV_THREADSAFE_QUEUE_FIFO_H__
+
+#include "threadsafe_queue.h"
+#include <queue>
+
+template <typename T>
+class ThreadSafeQueueFIFO: public ThreadSafeQueue<T> {
+public:
+ ThreadSafeQueueFIFO() {}
+ ~ThreadSafeQueueFIFO() {}
+
+ void push(T t) {
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ queue.push(t);
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ sem_post(&semaphore);
+ }
+
+ T pop() {
+ sem_wait(&semaphore);
+ T t;
+
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ t = queue.front();
+ queue.pop();
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ return t;
+ }
+
+ int size() {
+ int sz;
+
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ sz = queue.size();
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ return sz;
+ }
+
+private:
+ std::queue<T> queue;
+};
+
+
+
+#endif/*__MIAV_THREADSAFE_QUEUE_FIFO_H__*/
diff --git a/src/threadsafe_queue_priority.cc b/src/threadsafe_queue_priority.cc
new file mode 100644
index 0000000..130b0f5
--- /dev/null
+++ b/src/threadsafe_queue_priority.cc
@@ -0,0 +1,93 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * threadsafe_queue_priority.cc
+ *
+ * Tue Sep 27 14:01:24 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#include "threadsafe_queue_priority.h"
+
+#include "util.h"
+
+ThreadSafeQueuePriority::ThreadSafeQueuePriority(Info* i, unsigned int number)
+ // : ThreadSafeQueue< Frame* >()
+{
+ info = i;
+ framenumber = number;
+}
+
+ThreadSafeQueuePriority::~ThreadSafeQueuePriority()
+{
+}
+
+void ThreadSafeQueuePriority::push(Frame *frame)
+{
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ queue.push(frame);
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ sem_post(&semaphore);
+}
+
+Frame *ThreadSafeQueuePriority::pop()
+{
+ sem_wait(&semaphore);
+
+ Frame *tmpframe = NULL;
+ Frame *frame = NULL;
+
+ while( frame == NULL ) {
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+
+ tmpframe = queue.top();
+
+ if(tmpframe && tmpframe->number == framenumber ) {
+ queue.pop();
+ frame = tmpframe;
+ framenumber++;
+ }
+
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ if(frame == NULL) sleep_0_2_frame();
+ }
+
+ return frame;
+}
+
+int ThreadSafeQueuePriority::size()
+{
+ int sz;
+
+ // Lock mutex
+ pthread_mutex_lock( &mutex );
+ sz = queue.size();
+ pthread_mutex_unlock( &mutex );
+ // Unlock mutex
+
+ return sz;
+}
diff --git a/src/threadsafe_queue_priority.h b/src/threadsafe_queue_priority.h
new file mode 100644
index 0000000..8d3cdf1
--- /dev/null
+++ b/src/threadsafe_queue_priority.h
@@ -0,0 +1,64 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * threadsafe_queue_priority.h
+ *
+ * Tue Sep 27 14:01:24 CEST 2005
+ * Copyright 2005 Bent Bisballe Nyeng
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+#include "config.h"
+#ifndef __MIAV_THREADSAFE_QUEUE_PRIORITY_H__
+#define __MIAV_THREADSAFE_QUEUE_PRIORITY_H__
+
+#include "threadsafe_queue.h"
+
+#include "frame.h"
+
+#include <queue>
+#include <functional>
+
+#include "info.h"
+
+// Method for use, when comparing Frames in priority queue.
+template <typename T>
+struct priority : std::binary_function<T, T, bool> {
+ bool operator() (const T& a, const T& b) const {
+ return ((Frame*)a)->number > ((Frame*)b)->number;
+ }
+};
+
+class ThreadSafeQueuePriority: public ThreadSafeQueue< Frame* > {
+public:
+ ThreadSafeQueuePriority(Info *info, unsigned int framenumber = 0);
+ ~ThreadSafeQueuePriority();
+
+ void push(Frame *frame);
+ Frame *pop();
+ int size();
+
+private:
+ Info* info;
+
+ unsigned int framenumber;
+ std::priority_queue< Frame*, std::vector<Frame*>, priority<Frame*> > queue;
+};
+
+#endif/*__MIAV_THREADSAFE_QUEUE_PRIORITY_H__*/