From b04122f3f7884de08eb5d59bb3fd2bff829f9039 Mon Sep 17 00:00:00 2001 From: deva Date: Sun, 22 May 2005 15:49:22 +0000 Subject: Added multithreaded encoding support. --- src/mov_encoder_thread.cc | 176 ++++++++++++++++++++++++---------------------- 1 file changed, 91 insertions(+), 85 deletions(-) (limited to 'src/mov_encoder_thread.cc') diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index ed71a31..d95961d 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.7 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.6 2005/05/19 14:10:22 deva * * Multithreading rulez? @@ -58,120 +61,123 @@ MovEncoderThread::MovEncoderThread(const char *filename) { - file = open(filename, - O_CREAT | O_WRONLY, //| O_LARGEFILE - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if(file == -1) { - fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno)); - return; - } + outputqueue = new FramePriorityQueue(); + inputqueue = new FrameVectorQueue(); + block = new FrameVector(); + + num_frames_in_block = config->readString("frame_sequence")->length(); + fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); threads = config->readInt("encoding_threads"); + // Thread stuff + sem_init(&in_sem, 0, 0); + sem_init(&out_sem, 0, 0); + sem_init(&read_sem, 0, 0); + + for(int cnt = 0; cnt < threads; cnt++) sem_post(&read_sem); + + pthread_mutex_init (&input_mutex, NULL); + pthread_mutex_init (&output_mutex, NULL); + + writer = new MovEncoderWriter(filename, outputqueue, &out_sem, &output_mutex); + writer_tid = new pthread_t; + pthread_create (writer_tid, NULL, thread_run, writer); + for(int cnt = 0; cnt < threads; cnt++) { - encs.push_back(new MovEncoder()); + encs.push_back(new MovEncoder(&read_sem, + inputqueue, &in_sem, &input_mutex, + outputqueue, &out_sem, &output_mutex)); tids.push_back(new pthread_t); pthread_create (tids[cnt], NULL, thread_run, encs[cnt]); } - current_encoder = 0; current_frame = 0; - - num_frames_in_block = config->readString("frame_sequence")->length(); - fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); - - inputqueue = new Queue(); + frame_number = 0; } MovEncoderThread::~MovEncoderThread() { - - fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr); - - // Push any hanging frames. - encs[current_encoder]->encodeSequence(inputqueue); - - fprintf(stderr, "Clear - Readback\n"); fflush(stderr); - - /* - * Readback mode - */ - for(int cnt = 0; cnt <= current_encoder; cnt++) { - - // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); - - Queue *outputqueue = encs[cnt]->getResultSequence(); - Frame *f; - while((f = outputqueue->pop())) { - int i = write(file, f->data, f->size); - if(i == -1) perror("Write failed"); - - // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); - - delete f; - } - } - - fprintf(stderr, "Clear - join threads\n"); fflush(stderr); + // These should not be deleted here... its done elsewhere. + inputqueue = NULL; + sem_post(&out_sem); + + // Stop the encoding threads. for(int cnt = 0; cnt < threads; cnt++) { encs[cnt]->running = false; - encs[cnt]->encodeSequence(NULL); + } + + // Kick them to initiate the exit. + for(int cnt = 0; cnt < threads; cnt++) { + sem_post(&in_sem); + } + + // They should be exited now, so we can delete them. + for(int cnt = 0; cnt < threads; cnt++) { pthread_join(*tids[cnt], NULL); delete encs[cnt]; + delete tids[cnt]; } + + // Tell the writer to stop + writer->running = false; - fprintf(stderr, "Clear - close file\n"); fflush(stderr); + // Kick it to make it stop. + sem_post(&out_sem); - if(file != -1) close(file); + // Destroy the thread + pthread_join(*writer_tid, NULL); + delete writer_tid; - fprintf(stderr, "Clear - done\n"); fflush(stderr); + // delete the writer (end thereby close the file) + delete writer; + // Destroy the semaphores. + sem_destroy(&in_sem); + sem_destroy(&out_sem); + sem_destroy(&read_sem); } void MovEncoderThread::encode(Frame* frame) { - if(file == -1) return; - - // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr); - inputqueue->bpush(frame); - - /* - * Encode mode - */ - // Switch frame - current_frame++; - if(current_frame >= num_frames_in_block) { - - // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr); + if(frame == NULL) { + fprintf(stderr, "NULL frame detected.\n"); + // Terminate + return; + } - encs[current_encoder]->encodeSequence(inputqueue); - inputqueue = new Queue(); + frame->number = frame_number; + block->push_back(frame); - // Switch encoder + // Switch frame + if(block->size() == num_frames_in_block) { + // Wait until a free encoder. + /* + int val; + sem_getvalue(&read_sem, &val); + fprintf(stderr, "Sem Value: %d\n", val); fflush(stderr); + */ + sem_wait(&read_sem); + + // Lock input mutex + pthread_mutex_lock(&input_mutex); + inputqueue->push(block); + pthread_mutex_unlock(&input_mutex); + // Unlock input mutex + + fprintf(stderr, "Frame vector [%d-%d] pushed\n", + block->at(0)->number, + block->at(block->size() - 1)->number); + fflush(stderr); + + // Kick encoders + sem_post(&in_sem); + + // Start new block current_frame = 0; - current_encoder++; - if(current_encoder >= threads) { - // switch mode - /* - * Readback mode - */ - for(int cnt = 0; cnt < threads; cnt++) { - - // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); - - Queue *outputqueue = encs[cnt]->getResultSequence(); - Frame *f; - while((f = outputqueue->pop())) { - int i = write(file, f->data, f->size); - if(i == -1) perror("Write failed"); - - // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); - - delete f; - } - } - current_encoder = 0; - } + block = new FrameVector; } + + frame_number ++; } -- cgit v1.2.3