diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/mov_encoder.cc | 72 | ||||
| -rw-r--r-- | src/mov_encoder.h | 29 | ||||
| -rw-r--r-- | src/mov_encoder_thread.cc | 96 | ||||
| -rw-r--r-- | src/mov_encoder_thread.h | 7 | ||||
| -rw-r--r-- | src/queue.h | 49 | 
5 files changed, 233 insertions, 20 deletions
| diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc index 172051d..7bffc50 100644 --- a/src/mov_encoder.cc +++ b/src/mov_encoder.cc @@ -39,6 +39,10 @@  /*   * $Log$ + * Revision 1.23  2005/05/19 14:10:22  deva + * + * Multithreading rulez? + *   * Revision 1.22  2005/05/17 19:16:26  deva   *   * Made new mpeg writer work, with proper file permissions. @@ -104,8 +108,8 @@  MovEncoder::MovEncoder()  {    // FIXME: Hmmm... should this be detected somewhere?! -  static int w = 720; -  static int h = 576; +  int w = 720; +  int h = 576;    // Initialize yuv strucutre.    yuv.w = w; @@ -207,6 +211,13 @@ MovEncoder::MovEncoder()    fame_init(fame_context, &fame_par, fame_buffer, FAME_BUFFER_SIZE); + + +  // Thread stuff +	sem_init(&sem, 0, 0); +	sem_init(&done, 0, 0); + +  running = true;  }  MovEncoder::~MovEncoder() @@ -314,3 +325,60 @@ void MovEncoder::encode_audio(Frame *dvframe)  {    // TODO: Do some audio stuff here sometime!  } + +void MovEncoder::encodeSequence(Queue<Frame> *queue) +{ +  // set input queue +  inputqueue = queue; + +  // unlock semaphore +  sem_post(&sem); +} + + +// this runs in a thread +void MovEncoder::run() +{ +  fprintf(stderr, "Encoder Ready\n"); fflush(stderr); +   +  while(running) { +    // wait for semaphore +    // lock semaphore +    sem_wait(&sem); +    if(inputqueue == NULL) continue; + +    fprintf(stderr, "."); fflush(stderr); + +    // allocate new output queue +    outputqueue = new Queue<Frame>(); +    // while input queue.pop +    Frame *fi, *fo; +    while(inputqueue->length() > 0) { +      fi = inputqueue->pop(); + +      // encode frame from input queue +      fo = encode(fi); + +      // and push result on output queue +      outputqueue->push(fo); + +      // delete the input frame +      //delete fi; +    } +    // delete input queue +    delete inputqueue; +    sem_post(&done); +  } +} + +Queue<Frame> *MovEncoder::getResultSequence() +{ +  // wait for sempahore +  sem_wait(&done); + +  //  fprintf(stderr, "POP!\n"); fflush(stderr); + +  // return output queue +  return outputqueue; +} + diff --git a/src/mov_encoder.h b/src/mov_encoder.h index 24525f2..13eb63b 100644 --- a/src/mov_encoder.h +++ b/src/mov_encoder.h @@ -36,6 +36,10 @@  /*   * $Log$ + * Revision 1.9  2005/05/19 14:10:22  deva + * + * Multithreading rulez? + *   * Revision 1.8  2005/05/17 14:30:56  deva   * Added code, preparing threaded encoding.   * @@ -70,19 +74,36 @@  #include <libdv/dv_types.h>  #include "frame.h" - +#include "queue.h"  #include "util.h" +#include "thread.h" +  // size specifies the length of the buffer.   #define FAME_BUFFER_SIZE	(2*720*576*4)	// FIXME: One size fits all... -class MovEncoder { - public: +class MovEncoder : public Thread { +public:    MovEncoder();    ~MovEncoder();    Frame* encode(Frame *frame); - private: +  void run(); + +  Queue<Frame> *getResultSequence(); +  void encodeSequence(Queue<Frame> *queue); + +  volatile bool running; + +private: +  // Input queue +  Queue<Frame> *inputqueue; +  Queue<Frame> *outputqueue; + +  //thread stuff +  sem_t sem; +  sem_t done; +      Frame *encode_video(Frame *frame);    void encode_audio(Frame *frame); diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index be86377..ed71a31 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -31,6 +31,10 @@  /*   * $Log$ + * Revision 1.6  2005/05/19 14:10:22  deva + * + * Multithreading rulez? + *   * Revision 1.5  2005/05/19 10:55:49  deva   * Test for block encoding of length strlen("IPIPP").   * @@ -61,43 +65,113 @@ MovEncoderThread::MovEncoderThread(const char *filename)      fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno));      return;    } -  threads = 4; + +  threads = config->readInt("encoding_threads");    for(int cnt = 0; cnt < threads; cnt++) {      encs.push_back(new MovEncoder()); +    tids.push_back(new pthread_t); +    pthread_create (tids[cnt], NULL, thread_run, encs[cnt]);    } -  int current_encoder = 0; -  int current_frame = 0; +  current_encoder = 0; +  current_frame = 0; -  int num_frames_in_block = config->readString("frame_sequence")->length(); +  num_frames_in_block = config->readString("frame_sequence")->length();    fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); + +  inputqueue = new Queue<Frame>();  }  MovEncoderThread::~MovEncoderThread()  { +     +  fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr); +     +  // Push any hanging frames. +  encs[current_encoder]->encodeSequence(inputqueue); +     +  fprintf(stderr, "Clear - Readback\n"); fflush(stderr); +     +  /* +   * Readback mode +   */ +  for(int cnt = 0; cnt <= current_encoder; cnt++) { +     +    //    fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); +     +    Queue<Frame> *outputqueue = encs[cnt]->getResultSequence(); +    Frame *f; +    while((f = outputqueue->pop())) { +      int i = write(file, f->data, f->size); +      if(i == -1) perror("Write failed"); +       +      //      fprintf(stderr, "wrote[%d]-", i); fflush(stderr); +       +      delete f; +    } +  } +   +  fprintf(stderr, "Clear - join threads\n"); fflush(stderr); +    for(int cnt = 0; cnt < threads; cnt++) { +    encs[cnt]->running = false; +    encs[cnt]->encodeSequence(NULL); +    pthread_join(*tids[cnt], NULL);      delete encs[cnt];    } + +  fprintf(stderr, "Clear - close file\n"); fflush(stderr); +    if(file != -1) close(file); + +  fprintf(stderr, "Clear - done\n"); fflush(stderr); +  }  void MovEncoderThread::encode(Frame* frame)  {    if(file == -1) return; -  Frame *enc_frame = encs[current_encoder]->encode(frame); -  //  fprintf(stderr, "[%d]", enc_frame->size); fflush(stderr); -  int i = write(file, enc_frame->data, enc_frame->size); -  if(i == -1) perror("Write failed"); -  delete enc_frame; - +  //  fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr); +  inputqueue->bpush(frame); +   +  /* +   * Encode mode +   */    // Switch frame    current_frame++;    if(current_frame >= num_frames_in_block) { + +    //    fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr); + +    encs[current_encoder]->encodeSequence(inputqueue); +    inputqueue = new Queue<Frame>(); +      // Switch encoder      current_frame = 0;      current_encoder++; -    if(current_encoder >= threads) current_encoder = 0; +    if(current_encoder >= threads) { +      // switch mode +      /* +       * Readback mode +       */ +      for(int cnt = 0; cnt < threads; cnt++) { + +        //        fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); + +        Queue<Frame> *outputqueue = encs[cnt]->getResultSequence(); +        Frame *f; +        while((f = outputqueue->pop())) { +          int i = write(file, f->data, f->size); +          if(i == -1) perror("Write failed"); + +          //          fprintf(stderr, "wrote[%d]-", i); fflush(stderr); + +          delete f; +        } +      } +      current_encoder = 0; +    }    }  } diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h index 1e91b58..60f4c5c 100644 --- a/src/mov_encoder_thread.h +++ b/src/mov_encoder_thread.h @@ -31,6 +31,10 @@  /*   * $Log$ + * Revision 1.4  2005/05/19 14:10:22  deva + * + * Multithreading rulez? + *   * Revision 1.3  2005/05/19 10:55:49  deva   * Test for block encoding of length strlen("IPIPP").   * @@ -67,6 +71,8 @@ public:    void encode(Frame* frame);  private: +  Queue<Frame> *inputqueue; +    // Used for encoder switching    int current_encoder;    int current_frame; @@ -76,6 +82,7 @@ private:    int threads;    int file;    vector<MovEncoder*> encs; +  vector<pthread_t*> tids;  };  #endif/*__MIAV_MOV_ENCODER_THREAD_H__*/ diff --git a/src/queue.h b/src/queue.h index de7b8ff..7c56e93 100644 --- a/src/queue.h +++ b/src/queue.h @@ -38,6 +38,10 @@  /*   * $Log$ + * Revision 1.16  2005/05/19 14:10:22  deva + * + * Multithreading rulez? + *   * Revision 1.15  2005/05/16 16:00:57  deva   *   * Lots of stuff! @@ -82,6 +86,7 @@ public:    ~Queue();    void push(T *t); +  void bpush(T *t);    T *pop();    T *peek(); @@ -127,9 +132,6 @@ Queue<T>::~Queue()    pthread_mutex_destroy(&mutex);  } - - -  /**   * Push element on queue.   */ @@ -172,6 +174,47 @@ void Queue<T>::push(T *t)  }  /** + * Push element on queue from the bottom. + */ +template<typename T> +void Queue<T>::bpush(T *t) +{ +  if(locked) { +    delete t; +    return; +  } + +  pthread_mutex_lock(&mutex); + +  buf_t *b = (buf_t*)xmalloc(sizeof(*b)); +  b->data = (void*)t; + +  assert(b != NULL); +   +  if(limit && count > 0) { +    T* tmp = (T*)_pop(); +    delete tmp; +  } +   +  if(!head) { +    head = tail = b; +    b->next = b->prev = NULL; +    count = 1; +    pthread_mutex_unlock(&mutex); +    return; +  } +   +  b->prev = head; +  b->next = NULL; +  if(head) +    head->next = b; +  head = b; +  count++; +   +  pthread_mutex_unlock(&mutex); +} + +/**   * Pop element from queue.   * If queue is empty, NULL is returned.   */ | 
