Index: common/common.c =================================================================== --- common/common.c (revision 680) +++ common/common.c (working copy) @@ -245,6 +245,13 @@ else p->i_threads = atoi(value); } + OPT("thread-queue") + { + if( !strcmp(value, "auto") ) + p->i_thread_queue = 0; + else + p->i_thread_queue = atoi(value); + } OPT2("deterministic", "n-deterministic") p->b_deterministic = atobool(value); OPT2("level", "level-idc") Index: common/common.h =================================================================== --- common/common.h (revision 680) +++ common/common.h (working copy) @@ -78,6 +78,7 @@ #include "cabac.h" #include "csp.h" #include "quant.h" +#include "cpu.h" /**************************************************************************** * Generals functions @@ -233,12 +234,18 @@ struct x264_t { /* encoder parameters */ - x264_param_t param; + x264_param_t param; - x264_t *thread[X264_THREAD_MAX]; - x264_pthread_t thread_handle; - int b_thread_active; - int i_thread_phase; /* which thread to use for the next frame */ + x264_t *thread[X264_THREAD_MAX]; /* contexts for each frame in progress */ + x264_t **thread_queue; /* frames that have been prepared but not yet claimed by a worker thread */ + x264_pthread_cond_t thread_queue_cv; + x264_pthread_mutex_t thread_queue_mutex; + x264_pthread_t *thread_handle; + int thread_active; + x264_pthread_cond_t thread_active_cv; + x264_pthread_mutex_t thread_active_mutex; + int thread_exit; + int i_thread_phase; /* which thread to use for the next frame */ /* bitstream output */ struct Index: common/cpu.c =================================================================== --- common/cpu.c (revision 680) +++ common/cpu.c (working copy) @@ -234,3 +234,28 @@ return 1; #endif } + +#ifdef HAVE_PTHREAD +void x264_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{ + x264_pthread_mutex_lock( mutex ); + *var = val; + x264_pthread_cond_broadcast( cv ); + 
x264_pthread_mutex_unlock( mutex ); +} + +void x264_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val, int cmp ) +{ + x264_pthread_mutex_lock( mutex ); + while( cmp ? (*var < val*cmp) : (*var != val) ) + x264_pthread_cond_wait( cv, mutex ); + x264_pthread_mutex_unlock( mutex ); +} + +#else +void x264_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ) +{} +void x264_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val, int cmp ) +{} +#endif + Index: common/cpu.h =================================================================== --- common/cpu.h (revision 680) +++ common/cpu.h (working copy) @@ -44,4 +44,7 @@ #define x264_stack_align(func,arg) func(arg) #endif +void x264_cond_broadcast( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val ); +void x264_cond_wait( x264_pthread_cond_t *cv, x264_pthread_mutex_t *mutex, int *var, int val, int cmp ); + #endif Index: common/osdep.h =================================================================== --- common/osdep.h (revision 680) +++ common/osdep.h (working copy) @@ -106,6 +106,7 @@ #define x264_pthread_cond_destroy pthread_cond_destroy #define x264_pthread_cond_broadcast pthread_cond_broadcast #define x264_pthread_cond_wait pthread_cond_wait +#define x264_pthread_cond_signal pthread_cond_signal #else #define x264_pthread_mutex_t int #define x264_pthread_mutex_init(m,f) @@ -117,6 +118,7 @@ #define x264_pthread_cond_destroy(c) #define x264_pthread_cond_broadcast(c) #define x264_pthread_cond_wait(c,m) usleep(100) +#define x264_pthread_cond_signal(c) #endif #endif //_OSDEP_H Index: encoder/analyse.c =================================================================== --- encoder/analyse.c (revision 680) +++ encoder/analyse.c (working copy) @@ -251,7 +251,7 @@ int i_ref = i ? 
h->i_ref1 : h->i_ref0; for( j=0; j<i_ref; j++ ) { - x264_frame_cond_wait( fref[j], thresh ); + x264_cond_wait( &fref[j]->cv, &fref[j]->mutex, &fref[j]->i_lines_completed, thresh, 1 ); thread_mvy_range = X264_MIN( thread_mvy_range, fref[j]->i_lines_completed - pix_y ); } } Index: encoder/encoder.c =================================================================== --- encoder/encoder.c (revision 680) +++ encoder/encoder.c (working copy) @@ -62,6 +62,7 @@ static void x264_encoder_frame_end( x264_t *h, x264_t *thread_current, x264_nal_t **pp_nal, int *pi_nal, x264_picture_t *pic_out ); +static int x264_slices_write_thread( x264_t *h ); /**************************************************************************** * @@ -359,7 +360,12 @@ if( h->param.i_scenecut_threshold >= 0 ) h->param.b_pre_scenecut = 1; #endif + if( h->param.i_thread_queue == 0 ) + h->param.i_thread_queue = h->param.i_threads * 2; + h->param.i_thread_queue = x264_clip3( h->param.i_thread_queue, h->param.i_threads, X264_THREAD_MAX ); } + else + h->param.i_thread_queue = 1; if( h->param.b_interlaced ) { @@ -636,7 +642,7 @@ h->mb.i_mb_count = h->sps->i_mb_width * h->sps->i_mb_height; /* Init frames. 
*/ - h->frames.i_delay = h->param.i_bframe + h->param.i_threads - 1; + h->frames.i_delay = h->param.i_bframe + h->param.i_thread_queue - 1; h->frames.i_max_ref0 = h->param.i_frame_reference; h->frames.i_max_ref1 = h->sps->vui.i_num_reorder_frames; h->frames.i_max_dpb = h->sps->vui.i_max_dec_frame_buffering; @@ -689,19 +695,41 @@ h->thread[0] = h; h->i_thread_num = 0; - for( i = 1; i < h->param.i_threads; i++ ) + for( i = 1; i < h->param.i_thread_queue; i++ ) h->thread[i] = x264_malloc( sizeof(x264_t) ); - for( i = 0; i < h->param.i_threads; i++ ) + if( h->param.i_threads > 1 ) { + h->thread_handle = x264_malloc( h->param.i_threads * sizeof(x264_pthread_t) ); + h->thread_queue = x264_malloc( (h->param.i_thread_queue + 1) * sizeof(x264_t*) ); + memset( h->thread_queue, 0, (h->param.i_thread_queue + 1) * sizeof(x264_t*) ); + x264_pthread_cond_init( &h->thread_queue_cv, NULL ); + x264_pthread_mutex_init( &h->thread_queue_mutex, NULL ); + } + + for( i = 0; i < h->param.i_thread_queue; i++ ) + { + x264_t *t = h->thread[i]; if( i > 0 ) - *h->thread[i] = *h; - h->thread[i]->fdec = x264_frame_pop_unused( h ); - h->thread[i]->out.p_bitstream = x264_malloc( h->out.i_bitstream ); - if( x264_macroblock_cache_init( h->thread[i] ) < 0 ) + *t = *h; + t->fdec = x264_frame_pop_unused( h ); + t->out.p_bitstream = x264_malloc( h->out.i_bitstream ); + if( x264_macroblock_cache_init( t ) < 0 ) return NULL; + + if( h->param.i_threads > 1 ) + { + x264_pthread_cond_init( &t->thread_active_cv, NULL ); + x264_pthread_mutex_init( &t->thread_active_mutex, NULL ); + } } + if( h->param.i_threads > 1 ) + { + for( i = 0; i < h->param.i_threads; i++ ) + x264_pthread_create( &h->thread_handle[i], NULL, (void*(*)(void*))x264_slices_write_thread, h ); + } + if( x264_ratecontrol_new( h ) < 0 ) return NULL; @@ -930,7 +958,8 @@ if( h->param.i_threads > 1 && h->fdec->b_kept_as_ref ) { - x264_frame_cond_broadcast( h->fdec, mb_y*16 + (b_end ? 
10000 : -(X264_THREAD_HEIGHT << h->sh.b_mbaff)) ); + x264_cond_broadcast( &h->fdec->cv, &h->fdec->mutex, &h->fdec->i_lines_completed, + mb_y*16 + (b_end ? 10000 : -(X264_THREAD_HEIGHT << h->sh.b_mbaff)) ); } } @@ -1230,6 +1259,36 @@ return 0; } +static int x264_slices_write_thread( x264_t *h ) +{ + for(;;) + { + int i_frame_size; + x264_t *t = NULL; + + // get one frame from the queue + x264_pthread_mutex_lock( &h->thread_queue_mutex ); + while( !h->thread_queue[0] && !h->thread_exit ) + x264_pthread_cond_wait( &h->thread_queue_cv, &h->thread_queue_mutex ); + if( h->thread_queue[0] ) + t = (void*)x264_frame_shift( (void*)h->thread_queue ); + x264_pthread_mutex_unlock( &h->thread_queue_mutex ); + if( h->thread_exit ) + return 0; + if( !t ) + continue; + + x264_stack_align( x264_slice_write, t ); + i_frame_size = t->out.nal[t->out.i_nal-1].i_payload; + x264_fdec_filter_row( t, t->sps->i_mb_height ); + t->out.i_frame_size = i_frame_size; + + x264_cond_broadcast( &t->thread_active_cv, &t->thread_active_mutex, &t->thread_active, 0 ); + } + + return 0; +} + /**************************************************************************** * x264_encoder_encode: * XXX: i_poc : is the poc of the current given picture @@ -1257,14 +1316,13 @@ if( h->param.i_threads > 1) { int i = ++h->i_thread_phase; - int t = h->param.i_threads; + int t = h->param.i_thread_queue; thread_current = h->thread[ i%t ]; thread_prev = h->thread[ (i-1)%t ]; thread_oldest = h->thread[ (i+1)%t ]; x264_thread_sync_context( thread_current, thread_prev ); x264_thread_sync_ratecontrol( thread_current, thread_prev, thread_oldest ); h = thread_current; -// fprintf(stderr, "current: %p prev: %p oldest: %p \n", thread_current, thread_prev, thread_oldest); } else { @@ -1301,7 +1359,7 @@ if( h->frames.b_have_lowres ) x264_frame_init_lowres( h->param.cpu, fenc ); - if( h->frames.i_input <= h->frames.i_delay + 1 - h->param.i_threads ) + if( h->frames.i_input <= h->frames.i_delay + 1 - h->param.i_thread_queue ) { /* 
Nothing yet to encode */ /* waiting for filling bframe buffer */ @@ -1474,8 +1532,12 @@ /* Write frame */ if( h->param.i_threads > 1 ) { - x264_pthread_create( &h->thread_handle, NULL, (void*)x264_slices_write, h ); - h->b_thread_active = 1; + assert( h->thread_active == 0 ); + h->thread_active = 1; + x264_pthread_mutex_lock( &h->thread[0]->thread_queue_mutex ); + x264_frame_push( (void*)h->thread_queue, (void*)h ); + x264_pthread_cond_signal( &h->thread[0]->thread_queue_cv ); + x264_pthread_mutex_unlock( &h->thread[0]->thread_queue_mutex ); } else x264_slices_write( h ); @@ -1592,11 +1654,9 @@ int i; char psz_message[80]; - if( h->b_thread_active ) - { - x264_pthread_join( h->thread_handle, NULL ); - h->b_thread_active = 0; - } + if( h->param.i_threads > 1 ) + x264_cond_wait( &h->thread_active_cv, &h->thread_active_mutex, &h->thread_active, 0, 0 ); + if( !h->out.i_nal ) { pic_out->i_type = X264_TYPE_AUTO; @@ -1765,11 +1825,24 @@ int64_t i_yuv_size = 3 * h->param.i_width * h->param.i_height / 2; int i; - for( i=0; i<h->param.i_threads; i++ ) + if( h->param.i_threads > 1 ) { // don't strictly have to wait for the other threads, but it's simpler than cancelling them - if( h->thread[i]->b_thread_active ) - x264_pthread_join( h->thread[i]->thread_handle, NULL ); + h->thread_exit = 1; + x264_pthread_mutex_lock( &h->thread_queue_mutex ); + x264_pthread_cond_broadcast( &h->thread_queue_cv ); + x264_pthread_mutex_unlock( &h->thread_queue_mutex ); + for( i=0; i<h->param.i_threads; i++ ) + x264_pthread_join( h->thread_handle[i], NULL ); + for( i=0; i<h->param.i_thread_queue; i++ ) + { + x264_pthread_cond_destroy( &h->thread[i]->thread_active_cv ); + x264_pthread_mutex_destroy( &h->thread[i]->thread_active_mutex ); + } + x264_pthread_cond_destroy( &h->thread_queue_cv ); + x264_pthread_mutex_destroy( &h->thread_queue_mutex ); + x264_free( h->thread_handle ); + x264_free( h->thread_queue ); } #ifdef DEBUG_BENCHMARK Index: encoder/ratecontrol.c 
=================================================================== --- encoder/ratecontrol.c (revision 680) +++ encoder/ratecontrol.c (working copy) @@ -176,8 +176,8 @@ x264_cpu_restore( h->param.cpu ); - rc = h->rc = x264_malloc( h->param.i_threads * sizeof(x264_ratecontrol_t) ); - memset( rc, 0, h->param.i_threads * sizeof(x264_ratecontrol_t) ); + rc = h->rc = x264_malloc( h->param.i_thread_queue * sizeof(x264_ratecontrol_t) ); + memset( rc, 0, h->param.i_thread_queue * sizeof(x264_ratecontrol_t) ); rc->b_abr = h->param.rc.i_rc_method != X264_RC_CQP && !h->param.rc.b_stat_read; rc->b_2pass = h->param.rc.i_rc_method == X264_RC_ABR && h->param.rc.b_stat_read; @@ -457,7 +457,8 @@ x264_free( p ); } - for( i=1; i<h->param.i_threads; i++ ) + rc->frame_size_planned = rc->buffer_rate; // init so that idle threads have no effect on the planning + for( i=1; i<h->param.i_thread_queue; i++ ) { h->thread[i]->rc = rc+i; rc[i] = rc[0]; @@ -1163,12 +1164,10 @@ { int j = h->rc - h->thread[0]->rc; int i; - for( i=1; i<h->param.i_threads; i++ ) + for( i=1; i<h->param.i_thread_queue; i++ ) { - x264_t *t = h->thread[ (j+i)%h->param.i_threads ]; + x264_t *t = h->thread[ (j+i)%h->param.i_thread_queue ]; double bits = t->rc->frame_size_planned; - if( !t->b_thread_active ) - continue; rcc->buffer_fill += rcc->buffer_rate - bits; rcc->buffer_fill = x264_clip3( rcc->buffer_fill, 0, rcc->buffer_size ); } @@ -1363,7 +1362,7 @@ } else { - int i_frame_done = h->fenc->i_frame + 1 - h->param.i_threads; + int i_frame_done = h->fenc->i_frame + 1 - h->param.i_thread_queue; q = get_qscale( h, &rce, rcc->wanted_bits_window / rcc->cplxr_sum, h->fenc->i_frame ); Index: encoder/slicetype.c =================================================================== --- encoder/slicetype.c (revision 680) +++ encoder/slicetype.c (working copy) @@ -380,7 +380,7 @@ for( j = 0; h->frames.next[j]; j++ ) frames[j+1] = h->frames.next[j]; keyint_limit = h->param.i_keyint_max - frames[0]->i_frame + h->frames.i_last_idr - 1; - 
num_frames = X264_MIN( j, keyint_limit ); + num_frames = X264_MIN3( j, keyint_limit, h->param.i_bframe+1 ); if( num_frames == 0 ) return; Index: x264.c =================================================================== --- x264.c (revision 680) +++ x264.c (working copy) @@ -304,6 +304,7 @@ H0( " --no-psnr Disable PSNR computation\n" ); H0( " --no-ssim Disable SSIM computation\n" ); H0( " --threads Parallel encoding\n" ); + H1( " --thread-queue Number of delay frames for thread sync\n" ); H0( " --thread-input Run Avisynth in its own thread\n" ); H1( " --non-deterministic Slightly improve quality of SMP, at the cost of repeatability\n" ); H1( " --no-asm Disable all CPU optimizations\n" ); @@ -425,6 +426,7 @@ { "zones", required_argument, NULL, 0 }, { "qpfile", required_argument, NULL, OPT_QPFILE }, { "threads", required_argument, NULL, 0 }, + { "thread-queue", required_argument, NULL, 0 }, { "thread-input", no_argument, NULL, OPT_THREAD_INPUT }, { "non-deterministic", no_argument, NULL, 0 }, { "no-psnr", no_argument, NULL, 0 }, Index: x264.h =================================================================== --- x264.h (revision 680) +++ x264.h (working copy) @@ -139,6 +139,7 @@ /* CPU flags */ unsigned int cpu; int i_threads; /* encode multiple frames in parallel */ + int i_thread_queue; /* number of frames to prepare in advance (>= i_threads) */ int b_deterministic; /* whether to allow non-deterministic optimizations when threaded */ /* Video Properties */