# # Patch managed by http://www.holgerschurig.de/patcher.html # --- asterisk/rtp.c~jb-rtp-sip.scx.patch +++ asterisk/rtp.c @@ -68,6 +68,56 @@ #define FLAG_3389_WARNING (1 << 0) + +/* Slav - some static jitterbuffer stuff */ +#ifdef USE_JB +#include "scx_jitterbuf.h" +#include +#include + +/* Macros for JB logs */ +/*#define jb_verbose(...) ast_verbose(VERBOSE_PREFIX_3 " ***[RTP JB LOG]*** " __VA_ARGS__)*/ +#define jb_verbose(...) if(1){\ + char tmp[192];\ + char msg[128];\ + snprintf(msg, sizeof(msg), VERBOSE_PREFIX_3 "***[RTP JB LOG]*** " __VA_ARGS__);\ + ast_verbose("%s\n", term_color(tmp, msg, COLOR_BRGREEN, 0, sizeof(tmp)));} + +/* Macros for the frame log files */ +#define jb_framelog(...) if(rtp->jb_logfile) {fprintf(rtp->jb_logfile, __VA_ARGS__); fflush(rtp->jb_logfile);} + +/* Counter for the frame log files */ +static int rtp_framelog_counter = 0; + +/* Counter for jb create-destroy debuging */ +static int jb_alloc_counter = 0; + +/* Scheduler context for the jitterbuffer */ +static struct sched_context *jb_sched; + +/* Scheduler thread */ +static pthread_t jb_sched_thread; + +/* Mutex for the scheduler queue synchronization */ +AST_MUTEX_DEFINE_STATIC(jb_sched_lock); + +/* Conditional variable for the scheduler queue synchronization */ +static pthread_cond_t jb_sched_flag = PTHREAD_COND_INITIALIZER; + +/* JB function prototypes */ +static void * jb_scheduler_thread(void *data); +static struct ast_frame *ast_rtp_read_real(struct ast_rtp *rtp); +static int put_in_jb(struct ast_rtp *rtp, struct ast_frame *f); +static int get_from_jb(void *data); +static int rtp_jb_create(struct ast_rtp *rtp, struct ast_frame *frame, int len, long ts); +static void rtp_jb_clear(struct ast_rtp *rtp); +static void rtp_jb_destroy(struct ast_rtp *rtp); +static long rtp_get_now(struct ast_rtp *rtp); +static void jb_get_timespec(struct timespec *ts, int millisec_offset); +#endif +/* End Slav */ + + struct ast_rtp { int s; char resp; @@ -106,6 +156,34 @@ int rtp_lookup_code_cache_result; int rtp_offered_from_local; struct ast_rtcp *rtcp; + /* Slav - jitterbuffer related members */ +#ifdef USE_JB + /* Is jitterbuffer enabled on this rtp? */ + int jb_enabled; + /* Configured size of the jitterbuf */ + int jbsize; + /* The jitterbuffer itself */ + struct scx_jb *jb; + /* Time base for the jb */ + struct timeval timebase; + /* Scheduler id */ + int jbid; + /* Mutex for synchronization of the jb_put() - jb_get() operations and for destroy safety also */ + ast_mutex_t jblock; + /* Indicates that this rtp has passed ast_rtp_destroy() */ + int destroyed; + /* The pvt helper */ + struct ast_rtp_pvt_helper pvt_helper; + /* is the channel up? */ + int isup; + /* Is the timestamp tracing enabled for this jb? */ + int jblog; + /* File for frame timestamp tracing */ + FILE *jb_logfile; + /* Pathname of the log file */ + char jb_logfile_pathname[32]; +#endif + /* End Slav */ }; struct ast_rtcp { @@ -306,6 +384,620 @@ return f; } + + + + + +/* Slav - ****************************************************** + ******************** Utility functions ********************** + *************************************************************/ + + +/* Slav - just get the time in millisec, measured for the rtp timebase. + rtp->timebase should be initialized before calling this. */ +#ifdef USE_JB +static long rtp_get_now(struct ast_rtp *rtp) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + + return (long) ((tv.tv_sec - rtp->timebase.tv_sec) * 1000) + + (long) ((double) (tv.tv_usec - rtp->timebase.tv_usec) / 1000.0); +} +#endif +/* End Slav */ + + +/* Slav - utility function - fills in the members of a timespec struct with the time now + millisec_offset + millisec_offset must be between 0 and 1000 */ +#ifdef USE_JB +static void jb_get_timespec(struct timespec *ts, int millisec_offset) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000 + millisec_offset * 1000000; + if(ts->tv_nsec >= 1000000000) + { + ts->tv_sec += 1; + ts->tv_nsec -= 1000000000; + } +} +#endif +/* End Slav */ + +/* End Slav - ************************************************** + ****************** End Utility functions ******************** + *************************************************************/ + + + + +/* Slav - ****************************************************** + ***************** Jitterbuf main functions ****************** + *************************************************************/ + +/* Slav - puts a frame into the jitterbuf */ +#ifdef USE_JB +static int put_in_jb(struct ast_rtp *rtp, struct ast_frame *f) +{ + long ts; + long now; + int len; + struct ast_frame *frame; + + /* if the call is not yet answered, don't put the frame into the jb - just let it go through + the regular way. */ + if(rtp->pvt_helper.get_chan_state(rtp->pvt_helper.pvt) != AST_STATE_UP) + { + return -1; + } + else if(!rtp->isup) + { + /* The channel is up for the very first time. */ + rtp->isup = 1; + + /* Check if we actually need a jitterbuf on this channel */ + if(rtp->pvt_helper.wants_jitter(rtp->pvt_helper.pvt)) + { + /* Disable the use of a jitterbuf as far as the bridged channel can accept jitter */ + rtp->jb_enabled = 0; + jb_verbose("Detected bridged channel that can accept jitter. Disabling the jitterbuffer."); + return -1; + } + + /* We expect the first frame in jb to be a voice frame. Else reset the isup flag until a + voice frame is received */ + if(f->frametype != AST_FRAME_VOICE) + { + rtp->isup = 0; + return -1; + } + + /* convert the timestamp of the frame from samples to millisec */ + ts = rtp->lastrxts / 8; + len = ast_codec_get_samples(f) / 8; + /* Allocate a new frame */ + frame = ast_frisolate(f); + if(frame == NULL) + { + ast_log(LOG_ERROR, "Cannot isolate frame for the jitterbuffer. Disabling the jitterbuffer.\n"); + /* Disable the jb */ + rtp->jb_enabled = 0; + return -1; + } + + /* Create and start a jitterbuf on this rtp */ + return rtp_jb_create(rtp, frame, len, ts); + } + + /* convert the timestamp of the frame from samples to millisec */ + ts = rtp->lastrxts / 8; + len = 0; + if(f->frametype == AST_FRAME_VOICE) + { + len = ast_codec_get_samples(f) / 8; + } + else + { + /* Maybe this is DTMF? Set the force resynch flag */ + scx_jb_set_force_resynch(rtp->jb); + + /* Some logging here */ + if(rtp->jb_logfile != NULL) + { + now = rtp_get_now(rtp); + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + jb_framelog("JB_PUT {now=%ld}: Received non voice frame (maybe DTMF) with ts=%ld and len=%d." + " Force resynching jb...\n", now, ts, len); + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + } + + /* Put only voice frames into the jb */ + return -1; + } + + /* Allocate a new frame */ + frame = ast_frisolate(f); + if(frame == NULL) + { + ast_log(LOG_ERROR, "Cannot isolate frame for the jitterbuffer.\n"); + return -1; + } + + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + + /* The moment now in jb time (in millisec) */ + now = rtp_get_now(rtp); + + /* Put into the jitterbuffer. If jb_put returns JB_DROP, the frame should be dropped */ + if(scx_jb_put(rtp->jb, frame, len, ts, now) == JB_DROP) + { + jb_framelog("JB_PUT {now=%ld}: Dropped frame with ts=%ld and len=%d\n", now, ts, len); + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + /* Drop the frame - just do nothing, except of cource, freeing the isolated frame. */ + ast_frfree(frame); + } + else + { + jb_framelog("JB_PUT {now=%ld}: Queued frame with ts=%ld and len=%d\n", now, ts, len); + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + } + + return 0; +} +#endif +/* End Slav */ + + +/* Slav - gets a frame from the jitterbuf for time now */ +#ifdef USE_JB +static int get_from_jb(void *data) +{ + struct ast_rtp *rtp = (struct ast_rtp *) data; + struct scx_jb_frame frame; + struct ast_frame af; + int ret; + long now; + long next; + long when; + int interpl; + + if(rtp == NULL) + { + /* this should never happens */ + ast_log(LOG_ERROR, "Received NULL rtp data!\n"); + /* if in debug - abort asterisk */ + assert(0); + /* if no - it will crash on the next line */ + } + + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + + /* check for destroy conditon */ + if(rtp->destroyed) + { + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + /* Clear and destroy the jb and the rtp */ + rtp_jb_clear(rtp); + rtp_jb_destroy(rtp); + + /* return 0 - no reschedules of this jb should happen after this. */ + return 0; + } + + /* The moment now in jb time (in millisec) */ + now = rtp_get_now(rtp); + + /* is it time for the next frame in the jb? */ + if(now >= (next = scx_jb_next(rtp->jb))) + { + /* get a frame from the jb for time now */ + interpl = ast_codec_interp_len(rtp->lastrxformat); + ret = scx_jb_get(rtp->jb, &frame, now, interpl); + + /* Proceed with the frame timestamps logging while still holding the jb mutex lock */ + if(rtp->jb_logfile != NULL) + { + switch(ret) + { + case JB_OK: + jb_framelog("\tJB_GET {now=%ld}: Delivered frame with ts=%ld and len=%ld\n", now, frame.ts, frame.ms); + break; + case JB_INTERP: + jb_framelog("\tJB_GET {now=%ld}: Interpolated frame with len=%d\n", now, interpl); + break; + case JB_DROP: + jb_framelog("\tJB_GET {now=%ld}: Dropped frame with ts=%ld and len=%ld\n", now, frame.ts, frame.ms); + break; + case JB_NOFRAME: + jb_framelog("\tJB_GET {now=%ld}: No frame\n", now); + break; + default: + ast_log(LOG_ERROR, "This should never happens!\n"); + assert(0); + break; + } + } + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + switch(ret) + { + case JB_OK: + /* deliver the frame into the channel's queue (internally locks the channel mutex!) */ + if(!rtp->destroyed) + { + rtp->pvt_helper.queue_frame(rtp->pvt_helper.pvt, frame.data); + } + ast_frfree(frame.data); + break; + case JB_INTERP: + /* Create an interpolation frame */ + if(rtp->destroyed) + { + break; + } + af.frametype = AST_FRAME_VOICE; + af.subclass = rtp->pvt_helper.get_nativeformats(rtp->pvt_helper.pvt); + af.datalen = 0; + af.samples = frame.ms * 8; + af.mallocd = 0; + af.src = "SIP JB interpolation"; + af.data = NULL; + /* TODO: Do we need to set a delivery timeval? */ + af.delivery.tv_sec = rtp->rxcore.tv_sec; + af.delivery.tv_usec = rtp->rxcore.tv_usec; + af.delivery.tv_sec += next / 1000; + af.delivery.tv_usec += (next % 1000) * 1000; + af.offset=AST_FRIENDLY_OFFSET; + if(af.delivery.tv_usec >= 1000000) + { + af.delivery.tv_usec -= 1000000; + af.delivery.tv_sec += 1; + } + /* deliver the frame into the channel's queue (internally locks the channel mutex!) */ + if(!rtp->destroyed) + { + rtp->pvt_helper.queue_frame(rtp->pvt_helper.pvt, &af); + } + break; + case JB_DROP: + /* Just free the frame */ + ast_frfree(frame.data); + break; + case JB_NOFRAME: + break; + default: + ast_log(LOG_ERROR, "This should never happens!"); + assert(0); + break; + } + } + else + { + jb_framelog("\tJB_GET {now=%ld}: now < next=%ld\n", now, next); + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + } + + /* Get next until is >= now */ + ast_mutex_lock(&rtp->jblock); + while((next = scx_jb_next(rtp->jb)) < now) + { + ret = scx_jb_get(rtp->jb, &frame, now, ast_codec_interp_len(rtp->lastrxformat)); + switch(ret) + { + case JB_OK: + ast_frfree(frame.data); + break; + case JB_INTERP: + break; + case JB_DROP: + ast_frfree(frame.data); + break; + case JB_NOFRAME: + break; + default: + ast_log(LOG_ERROR, "This should never happens!"); + assert(0); + break; + } + } + ast_mutex_unlock(&rtp->jblock); + + /* Calc how many millisec remains to the next time a frame should be delivered */ + when = next - now; + /* Don't use when < 3ms */ + if(when < 3) + { + when = 3; + } + + /* reschedule (adds again this schedule) */ + rtp->jbid = ast_sched_add(jb_sched, when, get_from_jb, rtp); + + return 0; +} +#endif +/* End Slav */ + + +/* Slav - called from a channel who wants a jitterbuff enabled - preforms jitterbuff initialization */ +#ifdef USE_JB +void ast_rtp_jb_enable(struct ast_rtp *rtp, struct ast_rtp_pvt_helper *pvt_helper, int jbsize, int jblog) +{ + /* Initialize the destroy flag */ + rtp->destroyed = 0; + + /* Initialize the jb sched id */ + rtp->jbid = -1; + + /* Copy the pvt_helper data. TODO: Make sure the pvt_helper members are valid */ + memcpy(&rtp->pvt_helper, pvt_helper, sizeof(struct ast_rtp_pvt_helper)); + + /* Set the configured size */ + rtp->jbsize = jbsize; + + rtp->jblog = jblog; + + /* Set the enabled flag */ + rtp->jb_enabled = 1; +} +#endif +/* End Slav */ + + +/* Slav - allocates new jitterbuf */ +#ifdef USE_JB +static int rtp_jb_create(struct ast_rtp *rtp, struct ast_frame *frame, int len, long ts) +{ + struct scx_jb_conf jbconf; + int res; + long now, next, when; + + /* Shouldn't be called more than once on a given rtp! */ + assert(rtp->jb == NULL); + + jbconf.jbsize = rtp->jbsize; + jbconf.resync_threshold = 1000; + + /* Create the jitterbuf */ + rtp->jb = scx_jb_new(&jbconf); + + /* Is the new jitterbuf successfuly created? */ + if(rtp->jb != NULL) + { + /* Initialize the jb mutex */ + ast_mutex_init(&rtp->jblock); + + /* Init the timebase */ + gettimeofday(&rtp->timebase, NULL); + + /* Get now */ + now = rtp_get_now(rtp); + + /* Put the first frame passed */ + res = scx_jb_put_first(rtp->jb, frame, len, ts, now); + /* Putting the first frame should always result is JB_OK */ + assert(res == JB_OK); + /* Calc when the first get should happen */ + next = scx_jb_next(rtp->jb); + when = next - now; + /* When shouldnt be smaller than 1 */ + assert(when > 0); + + /* lock the queue mutex */ + ast_mutex_lock(&jb_sched_lock); + + /* Create a frame log file */ + if(rtp->jblog) + { + snprintf(rtp->jb_logfile_pathname, sizeof(rtp->jb_logfile_pathname), + "/tmp/jb_frames_%.5d.log", rtp_framelog_counter++); + rtp->jb_logfile = fopen(rtp->jb_logfile_pathname, "w+b"); + + jb_framelog("JB_PUT_FIRST {now=%ld}: Queued frame with ts=%ld and len=%d\n", now, ts, len); + } + + /* Increment the jb alloc counter */ + jb_alloc_counter++; + + /* add a new schedule for get_from_jb() (scheduled each 20 ms?) */ + rtp->jbid = ast_sched_add(jb_sched, when, get_from_jb, rtp); + + /* signal the schedule queue to wakeup the scheduler (if its waiting) */ + pthread_cond_signal(&jb_sched_flag); + + jb_verbose("Jitterbuffer created and started. jb_alloc=%d.", jb_alloc_counter); + + /* unlock the queue mutex */ + ast_mutex_unlock(&jb_sched_lock); + } + else + { + /* Cannot allocate memory for the jb - only log a warning and continue using the + channel without a jb */ + ast_log(LOG_WARNING, "Unable to allocate new jitterbuffer.\n"); + rtp->jb_enabled = 0; + return -1; + } + + return 0; +} +#endif +/* End Slav */ + + +/* Slav - removes all remaining frames from the jb and frees them */ +#ifdef USE_JB +static void rtp_jb_clear(struct ast_rtp *rtp) +{ + struct scx_jb_frame frame; + + /* remove and free any remaining frames in the jb */ + while(scx_jb_remove(rtp->jb,&frame) == JB_OK) + { + ast_frfree(frame.data); + } +} +#endif +/* End Slav */ + + +/* Slav - destroys the jitterbuf and the jb mutex and frees its rtp container */ +#ifdef USE_JB +static void rtp_jb_destroy(struct ast_rtp *rtp) +{ + /* destroy the jitterbuffer */ + scx_jb_destroy(rtp->jb); + rtp->jb = NULL; + + /* close the framelog file */ + if(rtp->jb_logfile != NULL) + { + fseek(rtp->jb_logfile, 0, SEEK_END); + int len = ftell(rtp->jb_logfile); + fclose(rtp->jb_logfile); + rtp->jb_logfile = NULL; + + /* If there is less than 1024 bytes logged in this file - delete it */ + if(len < 1024) + { + remove(rtp->jb_logfile_pathname); + } + } + + /* lock the queue mutex */ + ast_mutex_lock(&jb_sched_lock); + /* Decrement the jb_alloc_counter (Called with the jb_sched_lock held) */ + jb_alloc_counter--; + jb_verbose("Jitterbuffer destroyed. *** *** jb_alloc_counter=%d *** ***.", jb_alloc_counter); + /* unlock the queue mutex */ + ast_mutex_unlock(&jb_sched_lock); + + /* destroy the jb mutex */ + ast_mutex_destroy(&rtp->jblock); + + /* finally free the rtp */ + free(rtp); +} +#endif +/* End Slav */ + + +/* Slav - jitterbuffer scheduler thread function */ +#ifdef USE_JB +static void * jb_scheduler_thread(void *data) +{ + int when; + struct timespec abstime; + int res; + + jb_verbose("The rtp jitterbuffer scheduler thread is running."); + + while(1) + { + /* lock the queue mutex */ + ast_mutex_lock(&jb_sched_lock); + + /* get how many millisecs to wait until the first event scheduled */ + when = ast_sched_wait(jb_sched); + /* don't wait more than 1 sec. when < 0 means that the sched queue is empty */ + if(when < 0 || when > 1000) + { + when = 1000; + } + + /* TODO: What will be the effect of this? */ + if(when == 0) + { + when = 1; + } + + /* wait until it is time for the first schedule - can be interrupted by adding new schedule + (internally unlocks the queue mutex and locks it after the waiting is finished) */ + if(when > 0) + { + jb_get_timespec(&abstime, when); + res = pthread_cond_timedwait(&jb_sched_flag, &jb_sched_lock, &abstime); + if(res) + { + if(res != ETIMEDOUT) + { + ast_log(LOG_ERROR, "pthread_cond_timedwait() returned an error: '%s'\n", strerror(res)); + } + } + } + + /* unlock the queue mutex */ + ast_mutex_unlock(&jb_sched_lock); + + /* exec all for time now */ + ast_sched_runq(jb_sched); + } + + return NULL; +} +#endif +/* End Slav */ + + +/* Slav - the jitterbuf version of ast_rtp_read(). Returns AST_FRAME_NULL if the jb is used */ +#ifdef USE_JB +struct ast_frame *ast_rtp_read(struct ast_rtp *rtp) +{ + static struct ast_frame *frr, nf = { AST_FRAME_NULL, }; + + /* Is a jitterbuffer enabled on this rtp? */ + if(rtp->jb_enabled) + { + /* Read a frame from the network */ + frr = ast_rtp_read_real(rtp); + + /* Put the frame into the jb. */ + if(put_in_jb(rtp, frr) == 0) + { + return &nf; + } + + /* If put_in_jb() returns non zero (e.g. the channel is not up), deliver the frame immediately. */ + return frr; + } + + /* If not using jb, deliver the frame immediately. */ + return ast_rtp_read_real(rtp); +} +#endif +/* End Slav */ + + +/* End Slav - ************************************************** + *************** End Jitterbuf main functions **************** + *************************************************************/ + + + + + + static int rtpread(int *id, int fd, short events, void *cbdata) { struct ast_rtp *rtp = cbdata; @@ -374,7 +1066,16 @@ *tv = ast_tvadd(rtp->rxcore, ts); } +/* Slav - the real ast_rtp_read() function will be used if the jitterbuffer is not used */ +#ifndef USE_JB +/* End Slav */ struct ast_frame *ast_rtp_read(struct ast_rtp *rtp) +/* Slav - else predefine the real ast_rtp_read() func, so the jb version will be used from + all rtp channels */ +#else +static struct ast_frame *ast_rtp_read_real(struct ast_rtp *rtp) +#endif +/* End Slav */ { int res; struct sockaddr_in sin; @@ -1015,7 +1716,45 @@ close(rtp->rtcp->s); free(rtp->rtcp); } + /* Slav - don't free the rtp here if we are using the jb - just add it to the free list. */ +#ifndef USE_JB + /* End Slav */ free(rtp); + /* Slav - set the destroy flag to the rtp, but only if a jb is enabled */ +#else + /* + if(rtp->jb_enabled) + { + jb_verbose("ast_rtp_destroy() invoked."); + } + */ + /* Does a jitterbuf is used for this rtp? */ + if(rtp->jb != NULL) + { + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + + /* set the destroy flag to let get_from_jb() knows that the rtp is on the free list */ + rtp->destroyed = 1; + rtp->jb_enabled = 0; + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + /* If there was no sched added yet (the channel was never up), destroy here */ + if(rtp->jbid == -1) + { + rtp_jb_clear(rtp); + rtp_jb_destroy(rtp); + } + } + else + { + /* No - free the rtp */ + free(rtp); + } +#endif + /* End Slav */ } static unsigned int calc_txstamp(struct ast_rtp *rtp, struct timeval *delivery) @@ -1773,4 +2512,24 @@ ast_cli_register(&cli_debug_ip); ast_cli_register(&cli_no_debug); ast_rtp_reload(); + /* Slav - init the rtp jb scheduler stuff */ +#ifdef USE_JB + /* Create a scheduler context */ + jb_sched = sched_context_create(); + if(jb_sched == NULL) + { + ast_log(LOG_ERROR, "Unable to create rtp jb scheduler context.\n"); + assert(0); + } + + /* Start the scheduler thread */ + if(ast_pthread_create(&jb_sched_thread, NULL, jb_scheduler_thread, NULL) < 0) + { + ast_log(LOG_ERROR, "Unable to start rtp jb scheduler thread.\n"); + assert(0); + } + + jb_verbose("Started the rtp jitterbuffer scheduler thread."); +#endif + /* End Slav */ } --- asterisk/include/asterisk/rtp.h~jb-rtp-sip.scx.patch +++ asterisk/include/asterisk/rtp.h @@ -25,6 +25,12 @@ extern "C" { #endif + +/* Slav - define USE_JB to enable the jitterbuffer for all rtp channels */ +#define USE_JB +/* End Slav */ + + /* Codes for RTP-specific data - not defined by our AST_FORMAT codes */ /*! DTMF (RFC2833) */ #define AST_RTP_DTMF (1 << 0) @@ -49,6 +55,35 @@ struct ast_rtp; + +/* Slav - jitterbuffer related stuff */ +#ifdef USE_JB +typedef int (*pvt_queue_frame)(void *pvt, struct ast_frame *f); +typedef int (*pvt_get_chan_state)(void *pvt); +typedef int (*pvt_get_nativeformats)(void *pvt); +typedef int (*pvt_wants_jitter)(void *pvt); + +/* Defines technology independent way to access some channel functionality */ +struct ast_rtp_pvt_helper +{ + /* Pointer to the tech private structure */ + void *pvt; + /* Pointer to a function for queueing a frame into a channel */ + pvt_queue_frame queue_frame; + /* Pointer to a function, returning the current channel state */ + pvt_get_chan_state get_chan_state; + /* Pointer to a function, returning the supported from the channel formats */ + pvt_get_nativeformats get_nativeformats; + /* Pointer to a function, indicating if the underlying channel can accept jitter */ + pvt_wants_jitter wants_jitter; +}; + +/* Enables the jitterbuff for the specified channel (and rtp). Performs jitterbuff initialization */ +void ast_rtp_jb_enable(struct ast_rtp *rtp, struct ast_rtp_pvt_helper *pvt_helper, int jbsize, int jblog); +#endif +/* End Slav */ + + typedef int (*ast_rtp_callback)(struct ast_rtp *rtp, struct ast_frame *f, void *data); struct ast_rtp *ast_rtp_new(struct sched_context *sched, struct io_context *io, int rtcpenable, int callbackmode); --- asterisk/channels/chan_sip.c~jb-rtp-sip.scx.patch +++ asterisk/channels/chan_sip.c @@ -2887,10 +2887,97 @@ snprintf(callid, len, "@%s", ast_inet_ntoa(iabuf, sizeof(iabuf), ourip)); } + +/* Slav - jitterbuf global config */ +#ifdef USE_JB +static int usejb = 1; +static int jbsize_default = 300; +static int jbsize = 300; +static int forcejb = 0; +static int jblog = 0; +#endif +/* End Slav */ + + +/* Slav - functions for the rtp pvt helper */ +#ifdef USE_JB +static int sip_queue_frame(void *pvt, struct ast_frame *f) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + if(p->owner != NULL) + { + return ast_queue_frame(p->owner, f); + } + else + { + return -1; + } +} + +static int sip_get_chan_state(void *pvt) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + if(p->owner != NULL) + { + return p->owner->_state; + } + else + { + return -1; + } +} + +static int sip_get_nativeformats(void *pvt) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + if(p->owner != NULL) + { + return p->owner->nativeformats; + } + else + { + return -1; + } +} + +static int sip_wants_jitter(void *pvt) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + struct ast_channel *chan, *bridged_chan; + + /* Always return false if forcejb is true - this will force the use of a jitterbuf + even on channels that can accept jitter */ + if(forcejb) + { + return 0; + } + + chan = p->owner; + bridged_chan = ast_bridged_channel(chan); + + if(chan == NULL || bridged_chan == NULL) + { + /* If we don't have a channel yet (or already?), or we're not bridged yet (or already?) + say yes to the caller to not allow it to create a jitterbuf. In principle this shouldn't + happens. */ + return 1; + } + + return (bridged_chan->tech->properties & AST_CHAN_TP_WANTSJITTER); +} +#endif +/* End Slav */ + + /*--- sip_alloc: Allocate SIP_PVT structure and set defaults ---*/ static struct sip_pvt *sip_alloc(char *callid, struct sockaddr_in *sin, int useglobal_nat, const int intended_method) { struct sip_pvt *p; + /* Slav - pvt_helper fot the rtp jitterbuffer */ +#ifdef USE_JB + struct ast_rtp_pvt_helper pvt_helper; +#endif + /* End Slav */ p = malloc(sizeof(struct sip_pvt)); if (!p) @@ -2924,6 +3011,21 @@ if (sip_methods[intended_method].need_rtp) { p->rtp = ast_rtp_new_with_bindaddr(sched, io, 1, 0, bindaddr.sin_addr); + /* Slav - enable a jitterbuffer on the newly created rtp */ +#ifdef USE_JB + if(p->rtp != NULL && usejb) + { + /* Init the pvt helper functions */ + pvt_helper.pvt = p; + pvt_helper.queue_frame = sip_queue_frame; + pvt_helper.get_chan_state = sip_get_chan_state; + pvt_helper.get_nativeformats = sip_get_nativeformats; + pvt_helper.wants_jitter = sip_wants_jitter; + /* enable the jitterbuffer */ + ast_rtp_jb_enable(p->rtp, &pvt_helper, jbsize, jblog); + } +#endif + /* End Slav */ if (videosupport) p->vrtp = ast_rtp_new_with_bindaddr(sched, io, 1, 0, bindaddr.sin_addr); if (!p->rtp) { @@ -11238,7 +11340,34 @@ ast_log(LOG_WARNING, "'%s' is not a valid RTP hold time at line %d. Using default.\n", v->value, v->lineno); global_rtptimeout = 0; } - } else if (!strcasecmp(v->name, "rtpholdtimeout")) { + } + /* Slav - jitterbuffer config */ +#ifdef USE_JB + else if(!strcasecmp(v->name, "usejb")) + { + usejb = ast_true(v->value); + } + else if(!strcasecmp(v->name, "jbsize")) + { + if((sscanf(v->value, "%d", &jbsize) != 1) || (jbsize < 0)) + { + ast_log(LOG_WARNING, + "'%s' is not a valid SIP jitterbuf size at line %d. Using default.\n", + v->value, v->lineno); + jbsize = jbsize_default; + } + } + else if(!strcasecmp(v->name, "forcejb")) + { + forcejb = ast_true(v->value); + } + else if(!strcasecmp(v->name, "jblog")) + { + jblog = ast_true(v->value); + } +#endif + /* End Slav */ + else if (!strcasecmp(v->name, "rtpholdtimeout")) { if ((sscanf(v->value, "%d", &global_rtpholdtimeout) != 1) || (global_rtpholdtimeout < 0)) { ast_log(LOG_WARNING, "'%s' is not a valid RTP hold time at line %d. Using default.\n", v->value, v->lineno); global_rtpholdtimeout = 0; --- /dev/null +++ asterisk/scx_jitterbuf.c @@ -0,0 +1,416 @@ +/* + * Securax jitterbuffer. + * + * Securax jitterbuffer implementation. + * + * Slav Klenov + * + */ + +#include +#include +#include +#include +#include "scx_jitterbuf.h" + + +/* private scx_jb structure */ +struct scx_jb +{ + struct scx_jb_frame *frames; + struct scx_jb_frame *tail; + struct scx_jb_frame *free_frames; + struct scx_jb_frame *free_frames_start; + struct scx_jb_conf conf; + long rxcore; + long delay; + long next_delivery; + int force_resynch; +}; + + +static struct scx_jb_frame * alloc_jb_frame(struct scx_jb *jb); +static void release_jb_frame(struct scx_jb *jb, struct scx_jb_frame *frame); +static void get_jb_head(struct scx_jb *jb, struct scx_jb_frame *frame); +static int resynch_jb(struct scx_jb *jb, void *data, long ms, long ts, long now); + + +static struct scx_jb_frame * alloc_jb_frame(struct scx_jb *jb) +{ + struct scx_jb_frame *frame; + + /* As far as we are using fixed size jitterbuf, we shouldn't use more frames + than we have allocated initialy in the free list. */ + assert(jb->free_frames != NULL); + + frame = jb->free_frames; + jb->free_frames = frame->next; + + return frame; +} + + +static void release_jb_frame(struct scx_jb *jb, struct scx_jb_frame *frame) +{ + frame->next = jb->free_frames; + jb->free_frames = frame; +} + + +static void get_jb_head(struct scx_jb *jb, struct scx_jb_frame *frame) +{ + struct scx_jb_frame *fr; + + /* unlink the frame */ + fr = jb->frames; + jb->frames = fr->next; + if(jb->frames != NULL) + { + jb->frames->prev = NULL; + } + else + { + /* the jb is empty - update tail */ + jb->tail = NULL; + } + + /* update next */ + jb->next_delivery = fr->delivery + fr->ms; + + /* copy the destination */ + memcpy(frame, fr, sizeof(struct scx_jb_frame)); + + /* and release the frame */ + release_jb_frame(jb, fr); +} + + +struct scx_jb * scx_jb_new(struct scx_jb_conf *conf) +{ + struct scx_jb *jb; + struct scx_jb_frame *frame, *next; + int i, fcount; + + jb = calloc(1, sizeof(struct scx_jb)); + if(jb == NULL) + { + return NULL; + } + + /* First copy our config */ + memcpy(&jb->conf, conf, sizeof(struct scx_jb_conf)); + /* we dont need the passed config anymore - continue working with the saved one */ + conf = &jb->conf; + + /* validate the configuration */ + if(conf->jbsize < 1) + { + conf->jbsize = JB_SIZE_DEFAULT; + } + if(conf->resync_threshold < 1) + { + conf->resync_threshold = JB_RESYNCH_THRESHOLD_DEFAULT; + } + + /* Calculate the maximum number of frames used based on 10ms lengths, rounded up */ + fcount = (int) ((double) conf->jbsize / 10.0); + if(conf->jbsize % 10 != 0) + { + fcount++; + conf->jbsize = 10 * fcount; + } + + /* Set the constatnt delay to the jitterbuf */ + jb->delay = conf->jbsize; + + /* Allocate size for the maximum number of frames used */ + jb->free_frames_start = calloc(fcount, sizeof(struct scx_jb_frame)); + if(jb->free_frames_start == NULL) + { + free(jb); + return NULL; + } + /* And link them */ + jb->free_frames = jb->free_frames_start; + frame = jb->free_frames; + for(i=0; inext = next; + frame = next; + } + + return jb; +} + + +void scx_jb_destroy(struct scx_jb *jb) +{ + /* jitterbuf MUST be empty before it can be destroyed */ + assert(jb->frames == NULL); + + /* the free list shouldn't be empty, unless we missed release_jb_frame(). */ + assert(jb->free_frames_start != NULL); + + /* release the free list */ + free(jb->free_frames_start); + + /* finaly release the jb itself */ + free(jb); +} + + +static int resynch_jb(struct scx_jb *jb, void *data, long ms, long ts, long now) +{ + long diff, offset; + struct scx_jb_frame *frame; + + /* If jb is empty, just reinitialize the jb */ + if(jb->frames == NULL) + { + /* debug check: tail should also be NULL */ + assert(jb->tail == NULL); + + return scx_jb_put_first(jb, data, ms, ts, now); + } + + /* Adjust all jb state just as the new frame is with delivery = the delivery of the last + frame (e.g. this one with max delivery) + the length of the last frame. */ + + /* Get the diff in timestamps */ + diff = ts - jb->tail->ts; + + /* Ideally this should be just the length of the last frame. The deviation is the desired + offset */ + offset = diff - jb->tail->ms; + + /* Do we really need to resynch, or this is just a frame for dropping? */ + if(!jb->force_resynch && (offset < jb->conf.resync_threshold && offset > -jb->conf.resync_threshold)) + { + return JB_DROP; + } + + /* Reset the force resynch flag */ + jb->force_resynch = 0; + + /* apply the offset to the jb state */ + jb->rxcore -= offset; + frame = jb->frames; + while(frame) + { + frame->ts += offset; + frame = frame->next; + } + + /* now jb_put() should add the frame at a last position */ + return scx_jb_put(jb, data, ms, ts, now); +} + + +void scx_jb_set_force_resynch(struct scx_jb *jb) +{ + jb->force_resynch = 1; +} + + +int scx_jb_put_first(struct scx_jb *jb, void *data, long ms, long ts, long now) +{ + /* this is our first frame - set the base of the receivers time */ + jb->rxcore = now - ts; + + /* init next for a first time - it should be the time the first frame should be played */ + jb->next_delivery = now + jb->delay; + + /* put the frame */ + return scx_jb_put(jb, data, ms, ts, now); +} + + +int scx_jb_put(struct scx_jb *jb, void *data, long ms, long ts, long now) +{ + struct scx_jb_frame *frame, *next, *newframe; + long delivery; + + /* debug check the validity of the input params */ + assert(data != NULL); + /* do not allow frames shorter than 10ms */ + assert(ms >= 10); + assert(ts >= 0); + assert(now >= 0); + + delivery = jb->rxcore + jb->delay + ts; + + /* check if the new frame is not too late */ + if(delivery < jb->next_delivery) + { + /* should drop the frame, but let first resynch_jb() check if this is not a jump in ts, or + the force resynch flag was not set. */ + return resynch_jb(jb, data, ms, ts, now); + } + + /* what if the delivery time is bigger than next + delay? Seems like a frame for the future. + However, allow more 100 ms in advance */ + if(delivery > jb->next_delivery + jb->delay + 100) + { + /* should drop the frame, but let first resynch_jb() check if this is not a jump in ts, or + the force resynch flag was not set. */ + return resynch_jb(jb, data, ms, ts, now); + } + + /* find the right place in the frames list, sorted by delivery time */ + frame = jb->tail; + while(frame != NULL && frame->delivery > delivery) + { + frame = frame->prev; + } + + /* Check if the new delivery time is not covered already by the chosen frame */ + if(frame && (frame->delivery == delivery || + delivery < frame->delivery + frame->ms || + (frame->next && delivery + ms > frame->next->delivery))) + { + /* TODO: Should we check for resynch here? Be careful to do not allow threshold smaller than + the size of the jb */ + + /* should drop the frame, but let first resynch_jb() check if this is not a jumt in ts, or + the force resynch flag was not set. */ + return resynch_jb(jb, data, ms, ts, now); + } + + /* Reset the force resynch flag */ + jb->force_resynch = 0; + + /* Get a new frame */ + newframe = alloc_jb_frame(jb); + newframe->data = data; + newframe->ts = ts; + newframe->ms = ms; + newframe->delivery = delivery; + + /* and insert it right on place */ + if(frame != NULL) + { + next = frame->next; + frame->next = newframe; + if(next != NULL) + { + newframe->next = next; + next->prev = newframe; + } + else + { + /* insert after the last frame - should update tail */ + jb->tail = newframe; + newframe->next = NULL; + } + newframe->prev = frame; + + return JB_OK; + } + else if(jb->frames == NULL) + { + /* the frame list is empty or thats just the first frame ever */ + /* tail should also be NULL is that case */ + assert(jb->tail == NULL); + jb->frames = jb->tail = newframe; + newframe->next = NULL; + newframe->prev = NULL; + + return JB_OK; + } + else + { + /* insert on a first position - should update frames head */ + newframe->next = jb->frames; + newframe->prev = NULL; + jb->frames->prev = newframe; + jb->frames = newframe; + + return JB_OK; + } +} + + +int scx_jb_get(struct scx_jb *jb, struct scx_jb_frame *frame, long now, long interpl) +{ + long halflen; + + assert(now >= 0); + assert(interpl >= 10); + + if(now < jb->next_delivery) + { + /* too early for the next frame */ + return JB_NOFRAME; + } + + /* Is the jb empty? */ + if(jb->frames == NULL) + { + /* should interpolate a frame */ + /* update next */ + jb->next_delivery += interpl; + + return JB_INTERP; + } + + /* debug check - the time of the next delivery shouldn't be greater than the delivery time of + the first frame */ + /*assert(jb->frames->delivery >= jb->next_delivery);*/ + /* TODO: I beleave the line above is the right one, but it causes a problem, so just drop... */ + if(jb->frames->delivery < jb->next_delivery) + { + /* this frame will never be played - drop the frame */ + get_jb_head(jb, frame); + + return JB_DROP; + } + + halflen = (long) ((double) jb->frames->ms / 2.0); + + /* Isn't it too late for the first frame available in the jb? */ + if(now > jb->frames->delivery + halflen) + { + /* yes - should drop this frame and update next to point the next frame (get_jb_head() does it) */ + get_jb_head(jb, frame); + + return JB_DROP; + } + + /* isn't it too early to play the first frame available? */ + if(now < jb->frames->delivery - halflen) + { + /* yes - should interpolate one frame */ + /* update next */ + jb->next_delivery += interpl; + + return JB_INTERP; + } + + /* we have a frame for playing now (get_jb_head() updates next) */ + get_jb_head(jb, frame); + + return JB_OK; +} + + +long scx_jb_next(struct scx_jb *jb) +{ + return jb->next_delivery; +} + + +int scx_jb_remove(struct scx_jb *jb, struct scx_jb_frame *frameout) +{ + if(jb->frames == NULL) + { + return JB_NOFRAME; + } + + get_jb_head(jb, frameout); + + return JB_OK; +} + + + --- /dev/null +++ asterisk/scx_jitterbuf.h @@ -0,0 +1,62 @@ +/* + * Securax jitterbuffer. + * + * Securax jitterbuffer data and function declarations. + * + * Slav Klenov + * + */ + + +/* return codes */ +#define JB_OK 0 +#define JB_DROP 1 +#define JB_INTERP 2 +#define JB_NOFRAME 3 + + +/* defaults */ +#define JB_SIZE_DEFAULT 300 +#define JB_RESYNCH_THRESHOLD_DEFAULT 1000 + + +/* jb configuration properties */ +struct scx_jb_conf +{ + long jbsize; + long resync_threshold; +}; + + +struct scx_jb_frame +{ + void *data; + long ts; + long ms; + long delivery; + struct scx_jb_frame *next; + struct scx_jb_frame *prev; +}; + + +struct scx_jb; + + +/* jb interface */ + +struct scx_jb * scx_jb_new(struct scx_jb_conf *conf); + +void scx_jb_destroy(struct scx_jb *jb); + +int scx_jb_put_first(struct scx_jb *jb, void *data, long ms, long ts, long now); + +int scx_jb_put(struct scx_jb *jb, void *data, long ms, long ts, long now); + +int scx_jb_get(struct scx_jb *jb, struct scx_jb_frame *frame, long now, long interpl); + +long scx_jb_next(struct scx_jb *jb); + +int scx_jb_remove(struct scx_jb *jb, struct scx_jb_frame *frameout); + +void scx_jb_set_force_resynch(struct scx_jb *jb); + --- asterisk/configs/sip.conf.sample~jb-rtp-sip.scx.patch +++ asterisk/configs/sip.conf.sample @@ -88,6 +88,12 @@ ;sipdebug = yes ; Turn on SIP debugging by default, from ; the moment the channel loads this configuration +; Jitterbuf settings +;usejb=yes +;jbsize=300 +;forcejb=yes +;jblog=no + ; ; If regcontext is specified, Asterisk will dynamically ; create and destroy a NoOp priority 1 extension for a given --- asterisk/Makefile~jb-rtp-sip.scx.patch +++ asterisk/Makefile @@ -295,7 +295,7 @@ cdr.o tdd.o acl.o rtp.o manager.o asterisk.o \ dsp.o chanvars.o indications.o autoservice.o db.o privacy.o \ astmm.o enum.o srv.o dns.o aescrypt.o aestab.o aeskey.o \ - utils.o config_old.o plc.o jitterbuf.o dnsmgr.o devicestate.o \ + utils.o config_old.o plc.o jitterbuf.o scx_jitterbuf.o dnsmgr.o devicestate.o \ netsock.o slinfactory.o ifeq (${OSARCH},Darwin) OBJS+=poll.o dlfcn.o