Index: apps/app_queue.c =================================================================== --- apps/app_queue.c (revision 226685) +++ apps/app_queue.c (working copy) @@ -192,7 +192,7 @@ static char *app_aqm = "AddQueueMember" ; static char *app_aqm_synopsis = "Dynamically adds queue members" ; static char *app_aqm_descrip = -" AddQueueMember(queuename[|interface[|penalty[|options[|membername[|state_interface]]]]]):\n" +" AddQueueMember(queuename[|interface[|penalty[|options[|membername[|state_interface[|incominglimit]]]]]]):\n" "Dynamically adds interface to an existing queue.\n" "If the interface is already in the queue and there exists an n+101 priority\n" "then it will then jump to this priority. Otherwise it will return an error\n" @@ -365,6 +365,8 @@ int realtime; /*!< Is this member realtime? */ int status; /*!< Status of queue member */ int paused; /*!< Are we paused (not accepting calls)? */ + int current_calls; /*!< Number of calls this member is servicing */ + int incominglimit; /*!< Maximum number of calls this member can be servicing */ time_t lastcall; /*!< When last successful call was hungup */ unsigned int dead:1; /*!< Used to detect members deleted in realtime */ unsigned int delme:1; /*!< Flag to delete entry on reload */ @@ -563,6 +565,11 @@ continue; } + if (member->incominglimit && member->current_calls >= member->incominglimit) { + ao2_ref(member, -1); + continue; + } + switch (member->status) { case AST_DEVICE_INVALID: /* nothing to do */ @@ -628,9 +635,20 @@ "CallsTaken: %d\r\n" "LastCall: %d\r\n" "Status: %d\r\n" - "Paused: %d\r\n", - q->name, cur->interface, cur->membername, cur->dynamic ? "dynamic" : cur->realtime ? "realtime" : "static", - cur->penalty, cur->calls, (int)cur->lastcall, cur->status, cur->paused); + "Paused: %d\r\n" + "IncomingLimit: %d\r\n" + "CurrentCalls: %d\r\n", + q->name, + cur->interface, + cur->membername, + cur->dynamic ? "dynamic" : cur->realtime ? "realtime" : "static", + cur->penalty, + cur->calls, + (int)cur->lastcall, + cur->status, + cur->paused, + cur->incominglimit, + cur->current_calls); } ao2_ref(cur, -1); } @@ -756,7 +774,7 @@ return 0; } /*! \brief allocate space for new queue member and set fields based on parameters passed */ -static struct member *create_queue_member(const char *interface, const char *membername, int penalty, int paused, const char *state_interface) +static struct member *create_queue_member(const char *interface, const char *membername, int penalty, int paused, const char *state_interface, int incominglimit) { struct member *cur; @@ -776,6 +794,7 @@ if (!strchr(cur->interface, '/')) ast_log(LOG_WARNING, "No location at interface '%s'\n", interface); cur->status = ast_device_state(cur->state_interface); + cur->incominglimit = incominglimit; } return cur; @@ -1123,11 +1142,12 @@ } } -static void rt_handle_member_record(struct call_queue *q, char *interface, const char *membername, const char *penalty_str, const char *paused_str, const char *state_interface) +static void rt_handle_member_record(struct call_queue *q, char *interface, const char *membername, const char *penalty_str, const char *paused_str, const char *state_interface, const char *incominglimit_str) { struct member *m, tmpmem; int penalty = 0; int paused = 0; + int incominglimit = 0; if (penalty_str) { penalty = atoi(penalty_str); @@ -1141,13 +1161,19 @@ paused = 0; } + if (incominglimit_str) { + incominglimit = atoi(incominglimit_str); + if (incominglimit < 0) + incominglimit = 0; + } + /* Find the member, or the place to put a new one. */ ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface)); m = ao2_find(q->members, &tmpmem, OBJ_POINTER); - /* Create a new one if not found, else update penalty */ + /* Create a new one if not found, else update penalty and incominglimit */ if (!m) { - if ((m = create_queue_member(interface, membername, penalty, paused, state_interface))) { + if ((m = create_queue_member(interface, membername, penalty, paused, state_interface, incominglimit))) { m->dead = 0; m->realtime = 1; add_to_interfaces(m->state_interface); @@ -1166,6 +1192,7 @@ add_to_interfaces(m->state_interface); } m->penalty = penalty; + m->incominglimit = incominglimit; ao2_ref(m, -1); } } @@ -1301,7 +1328,8 @@ ast_variable_retrieve(member_config, interface, "membername"), ast_variable_retrieve(member_config, interface, "penalty"), ast_variable_retrieve(member_config, interface, "paused"), - S_OR(ast_variable_retrieve(member_config, interface, "state_interface"),interface)); + S_OR(ast_variable_retrieve(member_config, interface, "state_interface"),interface), + ast_variable_retrieve(member_config, interface, "incominglimit")); } /* Delete all realtime members that have been deleted in DB. */ @@ -1374,7 +1402,8 @@ S_OR(ast_variable_retrieve(member_config, interface, "membername"), interface), ast_variable_retrieve(member_config, interface, "penalty"), ast_variable_retrieve(member_config, interface, "paused"), - S_OR(ast_variable_retrieve(member_config, interface, "state_interface"), interface)); + S_OR(ast_variable_retrieve(member_config, interface, "state_interface"), interface), + ast_variable_retrieve(member_config, interface, "incominglimit")); } /* Delete all realtime members that have been deleted in DB. */ @@ -1768,7 +1797,8 @@ /* else fall through */ case AST_DEVICE_NOT_INUSE: case AST_DEVICE_UNKNOWN: - if (!mem->paused) { + if (!mem->paused && + !(mem->incominglimit && mem->current_calls >= mem->incominglimit)) { avl++; } break; @@ -1909,6 +1939,16 @@ tmp->stillgoing = 0; return 0; } + + if (tmp->member->incominglimit && tmp->member->current_calls >= tmp->member->incominglimit) { + if (option_debug) + ast_log(LOG_DEBUG, "Incoming limit reached for %s, can't receive call\n", tmp->interface); + if (qe->chan->cdr) + ast_cdr_busy(qe->chan->cdr); + tmp->stillgoing = 0; + return 0; + } + if (use_weight && compare_weight(qe->parent,tmp->member)) { ast_log(LOG_DEBUG, "Priority queue delaying call to %s:%s\n", qe->parent->name, tmp->interface); if (qe->chan->cdr) @@ -3119,6 +3159,8 @@ return -1; } + member->current_calls++; + if (qe->parent->setinterfacevar) pbx_builtin_setvar_helper(qe->chan, "MEMBERINTERFACE", member->interface); @@ -3312,6 +3354,7 @@ } ast_channel_unlock(qe->chan); ast_hangup(peer); + member->current_calls--; res = bridge ? bridge : 1; ao2_ref(member, -1); } @@ -3357,7 +3400,7 @@ /* Dump all members in a specific queue to the database * - * / = ;;;[|...] + * / = ;;;;[|...] * */ static void dump_queue_members(struct call_queue *pm_queue) @@ -3380,8 +3423,8 @@ continue; } - res = snprintf(value + value_len, sizeof(value) - value_len, "%s%s;%d;%d;%s;%s", - value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused, cur_member->membername, cur_member->state_interface); + res = snprintf(value + value_len, sizeof(value) - value_len, "%s%s;%d;%d;%s;%s;%d", + value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused, cur_member->membername, cur_member->state_interface, cur_member->incominglimit); ao2_ref(cur_member, -1); @@ -3452,7 +3495,7 @@ } -static int add_to_queue(const char *queuename, const char *interface, const char *membername, int penalty, int paused, int dump, const char *state_interface) +static int add_to_queue(const char *queuename, const char *interface, const char *membername, int penalty, int paused, int dump, const char *state_interface, int incominglimit) { struct call_queue *q; struct member *new_member, *old_member; @@ -3467,7 +3510,7 @@ ast_mutex_lock(&q->lock); if ((old_member = interface_exists(q, interface)) == NULL) { - if ((new_member = create_queue_member(interface, membername, penalty, paused, state_interface))) { + if ((new_member = create_queue_member(interface, membername, penalty, paused, state_interface, incominglimit))) { add_to_interfaces(new_member->state_interface); new_member->dynamic = 1; ao2_link(q->members, new_member); @@ -3481,11 +3524,20 @@ "CallsTaken: %d\r\n" "LastCall: %d\r\n" "Status: %d\r\n" - "Paused: %d\r\n", - q->name, new_member->interface, new_member->membername, + "Paused: %d\r\n" + "IncomingLimit: %d\r\n" + "CurrentCalls: %d\r\n", + q->name, + new_member->interface, + new_member->membername, "dynamic", - new_member->penalty, new_member->calls, (int) new_member->lastcall, - new_member->status, new_member->paused); + new_member->penalty, + new_member->calls, + (int)new_member->lastcall, + new_member->status, + new_member->paused, + new_member->incominglimit, + new_member->current_calls); ao2_ref(new_member, -1); new_member = NULL; @@ -3563,6 +3615,8 @@ char *state_interface; char *penalty_tok; int penalty = 0; + char *incominglimit_tok; + int incominglimit = 0; char *paused_tok; int paused = 0; struct ast_db_entry *db_tree; @@ -3610,6 +3664,7 @@ paused_tok = strsep(&member, ";"); membername = strsep(&member, ";"); state_interface = strsep(&member,";"); + incominglimit_tok = strsep(&member,";"); if (!penalty_tok) { ast_log(LOG_WARNING, "Error parsing persistent member string for '%s' (penalty)\n", queue_name); @@ -3633,10 +3688,20 @@ if (ast_strlen_zero(membername)) membername = interface; + if (!incominglimit_tok) { + ast_log(LOG_WARNING, "Error parsing persistent member string for '%s' (incominglimit)\n", queue_name); + break; + } + incominglimit = strtol(incominglimit_tok, NULL, 10); + if (errno == ERANGE || incominglimit < 0) { + ast_log(LOG_WARNING, "Error converting incoming limit: %s: Out of range.\n", incominglimit_tok); + break; + } + if (option_debug) - ast_log(LOG_DEBUG, "Reload Members: Queue: %s Member: %s Name: %s Penalty: %d Paused: %d\n", queue_name, interface, membername, penalty, paused); + ast_log(LOG_DEBUG, "Reload Members: Queue: %s Member: %s Name: %s Penalty: %d Paused: %d IncomingLimit: %d\n", queue_name, interface, membername, penalty, paused, incominglimit); - if (add_to_queue(queue_name, interface, membername, penalty, paused, 0, state_interface) == RES_OUTOFMEMORY) { + if (add_to_queue(queue_name, interface, membername, penalty, paused, 0, state_interface, incominglimit) == RES_OUTOFMEMORY) { ast_log(LOG_ERROR, "Out of Memory when reloading persistent queue member\n"); break; } @@ -3836,11 +3901,13 @@ AST_APP_ARG(options); AST_APP_ARG(membername); AST_APP_ARG(state_interface); + AST_APP_ARG(incominglimit); ); int penalty = 0; + int incominglimit = 0; if (ast_strlen_zero(data)) { - ast_log(LOG_WARNING, "AddQueueMember requires an argument (queuename[|interface[|penalty[|options[|membername[|state_interface]]]]])\n"); + ast_log(LOG_WARNING, "AddQueueMember requires an argument (queuename[|interface[|penalty[|options[|membername[|state_interface[|incominglimit]]]]]])\n"); return -1; } @@ -3869,7 +3936,14 @@ priority_jump = 1; } - switch (add_to_queue(args.queuename, args.interface, args.membername, penalty, 0, queue_persistent_members, args.state_interface)) { + if (!ast_strlen_zero(args.incominglimit)) { + if ((sscanf(args.incominglimit, "%30d", &incominglimit) != 1) || incominglimit < 0) { + ast_log(LOG_WARNING, "Incoming Limit '%s' is invalid, must be an integer >= 0\n", args.incominglimit); + incominglimit = 0; + } + } + + switch (add_to_queue(args.queuename, args.interface, args.membername, penalty, 0, queue_persistent_members, args.state_interface, incominglimit)) { case RES_OKAY: ast_queue_log(args.queuename, chan->uniqueid, args.interface, "ADDMEMBER", "%s", ""); ast_log(LOG_NOTICE, "Added interface '%s' to queue '%s'\n", args.interface, args.queuename); @@ -4407,11 +4481,13 @@ char *interface, *state_interface; char *membername = NULL; int penalty; + int incominglimit; AST_DECLARE_APP_ARGS(args, AST_APP_ARG(interface); AST_APP_ARG(penalty); AST_APP_ARG(membername); AST_APP_ARG(state_interface); + AST_APP_ARG(incominglimit); ); if (!(cfg = ast_config_load("queues.conf"))) { @@ -4515,6 +4591,15 @@ state_interface = interface; } + if (!ast_strlen_zero(args.incominglimit)) { + tmp = ast_skip_blanks(args.incominglimit); + incominglimit = atoi(tmp); + if (incominglimit < 0) { + incominglimit = 0; + } + } else + incominglimit = 0; + /* Find the old position in the list */ ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface)); cur = ao2_find(q->members, &tmpmem, OBJ_POINTER | OBJ_UNLINK); @@ -4524,7 +4609,7 @@ remove_from_interfaces(cur->state_interface); } - newm = create_queue_member(interface, membername, penalty, cur ? cur->paused : 0, state_interface); + newm = create_queue_member(interface, membername, penalty, cur ? cur->paused : 0, state_interface, incominglimit); if (!cur || (cur && strcasecmp(cur->state_interface, state_interface))) { add_to_interfaces(state_interface); } @@ -4601,7 +4686,7 @@ struct member *mem; int pos, queue_show; time_t now; - char max_buf[150]; + char max_buf[255]; char *max; size_t max_left; float sl = 0; @@ -4671,10 +4756,10 @@ if (s) astman_append(s, "%-12.12s has %d calls (max %s) in '%s' strategy (%ds holdtime), W:%d, C:%d, A:%d, SL:%2.1f%% within %ds%s", q->name, q->count, max_buf, int2strat(q->strategy), q->holdtime, q->weight, - q->callscompleted, q->callsabandoned,sl,q->servicelevel, term); + q->callscompleted, q->callsabandoned, sl, q->servicelevel, term); else ast_cli(fd, "%-12.12s has %d calls (max %s) in '%s' strategy (%ds holdtime), W:%d, C:%d, A:%d, SL:%2.1f%% within %ds%s", - q->name, q->count, max_buf, int2strat(q->strategy), q->holdtime, q->weight, q->callscompleted, q->callsabandoned,sl,q->servicelevel, term); + q->name, q->count, max_buf, int2strat(q->strategy), q->holdtime, q->weight, q->callscompleted, q->callsabandoned, sl, q->servicelevel, term); if (ao2_container_count(q->members)) { if (s) astman_append(s, " Members: %s", term); @@ -4688,14 +4773,22 @@ if (strcasecmp(mem->membername, mem->interface)) { ast_build_string(&max, &max_left, " (%s)", mem->interface); } - if (mem->penalty) + if (mem->penalty) { ast_build_string(&max, &max_left, " with penalty %d", mem->penalty); + + if (mem->incominglimit) + ast_build_string(&max, &max_left, " and incoming limit %d", mem->incominglimit); + } else if (mem->incominglimit) { + ast_build_string(&max, &max_left, " with incoming limit %d", mem->incominglimit); + } if (mem->dynamic) ast_build_string(&max, &max_left, " (dynamic)"); if (mem->realtime) ast_build_string(&max, &max_left, " (realtime)"); if (mem->paused) ast_build_string(&max, &max_left, " (paused)"); + if (mem->incominglimit && mem->current_calls >= mem->incominglimit) + ast_build_string(&max, &max_left, " (incoming limit reached)"); ast_build_string(&max, &max_left, " (%s)", devstate2str(mem->status)); if (mem->calls) { ast_build_string(&max, &max_left, " has taken %d calls (last was %ld secs ago)", @@ -4844,10 +4937,22 @@ "LastCall: %d\r\n" "Status: %d\r\n" "Paused: %d\r\n" + "IncomingLimit: %d\r\n" + "CurrentCalls: %d\r\n" "%s" "\r\n", - q->name, mem->membername, mem->interface, mem->dynamic ? "dynamic" : "static", - mem->penalty, mem->calls, (int)mem->lastcall, mem->status, mem->paused, idText); + q->name, + mem->membername, + mem->interface, + mem->dynamic ? "dynamic" : "static", + mem->penalty, + mem->calls, + (int)mem->lastcall, + mem->status, + mem->paused, + mem->incominglimit, + mem->current_calls, + idText); } ao2_ref(mem, -1); } @@ -4886,8 +4991,8 @@ static int manager_add_queue_member(struct mansession *s, const struct message *m) { - const char *queuename, *interface, *penalty_s, *paused_s, *membername, *state_interface; - int paused, penalty = 0; + const char *queuename, *interface, *penalty_s, *paused_s, *membername, *state_interface, *incominglimit_s; + int paused, penalty, incominglimit = 0; queuename = astman_get_header(m, "Queue"); interface = astman_get_header(m, "Interface"); @@ -4895,6 +5000,7 @@ paused_s = astman_get_header(m, "Paused"); membername = astman_get_header(m, "MemberName"); state_interface = astman_get_header(m, "StateInterface"); + incominglimit_s = astman_get_header(m, "IncomingLimit"); if (ast_strlen_zero(queuename)) { astman_send_error(s, m, "'Queue' not specified."); @@ -4916,7 +5022,12 @@ else paused = abs(ast_true(paused_s)); - switch (add_to_queue(queuename, interface, membername, penalty, paused, queue_persistent_members, state_interface)) { + if (ast_strlen_zero(incominglimit_s)) + incominglimit = 0; + else if (sscanf(incominglimit_s, "%30d", &incominglimit) != 1 || incominglimit < 0) + incominglimit = 0; + + switch (add_to_queue(queuename, interface, membername, penalty, paused, queue_persistent_members, state_interface, incominglimit)) { case RES_OKAY: ast_queue_log(queuename, "MANAGER", interface, "ADDMEMBER", "%s", ""); astman_send_ack(s, m, "Added interface to queue"); @@ -4995,9 +5106,9 @@ static int handle_queue_add_member(int fd, int argc, char *argv[]) { char *queuename, *interface, *membername = NULL, *state_interface = NULL; - int penalty; + int penalty, incominglimit; - if ((argc != 6) && (argc != 8) && (argc != 10) && (argc != 12)) { + if ((argc != 6) && (argc != 8) && (argc != 10) && (argc != 12) && (argc != 14)) { return RESULT_SHOWUSAGE; } else if (strcmp(argv[4], "to")) { return RESULT_SHOWUSAGE; @@ -5007,6 +5118,8 @@ return RESULT_SHOWUSAGE; } else if ((argc == 12) && strcmp(argv[10], "state_interface")) { return RESULT_SHOWUSAGE; + } else if ((argc == 14) && strcmp(argv[12], "incominglimit")) { + return RESULT_SHOWUSAGE; } queuename = argv[5]; @@ -5033,7 +5146,21 @@ state_interface = argv[11]; } - switch (add_to_queue(queuename, interface, membername, penalty, 0, queue_persistent_members, state_interface)) { + if (argc >= 14) { + if (sscanf(argv[13], "%30d", &incominglimit) == 1) { + if (incominglimit < 0) { + ast_cli(fd, "Incoming Limit must be >= 0\n"); + incominglimit = 0; + } + } else { + ast_cli(fd, "Incoming Limit must be an integer >= 0\n"); + incominglimit = 0; + } + } else { + incominglimit = 0; + } + + switch (add_to_queue(queuename, interface, membername, penalty, 0, queue_persistent_members, state_interface, incominglimit)) { case RES_OKAY: ast_queue_log(queuename, "CLI", interface, "ADDMEMBER", "%s", ""); ast_cli(fd, "Added interface '%s' to queue '%s'\n", interface, queuename); @@ -5054,7 +5181,7 @@ static char *complete_queue_add_member(const char *line, const char *word, int pos, int state) { - /* 0 - queue; 1 - add; 2 - member; 3 - ; 4 - to; 5 - ; 6 - penalty; 7 - ; 8 - as; 9 - */ + /* 0 - queue; 1 - add; 2 - member; 3 - ; 4 - to; 5 - ; 6 - penalty; 7 - ; 8 - as; 9 - - 10 state_interface; - 11 ; - 12 incominglimit; - 13 ; */ switch (pos) { case 3: /* Don't attempt to complete name of interface (infinite possibilities) */ return NULL; @@ -5065,6 +5192,7 @@ case 6: /* only one possible match, "penalty" */ return state == 0 ? ast_strdup("penalty") : NULL; case 7: + case 13: if (state < 100) { /* 0-99 */ char *num; if ((num = ast_malloc(3))) { @@ -5080,6 +5208,8 @@ return NULL; case 10: return state == 0 ? ast_strdup("state_interface") : NULL; + case 12: + return state == 0 ? ast_strdup("incominglimit") : NULL; default: return NULL; } @@ -5165,7 +5295,7 @@ " Provides summary information on a specified queue.\n"; static char qam_cmd_usage[] = -"Usage: queue add member to [penalty [as [state_interface ]]]\n"; +"Usage: queue add member to [penalty [as [state_interface [incominglimit ]]]]\n"; static char qrm_cmd_usage[] = "Usage: queue remove member from \n";