From 111b16802de661228ef414eb6ec7484e2fa186ed Mon Sep 17 00:00:00 2001 From: deva Date: Tue, 4 Oct 2005 21:39:53 +0000 Subject: *** empty log message *** --- src/Makefile.am | 8 ++ src/audio_encoder.cc | 125 +++++++++++++++++++++++++----- src/audio_encoder.h | 19 ++--- src/frame.h | 19 ----- src/mov_encoder.cc | 161 ++++++++++++++++++++++++++++++++++----- src/mov_encoder.h | 30 +++----- src/mov_encoder_thread.cc | 76 +++++++----------- src/mov_encoder_thread.h | 25 +++--- src/mov_encoder_writer.cc | 22 ++---- src/mov_encoder_writer.h | 25 ++---- src/multicast.cc | 109 ++++++++++++++++++++++++++ src/multicast.h | 47 ++++++++++++ src/multiplexer.cc | 52 +++---------- src/multiplexer.h | 14 ++-- src/threadsafe_queue.cc | 44 +++++++++++ src/threadsafe_queue.h | 56 ++++++++++++++ src/threadsafe_queue_fifo.cc | 83 ++++++++++++++++++++ src/threadsafe_queue_fifo.h | 82 ++++++++++++++++++++ src/threadsafe_queue_priority.cc | 93 ++++++++++++++++++++++ src/threadsafe_queue_priority.h | 64 ++++++++++++++++ 20 files changed, 921 insertions(+), 233 deletions(-) create mode 100644 src/multicast.cc create mode 100644 src/multicast.h create mode 100644 src/threadsafe_queue.cc create mode 100644 src/threadsafe_queue.h create mode 100644 src/threadsafe_queue_fifo.cc create mode 100644 src/threadsafe_queue_fifo.h create mode 100644 src/threadsafe_queue_priority.cc create mode 100644 src/threadsafe_queue_priority.h diff --git a/src/Makefile.am b/src/Makefile.am index 3adaf25..b0ed05f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ miav_SOURCES = $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \ mov_encoder.cc \ mov_encoder_thread.cc \ mov_encoder_writer.cc \ + multicast.cc \ multiplexer.cc \ network.cc \ player.cc \ @@ -41,6 +42,9 @@ miav_SOURCES = $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \ server_status.cc \ socket.cc \ thread.cc \ + threadsafe_queue.cc \ + threadsafe_queue_fifo.cc \ + threadsafe_queue_priority.cc \ util.cc \ videowidget.cc \ yuv_draw.cc @@ -81,6 +85,7 @@ EXTRA_DIST = \ mov_encoder.h \ mov_encoder_thread.h \ mov_encoder_writer.h \ + multicast.h \ multiplexer.h \ network.h \ package.h \ @@ -90,6 +95,9 @@ EXTRA_DIST = \ server_status.h \ socket.h \ thread.h \ + threadsafe_queue.h \ + threadsafe_queue_fifo.h \ + threadsafe_queue_priority.h \ util.h \ videowidget.h \ yuv_draw.h diff --git a/src/audio_encoder.cc b/src/audio_encoder.cc index 8972e9a..97c6084 100644 --- a/src/audio_encoder.cc +++ b/src/audio_encoder.cc @@ -28,8 +28,10 @@ #include "audio_encoder.h" #include "util.h" -AudioEncoder::AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, sem_t *in_sem, - FramePriorityQueue *out, pthread_mutex_t *out_mutex, sem_t *out_sem, +#include "liblame_wrapper.h" + +AudioEncoder::AudioEncoder(ThreadSafeQueuePriority *audio_input_queue, + ThreadSafeQueuePriority *audio_output_queue, Info *i) { info = i; @@ -37,18 +39,8 @@ AudioEncoder::AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, se running = true; - // Queues - inputqueue = in; - outputqueue = out; - - // Queue mutexes - input_mutex = in_mutex; - output_mutex = out_mutex; - - input_sem = in_sem; - output_sem = out_sem; - - frame_number = 0; + input_queue = audio_input_queue; + output_queue = audio_output_queue; } AudioEncoder::~AudioEncoder() @@ -59,18 +51,109 @@ void AudioEncoder::thread_main() { info->info("AudioEncoder::run"); - unsigned int queuesize = 0; - // Run with slightly lower priority than MovEncoderWriter // nice(2); Frame *in_frame = NULL; Frame *out_frame = NULL; + + LibLAMEWrapper lame(info); + + while(running) { + in_frame = input_queue->pop(); + + if(in_frame == NULL) info->error("AudioEncoder: in_frame == NULL!"); + + // Check for end of stream + if(in_frame->endOfFrameStream == true) { + info->info("endOfFrameStream in AudioEncoder"); + running = false; + out_frame = lame.close(); + } else { + // Encode audio + out_frame = lame.encode(in_frame); + } + out_frame->number = in_frame->number; + out_frame->endOfFrameStream = in_frame->endOfFrameStream; + + //delete in_frame; + in_frame = NULL; + + output_queue->push(out_frame); + } + + info->info("AudioEncoder::stop"); +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/* + +void AudioEncoder::thread_main() +{ + info->info("AudioEncoder::run"); + +#ifndef NEW_QUEUE + unsigned int queuesize = 0; Frame *tmpframe; +#endif + + // Run with slightly lower priority than MovEncoderWriter + nice(2); + + Frame *in_frame = NULL; + Frame *out_frame = NULL; LibLAMEWrapper lame(info); while(running) { + info->info("fisk"); +#ifdef NEW_QUEUE + in_frame = input_queue->pop(); +#else sem_wait(input_sem); // If no frame is in the buffer, get one from the queue @@ -94,7 +177,8 @@ void AudioEncoder::thread_main() sleep_0_2_frame(); } - +#endif + // Check for end of stream if(in_frame->endOfFrameStream == true) { info->info("endOfFrameStream in AudioEncoder"); @@ -110,6 +194,9 @@ void AudioEncoder::thread_main() delete in_frame; in_frame = NULL; +#ifdef NEW_QUEUE + output_queue->push(out_frame); +#else // Lock output mutex pthread_mutex_lock(output_mutex); outputqueue->push(out_frame); @@ -118,10 +205,14 @@ void AudioEncoder::thread_main() // Kick multiplexer (audio) sem_post(output_sem); +#endif } +#ifndef NEW_QUEUE // Kick multiplexer (audio) sem_post(output_sem); +#endif info->info("AudioEncoder::stop"); } +*/ diff --git a/src/audio_encoder.h b/src/audio_encoder.h index b15ce45..9d86178 100644 --- a/src/audio_encoder.h +++ b/src/audio_encoder.h @@ -36,12 +36,12 @@ #include "info.h" -#include "liblame_wrapper.h" +#include "threadsafe_queue_priority.h" class AudioEncoder : public Thread { public: - AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, sem_t *in_sem, - FramePriorityQueue *out, pthread_mutex_t *out_mutex, sem_t *out_sem, + AudioEncoder(ThreadSafeQueuePriority *audio_input_queue, + ThreadSafeQueuePriority *audio_output_queue, Info *info); ~AudioEncoder(); @@ -50,19 +50,10 @@ public: volatile bool running; private: - unsigned int frame_number; - Info *info; - // Input/Output queues - FramePriorityQueue *inputqueue; - FramePriorityQueue *outputqueue; - pthread_mutex_t *input_mutex; - pthread_mutex_t *output_mutex; - - //thread stuff - sem_t *input_sem; - sem_t *output_sem; + ThreadSafeQueuePriority *input_queue; + ThreadSafeQueuePriority *output_queue; }; diff --git a/src/frame.h b/src/frame.h index 2b8d9f5..6859116 100644 --- a/src/frame.h +++ b/src/frame.h @@ -31,9 +31,6 @@ // Definition of vector #include -// Definition of priority_queue -#include - class Frame { public: Frame(unsigned char *d, int sz); @@ -54,22 +51,6 @@ public: bool endOfFrameStream; }; -#include - -// Method for use, when comparing Frames in priority queue. -template -struct frame_priority : std::binary_function { - bool operator() (const T& a, const T& b) const { - return ((Frame*)a)->number > ((Frame*)b)->number; - } -}; - -// Additional helper types. typedef std::vector< Frame* > FrameVector; -typedef std::queue< FrameVector* > FrameVectorQueue; -typedef std::priority_queue< Frame*, - std::vector, - frame_priority > FramePriorityQueue; - #endif/*__FRAME_H__*/ diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc index cf45ae0..a455f42 100644 --- a/src/mov_encoder.cc +++ b/src/mov_encoder.cc @@ -42,11 +42,12 @@ #include "miav_config.h" #include "debug.h" +#include "libfame_wrapper.h" MovEncoder::MovEncoder(volatile bool *r, sem_t *r_sem, - FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex, - FramePriorityQueue *v_out, pthread_mutex_t *v_out_mutex, sem_t *v_out_sem, - FramePriorityQueue *a_out, pthread_mutex_t *a_out_mutex, sem_t *a_out_sem, + ThreadSafeQueueFIFO *in, + ThreadSafeQueuePriority *video_out, + ThreadSafeQueuePriority *audio_out, Info *i) { info = i; @@ -56,18 +57,9 @@ MovEncoder::MovEncoder(volatile bool *r, sem_t *r_sem, // Queues inputqueue = in; - video_outputqueue = v_out; - audio_outputqueue = a_out; + video_output_queue = video_out; + audio_output_queue = audio_out; - // Queue mutexes - input_mutex = in_mutex; - video_output_mutex = v_out_mutex; - audio_output_mutex = a_out_mutex; - - input_sem = in_sem; - video_output_sem = v_out_sem; - audio_output_sem = a_out_sem; - read_sem = r_sem; } @@ -82,12 +74,131 @@ void MovEncoder::thread_main() { info->info("MovEncoder::run"); // static volatile int test = 0; + // int insize = 0; + + // Run with slightly lower priority than MovEncoderWriter AND AudioEncoder + // nice(3); + + FrameVector *item; + Frame *in_frame; + Frame *out_v_frame; + Frame *out_a_frame; + + LibFAMEWrapper fame(info); + + // Process until running == false and the queue is empty + while(*running) { + + item = inputqueue->pop(); + + if(item) { + for(unsigned int cnt = 0; cnt < item->size(); cnt++) { + in_frame = item->at(cnt); + + // Check for end of stream + if(in_frame->endOfFrameStream == true) { + info->info("endOfFrameStream in MovEncoder"); + + // Signal to stop running + *running = false; + + // Kick them sleepy ones so they get the message. + int threads = config->readInt("encoding_threads"); + for(int cnt = 0; cnt < threads; cnt++) {/*sem_post(input_sem);*/} // FIXME: Kick the other encoders + } + + // Encode video + out_v_frame = fame.encode(in_frame); + out_v_frame->number = in_frame->number; + out_v_frame->endOfFrameStream = in_frame->endOfFrameStream; + + // Create audio frame + out_a_frame = in_frame; + + video_output_queue->push(out_v_frame); + audio_output_queue->push(out_a_frame); + } + + delete item; + + item = NULL; + + // Kick reader + sem_post(read_sem); + } + } + + info->info("MovEncoder::stop"); +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/* + +// this runs in a thread +void MovEncoder::thread_main() +{ + info->info("MovEncoder::run"); + // static volatile int test = 0; +#ifndef NEW_QUEUE int v_outsize = 0; int a_outsize = 0; +#endif int insize = 0; // Run with slightly lower priority than MovEncoderWriter AND AudioEncoder - //nice(3); + nice(3); FrameVector *item; Frame *in_frame; @@ -131,16 +242,20 @@ void MovEncoder::thread_main() // Create audio frame out_a_frame = in_frame; +#ifdef NEW_QUEUE + video_output_queue->push(out_v_frame); + audio_output_queue->push(out_a_frame); +#else // Lock output mutex pthread_mutex_lock(video_output_mutex); video_outputqueue->push(out_v_frame); v_outsize = video_outputqueue->size(); pthread_mutex_unlock(video_output_mutex); // Unlock output mutex - + // Kick multiplexer (video) sem_post(video_output_sem); - + // Lock output mutex pthread_mutex_lock(audio_output_mutex); audio_outputqueue->push(out_a_frame); @@ -150,6 +265,7 @@ void MovEncoder::thread_main() // Kick audio encoder sem_post(audio_output_sem); +#endif } delete item; @@ -159,16 +275,19 @@ void MovEncoder::thread_main() sem_post(read_sem); } } - /* - info->info("Input pool size: %d, video output pool size: %d, audio output pool size: %d", - insize, v_outsize, a_outsize); - */ + + //info->info("Input pool size: %d, video output pool size: %d, audio output pool size: %d", + // insize, v_outsize, a_outsize); + +#ifndef NEW_QUEUE // Kick audio encoder sem_post(audio_output_sem); // Kick multiplexer (video) sem_post(video_output_sem); +#endif info->info("MovEncoder::stop"); } +*/ diff --git a/src/mov_encoder.h b/src/mov_encoder.h index 8488008..ace016c 100644 --- a/src/mov_encoder.h +++ b/src/mov_encoder.h @@ -48,15 +48,15 @@ using namespace std; #include "info.h" -#include "libfame_wrapper.h" -//#include "liblame_wrapper.h" +#include "threadsafe_queue_priority.h" +#include "threadsafe_queue_fifo.h" class MovEncoder : public Thread { public: MovEncoder(volatile bool *r, sem_t *r_sem, - FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex, - FramePriorityQueue *v_out, pthread_mutex_t *v_out_mutex, sem_t *v_out_sem, - FramePriorityQueue *a_out, pthread_mutex_t *a_out_mutex, sem_t *a_out_sem, + ThreadSafeQueueFIFO< FrameVector*> *in, + ThreadSafeQueuePriority *video_out, + ThreadSafeQueuePriority *audio_out, Info *info); ~MovEncoder(); @@ -65,24 +65,16 @@ public: volatile bool *running; private: - // LibFAMEWrapper *fame; - // LibLAMEWrapper *lame; - Info *info; - // Input/Output queues - FrameVectorQueue *inputqueue; - FramePriorityQueue *video_outputqueue; - FramePriorityQueue *audio_outputqueue; - pthread_mutex_t *input_mutex; - pthread_mutex_t *video_output_mutex; - pthread_mutex_t *audio_output_mutex; + // Input queue + ThreadSafeQueueFIFO< FrameVector* > *inputqueue; - //thread stuff - sem_t *input_sem; - sem_t *video_output_sem; - sem_t *audio_output_sem; + // Output queues + ThreadSafeQueuePriority *video_output_queue; + ThreadSafeQueuePriority *audio_output_queue; + // Reader (mov_encoder_thread.cc) semaphore sem_t *read_sem; }; diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index dab308d..7c7f5d9 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -34,17 +34,19 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i) info = i; info->info("MovEncoderThread"); - // Queues - inputqueue = new FrameVectorQueue(); - video_outputqueue = new FramePriorityQueue(); - audio_inputqueue = new FramePriorityQueue(); - audio_outputqueue = new FramePriorityQueue(); - - // Queue mutexes - pthread_mutex_init (&input_mutex, NULL); - pthread_mutex_init (&video_output_mutex, NULL); - pthread_mutex_init (&audio_input_mutex, NULL); - pthread_mutex_init (&audio_output_mutex, NULL); + // Queue + inputqueue = new ThreadSafeQueueFIFO(); + + // Initialize read semaphore + sem_init(&read_sem, 0, 0); + + video_output_queue = new ThreadSafeQueuePriority(info); + audio_input_queue = new ThreadSafeQueuePriority(info); + audio_output_queue = new ThreadSafeQueuePriority(info); + + info->info("video_output_queue: 0x%x", video_output_queue); + info->info("audio_input_queue: 0x%x", audio_input_queue); + info->info("audio_output_queue: 0x%x", audio_output_queue); block = new FrameVector(); @@ -53,13 +55,6 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i) info->info("Frame sequence length %d", num_frames_in_block); threads = config->readInt("encoding_threads"); - - // Thread stuff - sem_init(&in_sem, 0, 0); - sem_init(&video_out_sem, 0, 0); - sem_init(&audio_in_sem, 0, 0); - sem_init(&audio_out_sem, 0, 0); - sem_init(&read_sem, 0, 0); movencodersrunning = true; @@ -67,26 +62,25 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i) // Create the video encoders for(int cnt = 0; cnt < threads; cnt++) { - MovEncoder *movenc = - new MovEncoder(&movencodersrunning, &read_sem, - inputqueue, &in_sem, &input_mutex, - video_outputqueue, &video_output_mutex, &video_out_sem, - audio_inputqueue, &audio_input_mutex, &audio_in_sem, - info); + MovEncoder *movenc = new MovEncoder(&movencodersrunning, &read_sem, + inputqueue, + video_output_queue, + audio_input_queue, + info); movenc->run(); encs.push_back(movenc); } // Create the audio encoder - audioenc = new AudioEncoder(audio_inputqueue, &audio_input_mutex, &audio_in_sem, - audio_outputqueue, &audio_output_mutex, &audio_out_sem, + audioenc = new AudioEncoder(audio_input_queue, + audio_output_queue, info); audioenc->run(); // Create the multiplexer writer = new MovEncoderWriter(cpr, - video_outputqueue, &video_output_mutex, &video_out_sem, - audio_outputqueue, &audio_output_mutex, &audio_out_sem, + video_output_queue, + audio_output_queue, info); writer->run(); @@ -99,7 +93,7 @@ MovEncoderThread::~MovEncoderThread() info->info("~MovEncoderThread"); // First we destroy the movie encoders - for(int cnt = 0; cnt < threads; cnt++) sem_post(&in_sem); // Kick them + // for(int cnt = 0; cnt < threads; cnt++) sem_post(&in_sem); // Kick them for(int cnt = 0; cnt < threads; cnt++) { encs[cnt]->wait_stop(); // Wait for it to stop delete encs[cnt]; // Delete it @@ -108,7 +102,6 @@ MovEncoderThread::~MovEncoderThread() info->info("Deleted the movie encoders"); // Then we destroy the audio encoder - sem_post(&audio_in_sem); // Kick it audioenc->wait_stop(); // Wait for it to stop. delete audioenc; // delete the audio encoder @@ -116,18 +109,15 @@ MovEncoderThread::~MovEncoderThread() // Finally we destroy the writer. writer->running = false; - sem_post(&video_out_sem); // Kick it to make it stop. - sem_post(&audio_out_sem); // Kick it to make it stop. + + // FIXME: Post writer writer->wait_stop(); // Wait for it to stop. delete writer; // delete the writer (end thereby close the file) info->info("Deleted the writer"); // Destroy the semaphores. - sem_destroy(&in_sem); - sem_destroy(&video_out_sem); - sem_destroy(&audio_in_sem); - sem_destroy(&audio_out_sem); + // sem_destroy(&in_sem); sem_destroy(&read_sem); info->info("~MovEncoderThread::done"); @@ -139,12 +129,11 @@ void MovEncoderThread::encode(Frame* frame) if(output % 250 == 0) // 25 * 24 info->info("inputqueue: %d\tvideo_outputqueue: %d\taudio_inputqueue: %d\taudio_outputqueue: %d.", inputqueue->size(), - video_outputqueue->size(), - audio_inputqueue->size(), - audio_outputqueue->size()); + video_output_queue->size(), + audio_input_queue->size(), + audio_output_queue->size()); output++; - if(frame == NULL) { info->info("MovEncoderThread::encode - NULL frame detected."); // Terminate @@ -159,14 +148,7 @@ void MovEncoderThread::encode(Frame* frame) // Wait until a free encoder. sem_wait(&read_sem); - // Lock input mutex - pthread_mutex_lock(&input_mutex); inputqueue->push(block); - pthread_mutex_unlock(&input_mutex); - // Unlock input mutex - - // Kick encoders - sem_post(&in_sem); // Start new block block = new FrameVector; diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h index e3fba27..8cc24f8 100644 --- a/src/mov_encoder_thread.h +++ b/src/mov_encoder_thread.h @@ -36,6 +36,11 @@ #include using namespace std; +#include "frame.h" + +#include "threadsafe_queue_priority.h" +#include "threadsafe_queue_fifo.h" + #include "mov_encoder.h" #include "audio_encoder.h" #include "mov_encoder_writer.h" @@ -52,24 +57,16 @@ public: private: Info *info; - FrameVectorQueue *inputqueue; - FramePriorityQueue *video_outputqueue; - FramePriorityQueue *audio_inputqueue; - FramePriorityQueue *audio_outputqueue; + // FrameVectorQueue *inputqueue; + ThreadSafeQueueFIFO< FrameVector* > *inputqueue; FrameVector *block; //thread stuff - sem_t in_sem; - sem_t video_out_sem; - sem_t audio_in_sem; - sem_t audio_out_sem; - sem_t read_sem; - pthread_mutex_t input_mutex; - pthread_mutex_t video_output_mutex; - pthread_mutex_t audio_input_mutex; - pthread_mutex_t audio_output_mutex; + ThreadSafeQueuePriority *video_output_queue; + ThreadSafeQueuePriority *audio_input_queue; + ThreadSafeQueuePriority *audio_output_queue; volatile bool movencodersrunning; @@ -79,8 +76,6 @@ private: unsigned int num_frames_in_block; MovEncoderWriter *writer; - // pthread_t* writer_tid; - AudioEncoder* audioenc; int threads; diff --git a/src/mov_encoder_writer.cc b/src/mov_encoder_writer.cc index 717998a..732f9ba 100644 --- a/src/mov_encoder_writer.cc +++ b/src/mov_encoder_writer.cc @@ -47,8 +47,8 @@ using namespace std; #include "multiplexer.h" MovEncoderWriter::MovEncoderWriter(const char* cpr, - FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s, - FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s, + ThreadSafeQueuePriority *video_q, + ThreadSafeQueuePriority *audio_q, Info *i) { info = i; @@ -74,23 +74,15 @@ MovEncoderWriter::MovEncoderWriter(const char* cpr, ltime = localtime(&t); sprintf(date, "%.4d%.2d%.2d", ltime->tm_year + 1900, - ltime->tm_mon, + ltime->tm_mon + 1, // Ranging from 0 to 11 ltime->tm_mday); sprintf(fname, "%s/%s/%s/%s-%s-", server_root->c_str(), birthmonth, cpr, cpr, date); file = new File(fname, "mpg", info); - video_queue = v_q; - video_sem = v_s; - video_mutex = v_m; - - audio_queue = a_q; - audio_sem = a_s; - audio_mutex = a_m; - - video_frame_number = 0; - audio_frame_number = 0; + video_queue = video_q; + audio_queue = audio_q; running = true; } @@ -107,8 +99,8 @@ void MovEncoderWriter::thread_main() info->info("MovEncoderWriter::run"); Multiplexer multiplexer(file, info, &running, - video_queue, video_mutex, video_sem, - audio_queue, audio_mutex, audio_sem); + video_queue, + audio_queue); multiplexer.multiplex(); info->info("MovEncoderWriter::stop"); diff --git a/src/mov_encoder_writer.h b/src/mov_encoder_writer.h index e653223..3146bf8 100644 --- a/src/mov_encoder_writer.h +++ b/src/mov_encoder_writer.h @@ -33,6 +33,8 @@ #include "file.h" #include "info.h" +#include "threadsafe_queue_priority.h" + #include using namespace std; @@ -42,8 +44,8 @@ using namespace std; class MovEncoderWriter : public Thread { public: MovEncoderWriter(const char* cpr, - FramePriorityQueue *video_queue, pthread_mutex_t *video_mutex, sem_t *video_sem, - FramePriorityQueue *audio_queue, pthread_mutex_t *audio_mutex, sem_t *audio_sem, + ThreadSafeQueuePriority *video_queue, + ThreadSafeQueuePriority *audio_queue, Info *info); ~MovEncoderWriter(); @@ -56,23 +58,8 @@ private: File *file; - FramePriorityQueue *video_queue; - FramePriorityQueue *audio_queue; - pthread_mutex_t *video_mutex; - pthread_mutex_t *audio_mutex; - sem_t *video_sem; - sem_t *audio_sem; - - unsigned int video_frame_number; - unsigned int audio_frame_number; - - // Timecode_struc SCR; - // double timestamp; - -// void write_audio_header(unsigned short int packetsize); -// void write_video_header(unsigned short int packetsize); -// void write_system_header(unsigned int audio_size, unsigned int video_size); -// void write_packet_header(unsigned int audio_size, unsigned int video_size); + ThreadSafeQueuePriority *video_queue; + ThreadSafeQueuePriority *audio_queue; }; diff --git a/src/multicast.cc b/src/multicast.cc new file mode 100644 index 0000000..0cf1b87 --- /dev/null +++ b/src/multicast.cc @@ -0,0 +1,109 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * multicast.cc + * + * Mon Sep 26 12:25:22 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#include "multicast.h" + +#include +#include +#include +#include +#include + +Multicast::Multicast(Info *i) +{ + char addr[] = "192.168.0.10"; + int port = 666; + + info = i; + if(!UDPOpen(addr, port)) info->error("Error creating socket %s:%d", addr, port); +} + +Multicast::~Multicast() +{ +} + +void Multicast::Write(char* buf, int size) +{ + if(write(sock, buf, size) != size) info->error("Error Writing to socket."); +} + + +/* + * open UDP socket + */ +bool Multicast::UDPOpen(char * address, int port) +{ + int enable = 1L; + struct sockaddr_in stAddr; + struct sockaddr_in stLclAddr; + struct hostent * host; + // int sock; + + stAddr.sin_family = AF_INET; + stAddr.sin_port = htons(port); + if((host = gethostbyname(address)) == NULL) return false; + stAddr.sin_addr = *((struct in_addr *) host->h_addr_list[0]); + + /* Create a UDP socket */ + if((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + return false; + + /* Allow multiple instance of the client to share the same address and port */ if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(unsigned long int)) < 0) return false; + +#ifdef USE_MULTICAST + /* If the address is multicast, register to the multicast group */ + if(is_address_multicast(stAddr.sin_addr.s_addr)) + { + struct ip_mreq stMreq; + + /* Bind the socket to port */ + stLclAddr.sin_family = AF_INET; + stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); + stLclAddr.sin_port = stAddr.sin_port; + if(bind(sock, (struct sockaddr*) & stLclAddr, sizeof(stLclAddr)) < 0) return false; + + /* Register to a multicast address */ + stMreq.imr_multiaddr.s_addr = stAddr.sin_addr.s_addr; + stMreq.imr_interface.s_addr = INADDR_ANY; + if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) & stMreq, sizeof(stMreq)) < 0) + return false; + } + else +#endif + { + /* Bind the socket to port */ + stLclAddr.sin_family = AF_INET; + stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); + stLclAddr.sin_port = htons(0); + if(bind(sock, (struct sockaddr*) & stLclAddr, sizeof(stLclAddr)) < 0) + return false; + } + + connect(sock, (struct sockaddr*) & stAddr, sizeof(stAddr)); + + return true; +} diff --git a/src/multicast.h b/src/multicast.h new file mode 100644 index 0000000..f0a4979 --- /dev/null +++ b/src/multicast.h @@ -0,0 +1,47 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * multicast.h + * + * Mon Sep 26 12:25:22 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#ifndef __MIAV_MULTICAST_H__ +#define __MIAV_MULTICAST_H__ + +#include "info.h" + +class Multicast { +public: + Multicast(Info *info); + ~Multicast(); + + void Write(char* buf, int size); + +private: + Info *info; + + bool UDPOpen(char * address, int port); + int sock; +}; + +#endif/*__MIAV_MULTICAST_H__*/ diff --git a/src/multiplexer.cc b/src/multiplexer.cc index d2aecfc..0b54bf8 100644 --- a/src/multiplexer.cc +++ b/src/multiplexer.cc @@ -68,37 +68,27 @@ static double picture_rate_index[16] = { RESERVED, RESERVED, RESERVED, RESERVED, RESERVED, RESERVED, RESERVED }; */ - Multiplexer::Multiplexer(File *f, Info *i, volatile bool *r, - FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s, - FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s) + ThreadSafeQueuePriority *video_q, + ThreadSafeQueuePriority *audio_q) { running = r; file = f; info = i; - queue[TYPE_VIDEO] = v_q; - queue[TYPE_AUDIO] = a_q; - - sem[TYPE_VIDEO] = v_s; - sem[TYPE_AUDIO] = a_s; - - mutex[TYPE_VIDEO] = v_m; - mutex[TYPE_AUDIO] = a_m; - frame[TYPE_VIDEO] = NULL; - frame[TYPE_AUDIO] = NULL; + written[TYPE_VIDEO] = 0.0; - frame_number[TYPE_VIDEO] = 0; - frame_number[TYPE_AUDIO] = 0; + frame[TYPE_AUDIO] = NULL; + written[TYPE_AUDIO] = 0.0; - write_system_header = 0; write_audio_packet = 0; + write_system_header = 0; audio_header_read = false; - written[TYPE_VIDEO] = 0.0; - written[TYPE_AUDIO] = 0.0; + queue[TYPE_VIDEO] = video_q; + queue[TYPE_AUDIO] = audio_q; SCR = 3904;//0x40010003LL;//0x1E80; @@ -111,29 +101,11 @@ Multiplexer::~Multiplexer() Frame *Multiplexer::getFrame(StreamType type) { - Frame *tmpframe; - Frame *frame = NULL; - - sem_wait(sem[type]); - - while( frame == NULL ) { - // Lock output mutex - pthread_mutex_lock( mutex[type] ); - tmpframe = queue[type]->top(); - - if(tmpframe && tmpframe->number == frame_number[type] ) { - queue[type]->pop(); - frame = tmpframe; - frame_number[type]++; - read[type] = 0; - } - - pthread_mutex_unlock( mutex[type] ); - // Unlock output mutex + info->info("Get %s Frame", type==TYPE_AUDIO?"Audio\0":"Video\0"); + + read[type] = 0; - sleep_0_2_frame(); - } - return frame; + return queue[type]->pop(); } int Multiplexer::read_stream(char *buf, unsigned int size, StreamType type) diff --git a/src/multiplexer.h b/src/multiplexer.h index 8d67766..2604ddc 100644 --- a/src/multiplexer.h +++ b/src/multiplexer.h @@ -36,6 +36,8 @@ #include "info.h" #include "frame.h" +#include "threadsafe_queue_priority.h" + /** * Multiplexer configuration */ @@ -67,8 +69,8 @@ typedef enum { class Multiplexer { public: Multiplexer(File *file, Info *info, volatile bool *running, - FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s, - FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s); + ThreadSafeQueuePriority *video_queue, + ThreadSafeQueuePriority *audio_queue); ~Multiplexer(); void multiplex(); @@ -102,10 +104,6 @@ private: Frame *getFrame(StreamType type); int read_stream(char *buf, unsigned int size, StreamType type); - FramePriorityQueue *queue[NUM_TYPES]; - pthread_mutex_t *mutex[NUM_TYPES]; - sem_t *sem[NUM_TYPES]; - Frame *frame[NUM_TYPES]; unsigned int frame_number[NUM_TYPES]; unsigned int read[NUM_TYPES]; @@ -115,7 +113,9 @@ private: volatile bool *running; // Audio Header - bool audio_header_read; + bool audio_header_read; + + ThreadSafeQueuePriority *queue[NUM_TYPES]; }; #endif/*__MIAV_MULTIPLEXER_H__*/ diff --git a/src/threadsafe_queue.cc b/src/threadsafe_queue.cc new file mode 100644 index 0000000..89f2d6a --- /dev/null +++ b/src/threadsafe_queue.cc @@ -0,0 +1,44 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * threadsafe_queue.cc + * + * Tue Sep 27 14:43:45 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#include "threadsafe_queue.h" +/* +template +ThreadSafeQueue::ThreadSafeQueue() +{ + pthread_mutex_init (&mutex, NULL); + sem_init(&semaphore, 0, 0); +} + +template +ThreadSafeQueue::~ThreadSafeQueue() +{ + pthread_mutex_destroy(&mutex); + sem_destroy(&semaphore); +} + +*/ diff --git a/src/threadsafe_queue.h b/src/threadsafe_queue.h new file mode 100644 index 0000000..616e81e --- /dev/null +++ b/src/threadsafe_queue.h @@ -0,0 +1,56 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * threadsafe_queue.h + * + * Tue Sep 27 14:01:01 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#ifndef __MIAV_THREADSAFE_QUEUE_H__ +#define __MIAV_THREADSAFE_QUEUE_H__ + +#include +#include + +template +class ThreadSafeQueue { +public: + ThreadSafeQueue() { + pthread_mutex_init (&mutex, NULL); + sem_init(&semaphore, 0, 0); + } + + virtual ~ThreadSafeQueue() { + pthread_mutex_destroy(&mutex); + sem_destroy(&semaphore); + } + + virtual void push(T t) = 0; + virtual T pop() = 0; + virtual int size() = 0; + + //protected: + pthread_mutex_t mutex; + sem_t semaphore; +}; + +#endif/*__MIAV_THREADSAFE_QUEUE_H__*/ diff --git a/src/threadsafe_queue_fifo.cc b/src/threadsafe_queue_fifo.cc new file mode 100644 index 0000000..633cb58 --- /dev/null +++ b/src/threadsafe_queue_fifo.cc @@ -0,0 +1,83 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * threadsafe_queue_fifo.cc + * + * Tue Sep 27 14:01:10 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#include "threadsafe_queue_fifo.h" +/* +template +ThreadSafeQueueFIFO::ThreadSafeQueueFIFO() + // : ThreadSafeQueue() +{ +} + +template +ThreadSafeQueueFIFO::~ThreadSafeQueueFIFO() +{ +} + +template +void ThreadSafeQueueFIFO::push(T t) +{ + // Lock mutex + pthread_mutex_lock( &mutex ); + queue.push(t); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + sem_post(&semaphore); +} + +template +T ThreadSafeQueueFIFO::pop() +{ + sem_wait(&semaphore); + + T t; + + // Lock mutex + pthread_mutex_lock( &mutex ); + t = queue.front(); + queue.pop(); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + return t; +} + +template +int ThreadSafeQueueFIFO::size() +{ + int sz; + + // Lock mutex + pthread_mutex_lock( &mutex ); + sz = queue.size(); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + return sz; +} +*/ diff --git a/src/threadsafe_queue_fifo.h b/src/threadsafe_queue_fifo.h new file mode 100644 index 0000000..84b2fbb --- /dev/null +++ b/src/threadsafe_queue_fifo.h @@ -0,0 +1,82 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * threadsafe_queue_fifo.h + * + * Tue Sep 27 14:01:10 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#ifndef __MIAV_THREADSAFE_QUEUE_FIFO_H__ +#define __MIAV_THREADSAFE_QUEUE_FIFO_H__ + +#include "threadsafe_queue.h" +#include + +template +class ThreadSafeQueueFIFO: public ThreadSafeQueue { +public: + ThreadSafeQueueFIFO() {} + ~ThreadSafeQueueFIFO() {} + + void push(T t) { + // Lock mutex + pthread_mutex_lock( &mutex ); + queue.push(t); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + sem_post(&semaphore); + } + + T pop() { + sem_wait(&semaphore); + T t; + + // Lock mutex + pthread_mutex_lock( &mutex ); + t = queue.front(); + queue.pop(); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + return t; + } + + int size() { + int sz; + + // Lock mutex + pthread_mutex_lock( &mutex ); + sz = queue.size(); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + return sz; + } + +private: + std::queue queue; +}; + + + +#endif/*__MIAV_THREADSAFE_QUEUE_FIFO_H__*/ diff --git a/src/threadsafe_queue_priority.cc b/src/threadsafe_queue_priority.cc new file mode 100644 index 0000000..130b0f5 --- /dev/null +++ b/src/threadsafe_queue_priority.cc @@ -0,0 +1,93 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * threadsafe_queue_priority.cc + * + * Tue Sep 27 14:01:24 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#include "threadsafe_queue_priority.h" + +#include "util.h" + +ThreadSafeQueuePriority::ThreadSafeQueuePriority(Info* i, unsigned int number) + // : ThreadSafeQueue< Frame* >() +{ + info = i; + framenumber = number; +} + +ThreadSafeQueuePriority::~ThreadSafeQueuePriority() +{ +} + +void ThreadSafeQueuePriority::push(Frame *frame) +{ + // Lock mutex + pthread_mutex_lock( &mutex ); + queue.push(frame); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + sem_post(&semaphore); +} + +Frame *ThreadSafeQueuePriority::pop() +{ + sem_wait(&semaphore); + + Frame *tmpframe = NULL; + Frame *frame = NULL; + + while( frame == NULL ) { + // Lock mutex + pthread_mutex_lock( &mutex ); + + tmpframe = queue.top(); + + if(tmpframe && tmpframe->number == framenumber ) { + queue.pop(); + frame = tmpframe; + framenumber++; + } + + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + if(frame == NULL) sleep_0_2_frame(); + } + + return frame; +} + +int ThreadSafeQueuePriority::size() +{ + int sz; + + // Lock mutex + pthread_mutex_lock( &mutex ); + sz = queue.size(); + pthread_mutex_unlock( &mutex ); + // Unlock mutex + + return sz; +} diff --git a/src/threadsafe_queue_priority.h b/src/threadsafe_queue_priority.h new file mode 100644 index 0000000..8d3cdf1 --- /dev/null +++ b/src/threadsafe_queue_priority.h @@ -0,0 +1,64 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * threadsafe_queue_priority.h + * + * Tue Sep 27 14:01:24 CEST 2005 + * Copyright 2005 Bent Bisballe Nyeng + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ +#include "config.h" +#ifndef __MIAV_THREADSAFE_QUEUE_PRIORITY_H__ +#define __MIAV_THREADSAFE_QUEUE_PRIORITY_H__ + +#include "threadsafe_queue.h" + +#include "frame.h" + +#include +#include + +#include "info.h" + +// Method for use, when comparing Frames in priority queue. +template +struct priority : std::binary_function { + bool operator() (const T& a, const T& b) const { + return ((Frame*)a)->number > ((Frame*)b)->number; + } +}; + +class ThreadSafeQueuePriority: public ThreadSafeQueue< Frame* > { +public: + ThreadSafeQueuePriority(Info *info, unsigned int framenumber = 0); + ~ThreadSafeQueuePriority(); + + void push(Frame *frame); + Frame *pop(); + int size(); + +private: + Info* info; + + unsigned int framenumber; + std::priority_queue< Frame*, std::vector, priority > queue; +}; + +#endif/*__MIAV_THREADSAFE_QUEUE_PRIORITY_H__*/ -- cgit v1.2.3