summaryrefslogtreecommitdiff
path: root/src/mov_encoder_thread.cc
diff options
context:
space:
mode:
authordeva <deva>2005-05-22 15:49:22 +0000
committerdeva <deva>2005-05-22 15:49:22 +0000
commitb04122f3f7884de08eb5d59bb3fd2bff829f9039 (patch)
tree6898a061f4e10a6026faa1ffe4a7a319256dacdf /src/mov_encoder_thread.cc
parentd74c7a00c417cffdc93a82efa2841e23d823bea6 (diff)
Added multithreaded encoding support.
Diffstat (limited to 'src/mov_encoder_thread.cc')
-rw-r--r--src/mov_encoder_thread.cc176
1 files changed, 91 insertions, 85 deletions
diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc
index ed71a31..d95961d 100644
--- a/src/mov_encoder_thread.cc
+++ b/src/mov_encoder_thread.cc
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.7 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.6 2005/05/19 14:10:22 deva
*
* Multithreading rulez?
@@ -58,120 +61,123 @@
MovEncoderThread::MovEncoderThread(const char *filename)
{
- file = open(filename,
- O_CREAT | O_WRONLY, //| O_LARGEFILE
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if(file == -1) {
- fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno));
- return;
- }
+ outputqueue = new FramePriorityQueue();
+ inputqueue = new FrameVectorQueue();
+ block = new FrameVector();
+
+ num_frames_in_block = config->readString("frame_sequence")->length();
+ fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr);
threads = config->readInt("encoding_threads");
+ // Thread stuff
+ sem_init(&in_sem, 0, 0);
+ sem_init(&out_sem, 0, 0);
+ sem_init(&read_sem, 0, 0);
+
+ for(int cnt = 0; cnt < threads; cnt++) sem_post(&read_sem);
+
+ pthread_mutex_init (&input_mutex, NULL);
+ pthread_mutex_init (&output_mutex, NULL);
+
+ writer = new MovEncoderWriter(filename, outputqueue, &out_sem, &output_mutex);
+ writer_tid = new pthread_t;
+ pthread_create (writer_tid, NULL, thread_run, writer);
+
for(int cnt = 0; cnt < threads; cnt++) {
- encs.push_back(new MovEncoder());
+ encs.push_back(new MovEncoder(&read_sem,
+ inputqueue, &in_sem, &input_mutex,
+ outputqueue, &out_sem, &output_mutex));
tids.push_back(new pthread_t);
pthread_create (tids[cnt], NULL, thread_run, encs[cnt]);
}
- current_encoder = 0;
current_frame = 0;
-
- num_frames_in_block = config->readString("frame_sequence")->length();
- fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr);
-
- inputqueue = new Queue<Frame>();
+ frame_number = 0;
}
MovEncoderThread::~MovEncoderThread()
{
-
- fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr);
-
- // Push any hanging frames.
- encs[current_encoder]->encodeSequence(inputqueue);
-
- fprintf(stderr, "Clear - Readback\n"); fflush(stderr);
-
- /*
- * Readback mode
- */
- for(int cnt = 0; cnt <= current_encoder; cnt++) {
-
- // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
-
- Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
- Frame *f;
- while((f = outputqueue->pop())) {
- int i = write(file, f->data, f->size);
- if(i == -1) perror("Write failed");
-
- // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
-
- delete f;
- }
- }
-
- fprintf(stderr, "Clear - join threads\n"); fflush(stderr);
+ // These should not be deleted here... its done elsewhere.
+ inputqueue = NULL;
+ sem_post(&out_sem);
+
+ // Stop the encoding threads.
for(int cnt = 0; cnt < threads; cnt++) {
encs[cnt]->running = false;
- encs[cnt]->encodeSequence(NULL);
+ }
+
+ // Kick them to initiate the exit.
+ for(int cnt = 0; cnt < threads; cnt++) {
+ sem_post(&in_sem);
+ }
+
+ // They should be exited now, so we can delete them.
+ for(int cnt = 0; cnt < threads; cnt++) {
pthread_join(*tids[cnt], NULL);
delete encs[cnt];
+ delete tids[cnt];
}
+
+ // Tell the writer to stop
+ writer->running = false;
- fprintf(stderr, "Clear - close file\n"); fflush(stderr);
+ // Kick it to make it stop.
+ sem_post(&out_sem);
- if(file != -1) close(file);
+ // Destroy the thread
+ pthread_join(*writer_tid, NULL);
+ delete writer_tid;
- fprintf(stderr, "Clear - done\n"); fflush(stderr);
+ // delete the writer (end thereby close the file)
+ delete writer;
+ // Destroy the semaphores.
+ sem_destroy(&in_sem);
+ sem_destroy(&out_sem);
+ sem_destroy(&read_sem);
}
void MovEncoderThread::encode(Frame* frame)
{
- if(file == -1) return;
-
- // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr);
- inputqueue->bpush(frame);
-
- /*
- * Encode mode
- */
- // Switch frame
- current_frame++;
- if(current_frame >= num_frames_in_block) {
-
- // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr);
+ if(frame == NULL) {
+ fprintf(stderr, "NULL frame detected.\n");
+ // Terminate
+ return;
+ }
- encs[current_encoder]->encodeSequence(inputqueue);
- inputqueue = new Queue<Frame>();
+ frame->number = frame_number;
+ block->push_back(frame);
- // Switch encoder
+ // Switch frame
+ if(block->size() == num_frames_in_block) {
+ // Wait until a free encoder.
+ /*
+ int val;
+ sem_getvalue(&read_sem, &val);
+ fprintf(stderr, "Sem Value: %d\n", val); fflush(stderr);
+ */
+ sem_wait(&read_sem);
+
+ // Lock input mutex
+ pthread_mutex_lock(&input_mutex);
+ inputqueue->push(block);
+ pthread_mutex_unlock(&input_mutex);
+ // Unlock input mutex
+
+ fprintf(stderr, "Frame vector [%d-%d] pushed\n",
+ block->at(0)->number,
+ block->at(block->size() - 1)->number);
+ fflush(stderr);
+
+ // Kick encoders
+ sem_post(&in_sem);
+
+ // Start new block
current_frame = 0;
- current_encoder++;
- if(current_encoder >= threads) {
- // switch mode
- /*
- * Readback mode
- */
- for(int cnt = 0; cnt < threads; cnt++) {
-
- // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
-
- Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
- Frame *f;
- while((f = outputqueue->pop())) {
- int i = write(file, f->data, f->size);
- if(i == -1) perror("Write failed");
-
- // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
-
- delete f;
- }
- }
- current_encoder = 0;
- }
+ block = new FrameVector;
}
+
+ frame_number ++;
}