From d74c7a00c417cffdc93a82efa2841e23d823bea6 Mon Sep 17 00:00:00 2001 From: deva Date: Thu, 19 May 2005 14:10:22 +0000 Subject: Multithreading rulez? --- src/mov_encoder_thread.cc | 96 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 85 insertions(+), 11 deletions(-) (limited to 'src/mov_encoder_thread.cc') diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index be86377..ed71a31 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -31,6 +31,10 @@ /* * $Log$ + * Revision 1.6 2005/05/19 14:10:22 deva + * + * Multithreading rulez? + * * Revision 1.5 2005/05/19 10:55:49 deva * Test for block encoding of length strlen("IPIPP"). * @@ -61,43 +65,113 @@ MovEncoderThread::MovEncoderThread(const char *filename) fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno)); return; } - threads = 4; + + threads = config->readInt("encoding_threads"); for(int cnt = 0; cnt < threads; cnt++) { encs.push_back(new MovEncoder()); + tids.push_back(new pthread_t); + pthread_create (tids[cnt], NULL, thread_run, encs[cnt]); } - int current_encoder = 0; - int current_frame = 0; + current_encoder = 0; + current_frame = 0; - int num_frames_in_block = config->readString("frame_sequence")->length(); + num_frames_in_block = config->readString("frame_sequence")->length(); fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); + + inputqueue = new Queue(); } MovEncoderThread::~MovEncoderThread() { + + fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr); + + // Push any hanging frames. + encs[current_encoder]->encodeSequence(inputqueue); + + fprintf(stderr, "Clear - Readback\n"); fflush(stderr); + + /* + * Readback mode + */ + for(int cnt = 0; cnt <= current_encoder; cnt++) { + + // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); + + Queue *outputqueue = encs[cnt]->getResultSequence(); + Frame *f; + while((f = outputqueue->pop())) { + int i = write(file, f->data, f->size); + if(i == -1) perror("Write failed"); + + // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); + + delete f; + } + } + + fprintf(stderr, "Clear - join threads\n"); fflush(stderr); + for(int cnt = 0; cnt < threads; cnt++) { + encs[cnt]->running = false; + encs[cnt]->encodeSequence(NULL); + pthread_join(*tids[cnt], NULL); delete encs[cnt]; } + + fprintf(stderr, "Clear - close file\n"); fflush(stderr); + if(file != -1) close(file); + + fprintf(stderr, "Clear - done\n"); fflush(stderr); + } void MovEncoderThread::encode(Frame* frame) { if(file == -1) return; - Frame *enc_frame = encs[current_encoder]->encode(frame); - // fprintf(stderr, "[%d]", enc_frame->size); fflush(stderr); - int i = write(file, enc_frame->data, enc_frame->size); - if(i == -1) perror("Write failed"); - delete enc_frame; - + // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr); + inputqueue->bpush(frame); + + /* + * Encode mode + */ // Switch frame current_frame++; if(current_frame >= num_frames_in_block) { + + // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr); + + encs[current_encoder]->encodeSequence(inputqueue); + inputqueue = new Queue(); + // Switch encoder current_frame = 0; current_encoder++; - if(current_encoder >= threads) current_encoder = 0; + if(current_encoder >= threads) { + // switch mode + /* + * Readback mode + */ + for(int cnt = 0; cnt < threads; cnt++) { + + // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); + + Queue *outputqueue = encs[cnt]->getResultSequence(); + Frame *f; + while((f = outputqueue->pop())) { + int i = write(file, f->data, f->size); + if(i == -1) perror("Write failed"); + + // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); + + delete f; + } + } + current_encoder = 0; + } } } -- cgit v1.2.3