Index: channels/chan_motif.c =================================================================== --- channels/chan_motif.c (revision 424126) +++ channels/chan_motif.c (working copy) @@ -1805,6 +1805,7 @@ case AST_CONTROL_CONNECTED_LINE: break; case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: case -1: res = -1; break; Index: channels/chan_sip.c =================================================================== --- channels/chan_sip.c (revision 424126) +++ channels/chan_sip.c (working copy) @@ -7856,6 +7856,7 @@ res = -1; break; case AST_CONTROL_PVT_CAUSE_CODE: /* these should be handled by the code in channel.c */ + case AST_CONTROL_MASQUERADE_NOTIFY: case -1: res = -1; break; Index: channels/chan_iax2.c =================================================================== --- channels/chan_iax2.c (revision 424126) +++ channels/chan_iax2.c (working copy) @@ -1421,6 +1421,8 @@ /* Only meaningful across a bridge on this machine for direct-media exchange. */ case AST_CONTROL_PVT_CAUSE_CODE: /* Intended only for the sending machine's local channel structure. */ + case AST_CONTROL_MASQUERADE_NOTIFY: + /* Intended only for masquerades when calling ast_indicate_data(). */ case AST_CONTROL_STREAM_STOP: case AST_CONTROL_STREAM_SUSPEND: case AST_CONTROL_STREAM_RESTART: @@ -5770,6 +5772,7 @@ } break; case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: res = -1; goto done; } Index: channels/chan_misdn.c =================================================================== --- channels/chan_misdn.c (revision 424126) +++ channels/chan_misdn.c (working copy) @@ -7108,6 +7108,7 @@ chan_misdn_log(1, p->bc->port, " --> * Unknown Indication:%d pid:%d\n", cond, p->bc->pid); /* fallthrough */ case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: return -1; } Index: channels/chan_skinny.c =================================================================== --- channels/chan_skinny.c (revision 424126) +++ channels/chan_skinny.c (working copy) @@ -5336,6 +5336,7 @@ ast_log(LOG_WARNING, "Don't know how to indicate condition %d\n", ind); /* fallthrough */ case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: return -1; /* Tell asterisk to provide inband signalling */ } return 0; Index: channels/chan_pjsip.c =================================================================== --- channels/chan_pjsip.c (revision 424126) +++ channels/chan_pjsip.c (working copy) @@ -698,41 +698,25 @@ return res; } -struct fixup_data { - struct ast_sip_session *session; - struct ast_channel *chan; -}; - -static int fixup(void *data) -{ - struct fixup_data *fix_data = data; - struct ast_sip_channel_pvt *channel = ast_channel_tech_pvt(fix_data->chan); - struct chan_pjsip_pvt *pvt = channel->pvt; - - channel->session->channel = fix_data->chan; - set_channel_on_rtp_instance(pvt, ast_channel_uniqueid(fix_data->chan)); - - return 0; -} - /*! \brief Function called by core to change the underlying owner channel */ static int chan_pjsip_fixup(struct ast_channel *oldchan, struct ast_channel *newchan) { struct ast_sip_channel_pvt *channel = ast_channel_tech_pvt(newchan); - struct fixup_data fix_data; + struct chan_pjsip_pvt *pvt = channel->pvt; - fix_data.session = channel->session; - fix_data.chan = newchan; - if (channel->session->channel != oldchan) { return -1; } - if (ast_sip_push_task_synchronous(channel->session->serializer, fixup, &fix_data)) { - ast_log(LOG_WARNING, "Unable to perform channel fixup\n"); - return -1; - } + /* + * The masquerade has suspended the channel's session + * serializer so we can safely change it outside of + * the serializer thread. + */ + channel->session->channel = newchan; + set_channel_on_rtp_instance(pvt, ast_channel_uniqueid(newchan)); + return 0; } @@ -1208,6 +1192,24 @@ case AST_CONTROL_PVT_CAUSE_CODE: res = -1; break; + case AST_CONTROL_MASQUERADE_NOTIFY: + ast_assert(datalen == sizeof(int)); + if (*(int *) data) { + /* + * Masquerade is beginning: + * Wait for session serializer to get suspended. + */ + ast_channel_unlock(ast); + ast_sip_session_suspend(channel->session); + ast_channel_lock(ast); + } else { + /* + * Masquerade is complete: + * Unsuspend the session serializer. + */ + ast_sip_session_unsuspend(channel->session); + } + break; case AST_CONTROL_HOLD: chan_pjsip_add_hold(ast_channel_uniqueid(ast)); device_buf_size = strlen(ast_channel_name(ast)) + 1; Index: channels/chan_unistim.c =================================================================== --- channels/chan_unistim.c (revision 424126) +++ channels/chan_unistim.c (working copy) @@ -5369,6 +5369,7 @@ ast_log(LOG_WARNING, "Don't know how to indicate condition %d\n", ind); /* fallthrough */ case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: return -1; } Index: addons/chan_ooh323.c =================================================================== --- addons/chan_ooh323.c (revision 424126) +++ addons/chan_ooh323.c (working copy) @@ -1396,6 +1396,7 @@ break; case AST_CONTROL_PROCEEDING: case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: case -1: break; default: Index: funcs/func_frame_trace.c =================================================================== --- funcs/func_frame_trace.c (revision 424126) +++ funcs/func_frame_trace.c (working copy) @@ -334,6 +334,10 @@ case AST_CONTROL_PVT_CAUSE_CODE: ast_verbose("SubClass: PVT_CAUSE_CODE\n"); break; + case AST_CONTROL_MASQUERADE_NOTIFY: + /* Should never happen. */ + ast_assert(0); + break; case AST_CONTROL_STREAM_STOP: ast_verbose("SubClass: STREAM_STOP\n"); break; Index: include/asterisk/taskprocessor.h =================================================================== --- include/asterisk/taskprocessor.h (revision 424126) +++ include/asterisk/taskprocessor.h (working copy) @@ -247,6 +247,31 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps); /*! + * \brief Set the thread id of the given taskprocessor. + * \since 12.7.0 + * + * \param tps Taskprocessor to get new thread id. + * \param thread_id What id to set. + * + * \return Nothing + */ +void ast_taskprocessor_set_thread_id(struct ast_taskprocessor *tps, pthread_t thread_id); + +/*! + * \brief Am I the given taskprocessor's current task. + * \since 12.7.0 + * + * \param tps Taskprocessor to check. + * + * \note This may not be supported by the specific taskprocessor. + * Currently supported by the default taskprocessor implementation + * and threadpool serializers. + * + * \retval non-zero if current thread is the taskprocessor thread. + */ +int ast_taskprocessor_is_task(struct ast_taskprocessor *tps); + +/*! * \brief Return the name of the taskprocessor singleton * \since 1.6.1 */ Index: include/asterisk/res_pjsip_session.h =================================================================== --- include/asterisk/res_pjsip_session.h (revision 424126) +++ include/asterisk/res_pjsip_session.h (working copy) @@ -87,6 +87,9 @@ */ struct ast_sip_session_delayed_request; +/*! \brief Opaque struct controlling the suspension of the session's serializer. */ +struct ast_sip_session_suspender; + /*! * \brief A structure describing a SIP session * @@ -96,43 +99,45 @@ * to use the term "SIP session" to refer to the INVITE dialog itself. */ struct ast_sip_session { - /* Dialplan extension where incoming call is destined */ + /*! Dialplan extension where incoming call is destined */ char exten[AST_MAX_EXTENSION]; - /* The endpoint with which Asterisk is communicating */ + /*! The endpoint with which Asterisk is communicating */ struct ast_sip_endpoint *endpoint; - /* The contact associated with this session */ + /*! The contact associated with this session */ struct ast_sip_contact *contact; - /* The PJSIP details of the session, which includes the dialog */ + /*! The PJSIP details of the session, which includes the dialog */ struct pjsip_inv_session *inv_session; - /* The Asterisk channel associated with the session */ + /*! The Asterisk channel associated with the session */ struct ast_channel *channel; - /* Registered session supplements */ + /*! Registered session supplements */ AST_LIST_HEAD(, ast_sip_session_supplement) supplements; - /* Datastores added to the session by supplements to the session */ + /*! Datastores added to the session by supplements to the session */ struct ao2_container *datastores; - /* Media streams */ + /*! Media streams */ struct ao2_container *media; - /* Serializer for tasks relating to this SIP session */ + /*! Serializer for tasks relating to this SIP session */ struct ast_taskprocessor *serializer; - /* Requests that could not be sent due to current inv_session state */ + /*! Non-null if the session serializer is suspended or being suspended. */ + struct ast_sip_session_suspender *suspended; + /*! Requests that could not be sent due to current inv_session state */ AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests; - /* When we need to reschedule a reinvite, we use this structure to do it */ + /*! When we need to reschedule a reinvite, we use this structure to do it */ pj_timer_entry rescheduled_reinvite; - /* Format capabilities pertaining to direct media */ + /*! Format capabilities pertaining to direct media */ struct ast_format_cap *direct_media_cap; - /* When we need to forcefully end the session */ + /*! When we need to forcefully end the session */ pj_timer_entry scheduled_termination; - /* Identity of endpoint this session deals with */ + /*! Identity of endpoint this session deals with */ struct ast_party_id id; - /* Requested capabilities */ + /*! Requested capabilities */ struct ast_format_cap *req_caps; - /* Optional DSP, used only for inband DTMF detection if configured */ + /*! Optional DSP, used only for inband DTMF detection if configured */ struct ast_dsp *dsp; - /* Whether the termination of the session should be deferred */ + /*! Whether the termination of the session should be deferred */ unsigned int defer_terminate:1; - /* Deferred incoming re-invite */ + /*! Deferred incoming re-invite */ pjsip_rx_data *deferred_reinvite; - /* Current T.38 state */ + /*! Current T.38 state */ enum ast_sip_session_t38state t38state; }; @@ -388,6 +393,28 @@ struct ast_sip_contact *contact, pjsip_inv_session *inv); /*! + * \brief Request and wait for the session serializer to be suspended. + * \since 12.7.0 + * + * \param session Which session to suspend the serializer. + * + * \note No channel locks can be held while calling without risk of deadlock. + * + * \return Nothing + */ +void ast_sip_session_suspend(struct ast_sip_session *session); + +/*! + * \brief Request the session serializer be unsuspended. + * \since 12.7.0 + * + * \param session Which session to unsuspend the serializer. + * + * \return Nothing + */ +void ast_sip_session_unsuspend(struct ast_sip_session *session); + +/*! * \brief Create a new outgoing SIP session * * The endpoint that is passed in will have its reference count increased by one since Index: include/asterisk/frame.h =================================================================== --- include/asterisk/frame.h (revision 424126) +++ include/asterisk/frame.h (working copy) @@ -290,6 +290,7 @@ AST_CONTROL_MCID = 31, /*!< Indicate that the caller is being malicious. */ AST_CONTROL_UPDATE_RTP_PEER = 32, /*!< Interrupt the bridge and have it update the peer */ AST_CONTROL_PVT_CAUSE_CODE = 33, /*!< Contains an update to the protocol-specific cause-code stored for branching dials */ + AST_CONTROL_MASQUERADE_NOTIFY = 34, /*!< A masquerade is about to begin/end. (Never sent as a frame but directly with ast_indicate_data().) */ /* * WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING Index: main/taskprocessor.c =================================================================== --- main/taskprocessor.c (revision 424126) +++ main/taskprocessor.c (working copy) @@ -78,9 +78,9 @@ long tps_queue_size; /*! \brief Taskprocessor queue */ AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue; - /*! \brief Taskprocessor singleton list entry */ - AST_LIST_ENTRY(ast_taskprocessor) list; struct ast_taskprocessor_listener *listener; + /*! Current thread executing the tasks */ + pthread_t thread; /*! Indicates if the taskprocessor is currently executing a task */ unsigned int executing:1; }; @@ -173,6 +173,7 @@ int sem_value; int res; + ast_taskprocessor_set_thread_id(tps, pthread_self()); while (!pvt->dead) { res = ast_sem_wait(&pvt->sem); if (res != 0 && errno != EINTR) { @@ -183,6 +184,7 @@ } ast_taskprocessor_execute(tps); } + ast_taskprocessor_set_thread_id(tps, AST_PTHREADT_NULL); /* No posting to a dead taskprocessor! */ res = ast_sem_getvalue(&pvt->sem, &sem_value); @@ -600,6 +602,8 @@ ao2_ref(listener, +1); p->listener = listener; + p->thread = AST_PTHREADT_NULL; + ao2_ref(p, +1); listener->tps = p; @@ -789,3 +793,20 @@ } return size > 0; } + +void ast_taskprocessor_set_thread_id(struct ast_taskprocessor *tps, pthread_t thread_id) +{ + ao2_lock(tps); + tps->thread = thread_id; + ao2_unlock(tps); +} + +int ast_taskprocessor_is_task(struct ast_taskprocessor *tps) +{ + int is_task; + + ao2_lock(tps); + is_task = pthread_self() == tps->thread; + ao2_unlock(tps); + return is_task; +} Index: main/bridge_channel.c =================================================================== --- main/bridge_channel.c (revision 424126) +++ main/bridge_channel.c (working copy) @@ -2065,6 +2065,10 @@ ast_indicate(chan, -1); } break; + case AST_CONTROL_MASQUERADE_NOTIFY: + /* Should never happen. */ + ast_assert(0); + break; default: ast_indicate_data(chan, fr->subclass.integer, fr->data.ptr, fr->datalen); break; Index: main/channel.c =================================================================== --- main/channel.c (revision 424126) +++ main/channel.c (working copy) @@ -4285,6 +4285,7 @@ case AST_CONTROL_MCID: case AST_CONTROL_UPDATE_RTP_PEER: case AST_CONTROL_PVT_CAUSE_CODE: + case AST_CONTROL_MASQUERADE_NOTIFY: case AST_CONTROL_STREAM_STOP: case AST_CONTROL_STREAM_SUSPEND: case AST_CONTROL_STREAM_REVERSE: @@ -4451,7 +4452,9 @@ ast_channel_lock(chan); /* Don't bother if the channel is about to go away, anyway. */ - if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE) || ast_check_hangup(chan)) { + if ((ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE) + || ast_check_hangup(chan)) + && condition != AST_CONTROL_MASQUERADE_NOTIFY) { res = -1; goto indicate_cleanup; } @@ -4599,6 +4602,7 @@ case AST_CONTROL_AOC: case AST_CONTROL_END_OF_Q: case AST_CONTROL_MCID: + case AST_CONTROL_MASQUERADE_NOTIFY: case AST_CONTROL_UPDATE_RTP_PEER: case AST_CONTROL_STREAM_STOP: case AST_CONTROL_STREAM_SUSPEND: @@ -6445,6 +6449,11 @@ * original channel's backend. While the features are nice, which is the * reason we're keeping it, it's still awesomely weird. XXX */ + /* Indicate to each channel that a masquerade is about to begin. */ + x = 1; + ast_indicate_data(original, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x)); + ast_indicate_data(clonechan, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x)); + /* * The container lock is necessary for proper locking order * because the channels must be unlinked to change their @@ -6485,8 +6494,9 @@ /* Start the masquerade channel contents rearangement. */ ast_channel_lock_both(original, clonechan); - ast_debug(4, "Actually Masquerading %s(%u) into the structure of %s(%u)\n", - ast_channel_name(clonechan), ast_channel_state(clonechan), ast_channel_name(original), ast_channel_state(original)); + ast_debug(1, "Actually Masquerading %s(%u) into the structure of %s(%u)\n", + ast_channel_name(clonechan), ast_channel_state(clonechan), + ast_channel_name(original), ast_channel_state(original)); /* * Remember the original read/write formats. We turn off any @@ -6759,6 +6769,19 @@ ast_channel_unlock(original); ast_channel_unlock(clonechan); + /* + * Indicate to each channel that a masquerade is complete. + * + * We can still do this to clonechan even though it is a + * zombie because ast_indicate_data() will explicitly pass + * this control and ast_hangup() is held off until the + * ast_channel_masq() and ast_channel_masqr() pointers are + * cleared. + */ + x = 0; + ast_indicate_data(original, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x)); + ast_indicate_data(clonechan, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x)); + ast_bridge_notify_masquerade(original); if (clone_hold_state == AST_CONTROL_HOLD) { Index: main/threadpool.c =================================================================== --- main/threadpool.c (revision 424126) +++ main/threadpool.c (working copy) @@ -1154,9 +1154,11 @@ { struct ast_taskprocessor *tps = data; + ast_taskprocessor_set_thread_id(tps, pthread_self()); while (ast_taskprocessor_execute(tps)) { /* No-op */ } + ast_taskprocessor_set_thread_id(tps, AST_PTHREADT_NULL); ast_taskprocessor_unreference(tps); return 0; Index: main/core_unreal.c =================================================================== --- main/core_unreal.c (revision 424126) +++ main/core_unreal.c (working copy) @@ -530,6 +530,12 @@ ao2_ref(p, 1); /* ref for unreal_queue_frame */ switch (condition) { + case AST_CONTROL_MASQUERADE_NOTIFY: + /* + * Always block this because this is the channel being + * masqueraded; not anything down the chain. + */ + break; case AST_CONTROL_CONNECTED_LINE: case AST_CONTROL_REDIRECTING: res = unreal_colp_redirect_indicate(p, ast, condition); Index: res/res_pjsip_session.exports.in =================================================================== --- res/res_pjsip_session.exports.in (revision 424126) +++ res/res_pjsip_session.exports.in (working copy) @@ -15,6 +15,8 @@ LINKER_SYMBOL_PREFIXast_sip_session_send_request; LINKER_SYMBOL_PREFIXast_sip_session_create_invite; LINKER_SYMBOL_PREFIXast_sip_session_create_outgoing; + LINKER_SYMBOL_PREFIXast_sip_session_suspend; + LINKER_SYMBOL_PREFIXast_sip_session_unsuspend; LINKER_SYMBOL_PREFIXast_sip_dialog_get_session; LINKER_SYMBOL_PREFIXast_sip_session_resume_reinvite; LINKER_SYMBOL_PREFIXast_sip_channel_pvt_alloc; Index: res/res_pjsip_session.c =================================================================== --- res/res_pjsip_session.c (revision 424126) +++ res/res_pjsip_session.c (working copy) @@ -1223,6 +1223,108 @@ return session; } +/*! \brief struct controlling the suspension of the session's serializer. */ +struct ast_sip_session_suspender { + ast_cond_t cond_suspended; + ast_cond_t cond_complete; + int suspended; + int complete; +}; + +static void sip_session_suspender_dtor(void *vdoomed) +{ + struct ast_sip_session_suspender *doomed = vdoomed; + + ast_cond_destroy(&doomed->cond_suspended); + ast_cond_destroy(&doomed->cond_complete); +} + +/*! + * \internal + * \brief Block the session serializer thread task. + * + * \param data Pushed serializer task data for suspension. + * + * \retval 0 + */ +static int sip_session_suspend_task(void *data) +{ + struct ast_sip_session_suspender *suspender = data; + + ao2_lock(suspender); + + /* Signal that the serializer task is now suspended. */ + suspender->suspended = 1; + ast_cond_signal(&suspender->cond_suspended); + + /* Wait for the the serializer suspension to be completed. */ + while (!suspender->complete) { + ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender)); + } + + ao2_unlock(suspender); + ao2_ref(suspender, -1); + + return 0; +} + +void ast_sip_session_suspend(struct ast_sip_session *session) +{ + struct ast_sip_session_suspender *suspender; + int res; + + ast_assert(session->suspended == NULL); + + if (ast_taskprocessor_is_task(session->serializer)) { + /* I am the session's serializer thread so I cannot suspend. */ + return; + } + + suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor); + if (!suspender) { + /* We will just have to hope that the system does not deadlock */ + return; + } + ast_cond_init(&suspender->cond_suspended, NULL); + ast_cond_init(&suspender->cond_complete, NULL); + + ao2_ref(suspender, +1); + res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender); + if (res) { + /* We will just have to hope that the system does not deadlock */ + ao2_ref(suspender, -2); + return; + } + + session->suspended = suspender; + + /* Wait for the serializer to get suspended. */ + ao2_lock(suspender); + while (!suspender->suspended) { + ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender)); + } + ao2_unlock(suspender); +} + +void ast_sip_session_unsuspend(struct ast_sip_session *session) +{ + struct ast_sip_session_suspender *suspender = session->suspended; + + if (!suspender) { + /* Nothing to do */ + return; + } + session->suspended = NULL; + + /* Signal that the serializer task suspension is now complete. */ + ao2_lock(suspender); + suspender->complete = 1; + ast_cond_signal(&suspender->cond_complete); + ao2_unlock(suspender); + + ao2_ref(suspender, -1); +} + static int session_outbound_auth(pjsip_dialog *dlg, pjsip_tx_data *tdata, void *user_data) { pjsip_inv_session *inv = pjsip_dlg_get_inv_session(dlg);