summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordeva <deva>2005-05-19 14:10:22 +0000
committerdeva <deva>2005-05-19 14:10:22 +0000
commitd74c7a00c417cffdc93a82efa2841e23d823bea6 (patch)
treeea7f7b69ccbd0dc1df1ea5e05dd59cfafa194f25 /src
parenta597454b7ce1b931e3e4117e6fed509cc22517ff (diff)
Multithreading rulez?R0_2_2
Diffstat (limited to 'src')
-rw-r--r--src/mov_encoder.cc72
-rw-r--r--src/mov_encoder.h29
-rw-r--r--src/mov_encoder_thread.cc96
-rw-r--r--src/mov_encoder_thread.h7
-rw-r--r--src/queue.h49
5 files changed, 233 insertions, 20 deletions
diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc
index 172051d..7bffc50 100644
--- a/src/mov_encoder.cc
+++ b/src/mov_encoder.cc
@@ -39,6 +39,10 @@
/*
* $Log$
+ * Revision 1.23 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.22 2005/05/17 19:16:26 deva
*
* Made new mpeg writer work, with proper file permissions.
@@ -104,8 +108,8 @@
MovEncoder::MovEncoder()
{
// FIXME: Hmmm... should this be detected somewhere?!
- static int w = 720;
- static int h = 576;
+ int w = 720;
+ int h = 576;
// Initialize yuv strucutre.
yuv.w = w;
@@ -207,6 +211,13 @@ MovEncoder::MovEncoder()
fame_init(fame_context, &fame_par, fame_buffer, FAME_BUFFER_SIZE);
+
+
+ // Thread stuff
+ sem_init(&sem, 0, 0);
+ sem_init(&done, 0, 0);
+
+ running = true;
}
MovEncoder::~MovEncoder()
@@ -314,3 +325,60 @@ void MovEncoder::encode_audio(Frame *dvframe)
{
// TODO: Do some audio stuff here sometime!
}
+
+void MovEncoder::encodeSequence(Queue<Frame> *queue)
+{
+ // set input queue
+ inputqueue = queue;
+
+ // unlock semaphore
+ sem_post(&sem);
+}
+
+
+// this runs in a thread
+void MovEncoder::run()
+{
+ fprintf(stderr, "Encoder Ready\n"); fflush(stderr);
+
+ while(running) {
+ // wait for semaphore
+ // lock semaphore
+ sem_wait(&sem);
+ if(inputqueue == NULL) continue;
+
+ fprintf(stderr, "."); fflush(stderr);
+
+ // allocate new output queue
+ outputqueue = new Queue<Frame>();
+ // while input queue.pop
+ Frame *fi, *fo;
+ while(inputqueue->length() > 0) {
+ fi = inputqueue->pop();
+
+ // encode frame from input queue
+ fo = encode(fi);
+
+ // and push result on output queue
+ outputqueue->push(fo);
+
+ // delete the input frame
+ //delete fi;
+ }
+ // delete input queue
+ delete inputqueue;
+ sem_post(&done);
+ }
+}
+
+Queue<Frame> *MovEncoder::getResultSequence()
+{
+ // wait for sempahore
+ sem_wait(&done);
+
+ // fprintf(stderr, "POP!\n"); fflush(stderr);
+
+ // return output queue
+ return outputqueue;
+}
+
diff --git a/src/mov_encoder.h b/src/mov_encoder.h
index 24525f2..13eb63b 100644
--- a/src/mov_encoder.h
+++ b/src/mov_encoder.h
@@ -36,6 +36,10 @@
/*
* $Log$
+ * Revision 1.9 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.8 2005/05/17 14:30:56 deva
* Added code, preparing threaded encoding.
*
@@ -70,19 +74,36 @@
#include <libdv/dv_types.h>
#include "frame.h"
-
+#include "queue.h"
#include "util.h"
+#include "thread.h"
+
// size specifies the length of the buffer.
#define FAME_BUFFER_SIZE (2*720*576*4) // FIXME: One size fits all...
-class MovEncoder {
- public:
+class MovEncoder : public Thread {
+public:
MovEncoder();
~MovEncoder();
Frame* encode(Frame *frame);
- private:
+ void run();
+
+ Queue<Frame> *getResultSequence();
+ void encodeSequence(Queue<Frame> *queue);
+
+ volatile bool running;
+
+private:
+ // Input queue
+ Queue<Frame> *inputqueue;
+ Queue<Frame> *outputqueue;
+
+ //thread stuff
+ sem_t sem;
+ sem_t done;
+
Frame *encode_video(Frame *frame);
void encode_audio(Frame *frame);
diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc
index be86377..ed71a31 100644
--- a/src/mov_encoder_thread.cc
+++ b/src/mov_encoder_thread.cc
@@ -31,6 +31,10 @@
/*
* $Log$
+ * Revision 1.6 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.5 2005/05/19 10:55:49 deva
* Test for block encoding of length strlen("IPIPP").
*
@@ -61,43 +65,113 @@ MovEncoderThread::MovEncoderThread(const char *filename)
fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno));
return;
}
- threads = 4;
+
+ threads = config->readInt("encoding_threads");
for(int cnt = 0; cnt < threads; cnt++) {
encs.push_back(new MovEncoder());
+ tids.push_back(new pthread_t);
+ pthread_create (tids[cnt], NULL, thread_run, encs[cnt]);
}
- int current_encoder = 0;
- int current_frame = 0;
+ current_encoder = 0;
+ current_frame = 0;
- int num_frames_in_block = config->readString("frame_sequence")->length();
+ num_frames_in_block = config->readString("frame_sequence")->length();
fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr);
+
+ inputqueue = new Queue<Frame>();
}
MovEncoderThread::~MovEncoderThread()
{
+
+ fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr);
+
+ // Push any hanging frames.
+ encs[current_encoder]->encodeSequence(inputqueue);
+
+ fprintf(stderr, "Clear - Readback\n"); fflush(stderr);
+
+ /*
+ * Readback mode
+ */
+ for(int cnt = 0; cnt <= current_encoder; cnt++) {
+
+ // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
+
+ Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
+ Frame *f;
+ while((f = outputqueue->pop())) {
+ int i = write(file, f->data, f->size);
+ if(i == -1) perror("Write failed");
+
+ // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
+
+ delete f;
+ }
+ }
+
+ fprintf(stderr, "Clear - join threads\n"); fflush(stderr);
+
for(int cnt = 0; cnt < threads; cnt++) {
+ encs[cnt]->running = false;
+ encs[cnt]->encodeSequence(NULL);
+ pthread_join(*tids[cnt], NULL);
delete encs[cnt];
}
+
+ fprintf(stderr, "Clear - close file\n"); fflush(stderr);
+
if(file != -1) close(file);
+
+ fprintf(stderr, "Clear - done\n"); fflush(stderr);
+
}
void MovEncoderThread::encode(Frame* frame)
{
if(file == -1) return;
- Frame *enc_frame = encs[current_encoder]->encode(frame);
- // fprintf(stderr, "[%d]", enc_frame->size); fflush(stderr);
- int i = write(file, enc_frame->data, enc_frame->size);
- if(i == -1) perror("Write failed");
- delete enc_frame;
-
+ // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr);
+ inputqueue->bpush(frame);
+
+ /*
+ * Encode mode
+ */
// Switch frame
current_frame++;
if(current_frame >= num_frames_in_block) {
+
+ // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr);
+
+ encs[current_encoder]->encodeSequence(inputqueue);
+ inputqueue = new Queue<Frame>();
+
// Switch encoder
current_frame = 0;
current_encoder++;
- if(current_encoder >= threads) current_encoder = 0;
+ if(current_encoder >= threads) {
+ // switch mode
+ /*
+ * Readback mode
+ */
+ for(int cnt = 0; cnt < threads; cnt++) {
+
+ // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
+
+ Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
+ Frame *f;
+ while((f = outputqueue->pop())) {
+ int i = write(file, f->data, f->size);
+ if(i == -1) perror("Write failed");
+
+ // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
+
+ delete f;
+ }
+ }
+ current_encoder = 0;
+ }
}
}
diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h
index 1e91b58..60f4c5c 100644
--- a/src/mov_encoder_thread.h
+++ b/src/mov_encoder_thread.h
@@ -31,6 +31,10 @@
/*
* $Log$
+ * Revision 1.4 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.3 2005/05/19 10:55:49 deva
* Test for block encoding of length strlen("IPIPP").
*
@@ -67,6 +71,8 @@ public:
void encode(Frame* frame);
private:
+ Queue<Frame> *inputqueue;
+
// Used for encoder switching
int current_encoder;
int current_frame;
@@ -76,6 +82,7 @@ private:
int threads;
int file;
vector<MovEncoder*> encs;
+ vector<pthread_t*> tids;
};
#endif/*__MIAV_MOV_ENCODER_THREAD_H__*/
diff --git a/src/queue.h b/src/queue.h
index de7b8ff..7c56e93 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -38,6 +38,10 @@
/*
* $Log$
+ * Revision 1.16 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.15 2005/05/16 16:00:57 deva
*
* Lots of stuff!
@@ -82,6 +86,7 @@ public:
~Queue();
void push(T *t);
+ void bpush(T *t);
T *pop();
T *peek();
@@ -127,9 +132,6 @@ Queue<T>::~Queue()
pthread_mutex_destroy(&mutex);
}
-
-
-
/**
* Push element on queue.
*/
@@ -172,6 +174,47 @@ void Queue<T>::push(T *t)
}
/**
+ * Push element on queue from the bottom.
+ */
+template<typename T>
+void Queue<T>::bpush(T *t)
+{
+ if(locked) {
+ delete t;
+ return;
+ }
+
+ pthread_mutex_lock(&mutex);
+
+ buf_t *b = (buf_t*)xmalloc(sizeof(*b));
+ b->data = (void*)t;
+
+ assert(b != NULL);
+
+ if(limit && count > 0) {
+ T* tmp = (T*)_pop();
+ delete tmp;
+ }
+
+ if(!head) {
+ head = tail = b;
+ b->next = b->prev = NULL;
+ count = 1;
+ pthread_mutex_unlock(&mutex);
+ return;
+ }
+
+ b->prev = head;
+ b->next = NULL;
+ if(head)
+ head->next = b;
+ head = b;
+ count++;
+
+ pthread_mutex_unlock(&mutex);
+}
+
+/**
* Pop element from queue.
* If queue is empty, NULL is returned.
*/