From d74c7a00c417cffdc93a82efa2841e23d823bea6 Mon Sep 17 00:00:00 2001 From: deva Date: Thu, 19 May 2005 14:10:22 +0000 Subject: Multithreading rulez? --- TODO | 4 +- etc/miav.conf | 8 +++- src/mov_encoder.cc | 72 ++++++++++++++++++++++++++++++++++- src/mov_encoder.h | 29 ++++++++++++-- src/mov_encoder_thread.cc | 96 +++++++++++++++++++++++++++++++++++++++++------ src/mov_encoder_thread.h | 7 ++++ src/queue.h | 49 ++++++++++++++++++++++-- 7 files changed, 241 insertions(+), 24 deletions(-) diff --git a/TODO b/TODO index 0151357..d194079 100644 --- a/TODO +++ b/TODO @@ -13,7 +13,6 @@ http://www.linuxmanpages.com/man3/fame_start_frame.3.php // YUV420 format specification http://encyclopedia.laborlawtalk.com/YUV_4:2:0 -- file permissions - multithreded encoding - fopen med create unique @@ -97,7 +96,8 @@ Main: [x] - Use correct filenames and paths. [x] - Check for writabilty before trying to do any writing. [x] - Create fallback, when unable to write the requested filename. - [ ] - Permissions on sesrver file writing to be read from config. + [x] - Permissions on server file writing. Files: u+wr g+r a+x - Directories: u+wrx g+rx a+rx. + [ ] - Permissions on server file writing to be read from config. ========================================================================== TASKS (common) diff --git a/etc/miav.conf b/etc/miav.conf index a0692a2..476bbef 100644 --- a/etc/miav.conf +++ b/etc/miav.conf @@ -17,11 +17,11 @@ pixel_width = 1024 pixel_height = 768 # How and where to connect to the miav server? -server_addr = "10.3.20.232" +server_addr = "127.0.0.1" server_port = 18120 # Where top store the files recieved by the server -server_root = "/tmp/miav_files" +server_root = "/home/miav/miav_files" # Video output controls. A sequence of I and P, where I is keyframes # which is fast to create, but uses a lot of discspace. @@ -31,3 +31,7 @@ frame_sequence = "IIPIP" # quality in % - 100% is best quality frame_quality = 80 + +# The number of threads started for paralel encoding on the server +# (for multiprocessor systems) +encoding_threads = 4 diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc index 172051d..7bffc50 100644 --- a/src/mov_encoder.cc +++ b/src/mov_encoder.cc @@ -39,6 +39,10 @@ /* * $Log$ + * Revision 1.23 2005/05/19 14:10:22 deva + * + * Multithreading rulez? + * * Revision 1.22 2005/05/17 19:16:26 deva * * Made new mpeg writer work, with proper file permissions. @@ -104,8 +108,8 @@ MovEncoder::MovEncoder() { // FIXME: Hmmm... should this be detected somewhere?! - static int w = 720; - static int h = 576; + int w = 720; + int h = 576; // Initialize yuv strucutre. yuv.w = w; @@ -207,6 +211,13 @@ MovEncoder::MovEncoder() fame_init(fame_context, &fame_par, fame_buffer, FAME_BUFFER_SIZE); + + + // Thread stuff + sem_init(&sem, 0, 0); + sem_init(&done, 0, 0); + + running = true; } MovEncoder::~MovEncoder() @@ -314,3 +325,60 @@ void MovEncoder::encode_audio(Frame *dvframe) { // TODO: Do some audio stuff here sometime! } + +void MovEncoder::encodeSequence(Queue *queue) +{ + // set input queue + inputqueue = queue; + + // unlock semaphore + sem_post(&sem); +} + + +// this runs in a thread +void MovEncoder::run() +{ + fprintf(stderr, "Encoder Ready\n"); fflush(stderr); + + while(running) { + // wait for semaphore + // lock semaphore + sem_wait(&sem); + if(inputqueue == NULL) continue; + + fprintf(stderr, "."); fflush(stderr); + + // allocate new output queue + outputqueue = new Queue(); + // while input queue.pop + Frame *fi, *fo; + while(inputqueue->length() > 0) { + fi = inputqueue->pop(); + + // encode frame from input queue + fo = encode(fi); + + // and push result on output queue + outputqueue->push(fo); + + // delete the input frame + //delete fi; + } + // delete input queue + delete inputqueue; + sem_post(&done); + } +} + +Queue *MovEncoder::getResultSequence() +{ + // wait for sempahore + sem_wait(&done); + + // fprintf(stderr, "POP!\n"); fflush(stderr); + + // return output queue + return outputqueue; +} + diff --git a/src/mov_encoder.h b/src/mov_encoder.h index 24525f2..13eb63b 100644 --- a/src/mov_encoder.h +++ b/src/mov_encoder.h @@ -36,6 +36,10 @@ /* * $Log$ + * Revision 1.9 2005/05/19 14:10:22 deva + * + * Multithreading rulez? + * * Revision 1.8 2005/05/17 14:30:56 deva * Added code, preparing threaded encoding. * @@ -70,19 +74,36 @@ #include #include "frame.h" - +#include "queue.h" #include "util.h" +#include "thread.h" + // size specifies the length of the buffer. #define FAME_BUFFER_SIZE (2*720*576*4) // FIXME: One size fits all... -class MovEncoder { - public: +class MovEncoder : public Thread { +public: MovEncoder(); ~MovEncoder(); Frame* encode(Frame *frame); - private: + void run(); + + Queue *getResultSequence(); + void encodeSequence(Queue *queue); + + volatile bool running; + +private: + // Input queue + Queue *inputqueue; + Queue *outputqueue; + + //thread stuff + sem_t sem; + sem_t done; + Frame *encode_video(Frame *frame); void encode_audio(Frame *frame); diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index be86377..ed71a31 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -31,6 +31,10 @@ /* * $Log$ + * Revision 1.6 2005/05/19 14:10:22 deva + * + * Multithreading rulez? + * * Revision 1.5 2005/05/19 10:55:49 deva * Test for block encoding of length strlen("IPIPP"). * @@ -61,43 +65,113 @@ MovEncoderThread::MovEncoderThread(const char *filename) fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno)); return; } - threads = 4; + + threads = config->readInt("encoding_threads"); for(int cnt = 0; cnt < threads; cnt++) { encs.push_back(new MovEncoder()); + tids.push_back(new pthread_t); + pthread_create (tids[cnt], NULL, thread_run, encs[cnt]); } - int current_encoder = 0; - int current_frame = 0; + current_encoder = 0; + current_frame = 0; - int num_frames_in_block = config->readString("frame_sequence")->length(); + num_frames_in_block = config->readString("frame_sequence")->length(); fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); + + inputqueue = new Queue(); } MovEncoderThread::~MovEncoderThread() { + + fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr); + + // Push any hanging frames. + encs[current_encoder]->encodeSequence(inputqueue); + + fprintf(stderr, "Clear - Readback\n"); fflush(stderr); + + /* + * Readback mode + */ + for(int cnt = 0; cnt <= current_encoder; cnt++) { + + // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); + + Queue *outputqueue = encs[cnt]->getResultSequence(); + Frame *f; + while((f = outputqueue->pop())) { + int i = write(file, f->data, f->size); + if(i == -1) perror("Write failed"); + + // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); + + delete f; + } + } + + fprintf(stderr, "Clear - join threads\n"); fflush(stderr); + for(int cnt = 0; cnt < threads; cnt++) { + encs[cnt]->running = false; + encs[cnt]->encodeSequence(NULL); + pthread_join(*tids[cnt], NULL); delete encs[cnt]; } + + fprintf(stderr, "Clear - close file\n"); fflush(stderr); + if(file != -1) close(file); + + fprintf(stderr, "Clear - done\n"); fflush(stderr); + } void MovEncoderThread::encode(Frame* frame) { if(file == -1) return; - Frame *enc_frame = encs[current_encoder]->encode(frame); - // fprintf(stderr, "[%d]", enc_frame->size); fflush(stderr); - int i = write(file, enc_frame->data, enc_frame->size); - if(i == -1) perror("Write failed"); - delete enc_frame; - + // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr); + inputqueue->bpush(frame); + + /* + * Encode mode + */ // Switch frame current_frame++; if(current_frame >= num_frames_in_block) { + + // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr); + + encs[current_encoder]->encodeSequence(inputqueue); + inputqueue = new Queue(); + // Switch encoder current_frame = 0; current_encoder++; - if(current_encoder >= threads) current_encoder = 0; + if(current_encoder >= threads) { + // switch mode + /* + * Readback mode + */ + for(int cnt = 0; cnt < threads; cnt++) { + + // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); + + Queue *outputqueue = encs[cnt]->getResultSequence(); + Frame *f; + while((f = outputqueue->pop())) { + int i = write(file, f->data, f->size); + if(i == -1) perror("Write failed"); + + // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); + + delete f; + } + } + current_encoder = 0; + } } } diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h index 1e91b58..60f4c5c 100644 --- a/src/mov_encoder_thread.h +++ b/src/mov_encoder_thread.h @@ -31,6 +31,10 @@ /* * $Log$ + * Revision 1.4 2005/05/19 14:10:22 deva + * + * Multithreading rulez? + * * Revision 1.3 2005/05/19 10:55:49 deva * Test for block encoding of length strlen("IPIPP"). * @@ -67,6 +71,8 @@ public: void encode(Frame* frame); private: + Queue *inputqueue; + // Used for encoder switching int current_encoder; int current_frame; @@ -76,6 +82,7 @@ private: int threads; int file; vector encs; + vector tids; }; #endif/*__MIAV_MOV_ENCODER_THREAD_H__*/ diff --git a/src/queue.h b/src/queue.h index de7b8ff..7c56e93 100644 --- a/src/queue.h +++ b/src/queue.h @@ -38,6 +38,10 @@ /* * $Log$ + * Revision 1.16 2005/05/19 14:10:22 deva + * + * Multithreading rulez? + * * Revision 1.15 2005/05/16 16:00:57 deva * * Lots of stuff! @@ -82,6 +86,7 @@ public: ~Queue(); void push(T *t); + void bpush(T *t); T *pop(); T *peek(); @@ -127,9 +132,6 @@ Queue::~Queue() pthread_mutex_destroy(&mutex); } - - - /** * Push element on queue. */ @@ -171,6 +173,47 @@ void Queue::push(T *t) pthread_mutex_unlock(&mutex); } +/** + * Push element on queue from the bottom. + */ +template +void Queue::bpush(T *t) +{ + if(locked) { + delete t; + return; + } + + pthread_mutex_lock(&mutex); + + buf_t *b = (buf_t*)xmalloc(sizeof(*b)); + b->data = (void*)t; + + assert(b != NULL); + + if(limit && count > 0) { + T* tmp = (T*)_pop(); + delete tmp; + } + + if(!head) { + head = tail = b; + b->next = b->prev = NULL; + count = 1; + pthread_mutex_unlock(&mutex); + return; + } + + b->prev = head; + b->next = NULL; + if(head) + head->next = b; + head = b; + count++; + + pthread_mutex_unlock(&mutex); +} + /** * Pop element from queue. * If queue is empty, NULL is returned. -- cgit v1.2.3