--- app_queue.c 2004-06-28 13:17:20.000000000 -0700 +++ ../asterisk/apps/app_queue.c 2004-07-02 13:51:26.000000000 -0700 @@ -57,6 +57,19 @@ #include "../astconf.h" +#ifdef USEPOSTGRESQUEUES +/* + * PostgreSQL routines written by Otmar Lendl + */ +#include +#endif + +#include + + +#define EXTRA_LOG 0 +#define ELEMOF(a) (sizeof(a)/sizeof(a[0])) + #define QUEUE_STRATEGY_RINGALL 0 #define QUEUE_STRATEGY_ROUNDROBIN 1 #define QUEUE_STRATEGY_LEASTRECENT 2 @@ -225,10 +238,19 @@ static struct ast_call_queue *queues = NULL; AST_MUTEX_DEFINE_STATIC(qlock); +#ifdef USEPOSTGRESQUEUES +/* I've got a habit to start names of all static variables with underscore (Constfilin) */ +static PGconn* _dbhandler; +static char _dboption[512]; +static char _db_getqueue_query[1024]; +static char _db_getmembers_query[1024]; +static void _reload_queues_from_database(void); +#endif + static char *int2strat(int strategy) { int x; - for (x=0;xname, queuename)) { @@ -310,8 +335,8 @@ manager_event(EVENT_FLAG_CALL, "Join", "Channel: %s\r\nCallerID: %s\r\nQueue: %s\r\nPosition: %d\r\nCount: %d\r\n", qe->chan->name, (qe->chan->callerid ? qe->chan->callerid : "unknown"), q->name, qe->pos, q->count ); -#if 0 -ast_log(LOG_NOTICE, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos ); +#if EXTRA_LOG + ast_log(LOG_NOTICE, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos ); #endif } ast_mutex_unlock(&q->lock); @@ -498,8 +523,8 @@ manager_event(EVENT_FLAG_CALL, "Leave", "Channel: %s\r\nQueue: %s\r\nCount: %d\r\n", qe->chan->name, q->name, q->count); -#if 0 -ast_log(LOG_NOTICE, "Queue '%s' Leave, Channel '%s'\n", q->name, qe->chan->name ); +#if EXTRA_LOG + ast_log(LOG_NOTICE, "Queue '%s' Leave, Channel '%s'\n", q->name, qe->chan->name ); #endif /* Take us out of the queue */ if (prev) @@ -547,8 +572,8 @@ /* Request the peer */ tmp->chan = ast_request(tmp->tech, qe->chan->nativeformats, tmp->numsubst); if (!tmp->chan) { /* If we can't, just go on to the next call */ -#if 0 - ast_log(LOG_NOTICE, "Unable to create channel of type '%s'\n", cur->tech); +#if EXTRA_LOG + ast_log(LOG_NOTICE, "Unable to create channel of type '%s'\n", tmp->tech); #endif if (qe->chan->cdr) ast_cdr_busy(qe->chan->cdr); @@ -1544,9 +1569,10 @@ } } -// if (option_debug) - ast_log(LOG_DEBUG, "queue: %s, options: %s, url: %s, announce: %s, timeout: %d, priority: %d\n", - queuename, options, url, announceoverride, qe.queuetimeout, (int)prio); +#if EXTRA_LOG + fprintf(stderr,"queue: %s, options: %s, url: %s, announce: %s, timeout: %d\n", + queuename, options, url, announceoverride, qe.queuetimeout); +#endif qe.chan = chan; qe.start = time(NULL); @@ -1672,53 +1698,27 @@ return res; } -static void reload_queues(void) -{ - struct ast_call_queue *q, *ql, *qn; - struct ast_config *cfg; - char *cat, *tmp; - struct ast_variable *var; - struct member *prev, *cur; - int new; - cfg = ast_load("queues.conf"); - if (!cfg) { - ast_log(LOG_NOTICE, "No call queueing config file, so no call queues\n"); - return; - } - ast_mutex_lock(&qlock); - /* Mark all queues as dead for the moment */ - q = queues; - while(q) { - q->dead = 1; - q = q->next; - } - /* Chug through config file */ - cat = ast_category_browse(cfg, NULL); - while(cat) { - if (strcasecmp(cat, "general")) { - /* Look for an existing one */ - q = queues; - while(q) { - if (!strcmp(q->name, cat)) - break; - q = q->next; +static struct ast_call_queue *_lookup_queue_by_name( struct ast_call_queue *head, const char* name ) { + struct ast_call_queue *q; + for( q = head;q;q = q->next) { + if (!strcmp(q->name, name)) + return q; } - if (!q) { - /* Make one then */ - q = malloc(sizeof(struct ast_call_queue)); + return NULL; +} + +static struct ast_call_queue *_make_new_queue( const char* name ) { + struct ast_call_queue *q = malloc(sizeof(struct ast_call_queue)); if (q) { /* Initialize it */ - memset(q, 0, sizeof(struct ast_call_queue)); + memset(q, 0 ,sizeof(struct ast_call_queue)); ast_mutex_init(&q->lock); - strncpy(q->name, cat, sizeof(q->name)); - new = 1; - } else new = 0; - } else - new = 0; - if (q) { - if (!new) - ast_mutex_lock(&q->lock); - /* Re-initialize the queue */ + strncpy(q->name, name, sizeof(q->name)-1); + } + return q; +} + +static void _init_queue( struct ast_call_queue* q ) { q->dead = 0; q->retry = 0; q->timeout = -1; @@ -1744,14 +1744,344 @@ strcpy(q->sound_minutes, "queue-minutes"); strcpy(q->sound_seconds, "queue-seconds"); strcpy(q->sound_thanks, "queue-thankyou"); - prev = q->members; - if (prev) { +} + +static void _validate_queue_properties( struct ast_call_queue *q ) { + if (q->retry < 1) + q->retry = DEFAULT_RETRY; + if (q->timeout < 0) + q->timeout = DEFAULT_TIMEOUT; + if (q->maxlen < 0) + q->maxlen = 0; +} + +static struct member *_find_end_of_members( struct member* head ) { + struct member *m = head; + if (m) { /* find the end of any dynamic members */ - while(prev->next) - prev = prev->next; + while(m->next) + m = m->next; + } + return m; +} + +static int _set_queue_property( struct ast_call_queue *q, const char *propName, const char *propValue ) { + if (!strcasecmp(propName, "music")) { + strncpy(q->moh, propValue, sizeof(q->moh) - 1); + } else if (!strcasecmp(propName, "announce")) { + strncpy(q->announce, propValue, sizeof(q->announce) - 1); + } else if (!strcasecmp(propName, "context")) { + strncpy(q->context, propValue, sizeof(q->context) - 1); + } else if (!strcasecmp(propName, "timeout")) { + q->timeout = atoi(propValue); + } else if (!strcasecmp(propName, "monitor-join")) { + q->monjoin = ast_true((char*)propValue); + } else if (!strcasecmp(propName, "monitor-format")) { + strncpy(q->monfmt, propValue, sizeof(q->monfmt) - 1); + } else if (!strcasecmp(propName, "queue-youarenext")) { + strncpy(q->sound_next, propValue, sizeof(q->sound_next) - 1); + } else if (!strcasecmp(propName, "queue-thereare")) { + strncpy(q->sound_thereare, propValue, sizeof(q->sound_thereare) - 1); + } else if (!strcasecmp(propName, "queue-callswaiting")) { + strncpy(q->sound_calls, propValue, sizeof(q->sound_calls) - 1); + } else if (!strcasecmp(propName, "queue-holdtime")) { + strncpy(q->sound_holdtime, propValue, sizeof(q->sound_holdtime) - 1); + } else if (!strcasecmp(propName, "queue-minutes")) { + strncpy(q->sound_minutes, propValue, sizeof(q->sound_minutes) - 1); + } else if (!strcasecmp(propName, "queue-thankyou")) { + strncpy(q->sound_thanks, propValue, sizeof(q->sound_thanks) - 1); + } else if (!strcasecmp(propName, "announce-frequency")) { + q->announcefrequency = atoi(propValue); + } else if (!strcasecmp(propName, "announce-round-seconds")) { + q->roundingseconds = atoi(propValue); + if(q->roundingseconds>60 || q->roundingseconds<0) { + ast_log(LOG_WARNING, "'%s' isn't a valid value for queue-rounding-seconds using 0 instead\n", propValue); + q->roundingseconds=0; + } + } else if (!strcasecmp(propName, "announce-holdtime")) { + q->announceholdtime = (!strcasecmp(propValue,"once")) ? 1 : ast_true((char*)propValue); + } else if (!strcasecmp(propName, "retry")) { + q->retry = atoi(propValue); + } else if (!strcasecmp(propName, "maxlen")) { + q->maxlen = atoi(propValue); + } else if (!strcasecmp(propName, "servicelevel")) { + q->servicelevel= atoi(propValue); + } else if (!strcasecmp(propName, "strategy")) { + q->strategy = strat2int((char*)propValue); + if (q->strategy < 0) { + ast_log(LOG_WARNING, "'%s' isn't a valid strategy, using ringall instead\n", propValue); + q->strategy = 0; + } + } else { + return 0; + } + return 1; +} + +#ifdef USEPOSTGRESQUEUES +static void _reload_queues_from_database() +{ + PGresult* pg_queue_result; + int queue_fields; + int queue_rows; + int queue_field_ndx; + int queue_row_ndx; + char* queue_field_name; + char* queue_field_value; + + PGresult* pg_members_result; + int member_fields; + int member_rows; + int member_field_ndx; + int member_row_ndx; + char* member_field_name; + char* member_field_value; + + struct ast_call_queue *queue; + struct member* prev_member; + struct member* cur_member; + char* queue_name; + int queue_is_new; + char memberQueryBuff[1024]; + +#if EXTRA_LOG + printf("In _reload_queues_from_database,_db_getqueue_query=%s,_db_getmembers_query=%s\n",_db_getqueue_query,_db_getmembers_query); +#endif + + if( _dbhandler ) { + /* qlock must be locked by now! */ + pg_queue_result = PQexec(_dbhandler,_db_getqueue_query); + if(pg_queue_result!=NULL) { + if (PQresultStatus(pg_queue_result) == PGRES_BAD_RESPONSE || + PQresultStatus(pg_queue_result) == PGRES_NONFATAL_ERROR || + PQresultStatus(pg_queue_result) == PGRES_FATAL_ERROR) { + ast_log(LOG_WARNING,"PGSQL_query: Query Error (%s) Calling PQreset\n",PQcmdStatus(pg_queue_result)); + PQreset(_dbhandler); + } else { + queue_fields = PQnfields(pg_queue_result); + queue_rows = PQntuples(pg_queue_result); +#if EXTRA_LOG + fprintf(stderr,"_reload_queues_from_database: queue query found %d rows with %d fields\n",queue_rows,queue_fields); +#endif + /* Enumerate all rows */ + for(queue_row_ndx=0; queue_row_ndxlock); + + _init_queue(queue); /* Re-initialize the queue. This also wipes out all static members */ + + prev_member = _find_end_of_members(queue->members); + + /* Enumerate all fields */ + for(queue_field_ndx=0;queue_field_ndxtech,member_field_value,sizeof(cur_member->tech)-1); + } + else if( !strcasecmp(member_field_name,"penalty") ) { + cur_member->penalty = atoi(member_field_value); + if( cur_member->penalty<0 ) { + cur_member->penalty = 0; + } + } + else if( !strcasecmp(member_field_name,"location") ) { + strncpy(cur_member->loc,member_field_value,sizeof(cur_member->loc)-1); } - var = ast_variable_browse(cfg, cat); - while(var) { + else { +#if EXTRA_LOG + fprintf(stderr,"Unknown value in queue member '%s' : %s\n", queue_name, member_field_name ); +#endif + } + } + *( prev_member ? &(prev_member->next) : &(queue->members) ) = cur_member; + prev_member = cur_member; + } + else { + ast_log(LOG_WARNING,"Cannot allocate %d bytes for member structure\n",sizeof(struct member)); + } + } + } + PQclear(pg_members_result); + } + else { + ast_log(LOG_WARNING,"_reload_queues_from_database: query '%s' failed",memberQueryBuff); + } + /* Finalization */ + _validate_queue_properties(queue); + if (!queue_is_new) + ast_mutex_unlock(&queue->lock); + if (queue_is_new) { + queue->next = queues; + queues = queue; + } + } + + free(queue_name); + + } + else { + ast_log(LOG_WARNING,"_reload_queues_from_database: Field \"name\" is not found in query '%s'",_db_getqueue_query); + } + } + } + PQclear(pg_queue_result); + } + else { + ast_log(LOG_WARNING,"_reload_queues_from_database: Connection Error (%s)\n",PQerrorMessage(_dbhandler)); + } + } + else { + /* bad database connection */ + ast_log(LOG_WARNING,"_dbhandler is NULL in _reload_queues_from_database\n"); + } +} +#endif + +static void reload_queues(void) +{ + struct ast_call_queue *q, *ql, *qn; + struct ast_config *cfg; + char *cat; + struct ast_variable *var; +#ifndef USEPOSTGRESQUEUES + struct member *prev, *cur; + char *tmp; + int new; +#endif + +#if EXTRA_LOG + printf("In reload_queues\n"); +#endif + + cfg = ast_load("queues.conf"); + if (!cfg) { + ast_log(LOG_NOTICE, "No call queueing config file, so no call queues\n"); + return; + } + + ast_mutex_lock(&qlock); + + /* Mark all queues as dead for the moment */ + for( q = queues;q;q = q->next) { + q->dead = 1; + q = q->next; + } + + /* Chug through config file */ + for( cat = ast_category_browse(cfg, NULL);cat;cat = ast_category_browse(cfg,cat)) { +#ifdef USEPOSTGRESQUEUES + if( !strcasecmp(cat,"general") ) { + for( var = ast_variable_browse(cfg, cat);var;var = var->next ) { + if( !strcasecmp(var->name,"dboption") ) { + if( !strcasecmp(var->value,_dboption) ) { + /* dboption hasn't changed - do nothing */ + } + else { + /* + Update _dbhandler and _dboption only in case if the the settings are valid. + It is better to preserve older settings in case if the newr settings are invalid. + */ + ast_verbose( VERBOSE_PREFIX_3 "Logging into postgres database: %s\n", var->value); + PGconn* dbhandler = PQconnectdb(var->value); + if (PQstatus(dbhandler) == CONNECTION_BAD) { + ast_log(LOG_WARNING, "Error Logging into database '%s' (%s)\n",var->value,PQerrorMessage(dbhandler)); + } + else { + if( _dbhandler ) { + PQfinish(_dbhandler); + } + _dbhandler = dbhandler; + strncpy(_dboption,var->value,sizeof(_dboption)-1); + } + } + } + else if( !strcasecmp(var->name,"db_getqueue_query") ) { + strncpy(_db_getqueue_query,var->value,sizeof(_db_getqueue_query)-1); + } + else if( !strcasecmp(var->name,"db_getmembers_query") ) { + strncpy(_db_getmembers_query,var->value,sizeof(_db_getmembers_query)-1); + } + } + } +#else + if (strcasecmp(cat, "general")) { + /* Look for an existing one */ + q = _lookup_queue_by_name(queues,cat); + if (!q) { + q = _make_new_queue(cat); + new = q ? 1 : 0; + } else { +B new = 0; + } + if (q) { + if (!new) + ast_mutex_lock(&q->lock); + _init_queue(q); /* Re-initialize the queue */ + prev = _find_end_of_members(q->members); + for( var = ast_variable_browse(cfg, cat);var;var = var->next ) { if (!strcasecmp(var->name, "member")) { /* Add a new member */ cur = malloc(sizeof(struct member)); @@ -1780,67 +2110,13 @@ q->members = cur; prev = cur; } - } else if (!strcasecmp(var->name, "music")) { - strncpy(q->moh, var->value, sizeof(q->moh) - 1); - } else if (!strcasecmp(var->name, "announce")) { - strncpy(q->announce, var->value, sizeof(q->announce) - 1); - } else if (!strcasecmp(var->name, "context")) { - strncpy(q->context, var->value, sizeof(q->context) - 1); - } else if (!strcasecmp(var->name, "timeout")) { - q->timeout = atoi(var->value); - } else if (!strcasecmp(var->name, "monitor-join")) { - q->monjoin = ast_true(var->value); - } else if (!strcasecmp(var->name, "monitor-format")) { - strncpy(q->monfmt, var->value, sizeof(q->monfmt) - 1); - } else if (!strcasecmp(var->name, "queue-youarenext")) { - strncpy(q->sound_next, var->value, sizeof(q->sound_next) - 1); - } else if (!strcasecmp(var->name, "queue-thereare")) { - strncpy(q->sound_thereare, var->value, sizeof(q->sound_thereare) - 1); - } else if (!strcasecmp(var->name, "queue-callswaiting")) { - strncpy(q->sound_calls, var->value, sizeof(q->sound_calls) - 1); - } else if (!strcasecmp(var->name, "queue-holdtime")) { - strncpy(q->sound_holdtime, var->value, sizeof(q->sound_holdtime) - 1); - } else if (!strcasecmp(var->name, "queue-minutes")) { - strncpy(q->sound_minutes, var->value, sizeof(q->sound_minutes) - 1); - } else if (!strcasecmp(var->name, "queue-seconds")) { - strncpy(q->sound_seconds, var->value, sizeof(q->sound_seconds) - 1); - } else if (!strcasecmp(var->name, "queue-thankyou")) { - strncpy(q->sound_thanks, var->value, sizeof(q->sound_thanks) - 1); - } else if (!strcasecmp(var->name, "announce-frequency")) { - q->announcefrequency = atoi(var->value); - } else if (!strcasecmp(var->name, "announce-round-seconds")) { - q->roundingseconds = atoi(var->value); - if(q->roundingseconds>60 || q->roundingseconds<0) { - ast_log(LOG_WARNING, "'%s' isn't a valid value for queue-rounding-seconds using 0 instead at line %d of queue.conf\n", var->value, var->lineno); - q->roundingseconds=0; - } - } else if (!strcasecmp(var->name, "announce-holdtime")) { - q->announceholdtime = (!strcasecmp(var->value,"once")) ? 1 : ast_true(var->value); - } else if (!strcasecmp(var->name, "retry")) { - q->retry = atoi(var->value); - } else if (!strcasecmp(var->name, "wrapuptime")) { - q->wrapuptime = atoi(var->value); - } else if (!strcasecmp(var->name, "maxlen")) { - q->maxlen = atoi(var->value); - } else if (!strcasecmp(var->name, "servicelevel")) { - q->servicelevel= atoi(var->value); - } else if (!strcasecmp(var->name, "strategy")) { - q->strategy = strat2int(var->value); - if (q->strategy < 0) { - ast_log(LOG_WARNING, "'%s' isn't a valid strategy, using ringall instead\n", var->value); - q->strategy = 0; - } } else { - ast_log(LOG_WARNING, "Unknown keyword in queue '%s': %s at line %d of queue.conf\n", cat, var->name, var->lineno); + if( !_set_queue_property(q,var->name,var->value) ) { + ast_log(LOG_WARNING,"Unknown keyword in queue '%s': %s at line %d of queue.conf\n", cat, var->name, var->lineno); } - var = var->next; } - if (q->retry < 1) - q->retry = DEFAULT_RETRY; - if (q->timeout < 0) - q->timeout = DEFAULT_TIMEOUT; - if (q->maxlen < 0) - q->maxlen = 0; + } + _validate_queue_properties(q); if (!new) ast_mutex_unlock(&q->lock); if (new) { @@ -1849,9 +2125,12 @@ } } } - cat = ast_category_browse(cfg, cat); +#endif } ast_destroy(cfg); +#if USEPOSTGRESQUEUES + _reload_queues_from_database(); +#endif q = queues; ql = NULL; while(q) { @@ -1863,10 +2142,12 @@ queues = q->next; if (!q->count) { free(q); - } else - ast_log(LOG_WARNING, "XXX Leaking a little memory :( XXX\n"); - } else + } else { + ast_log(LOG_WARNING, "XXX Leaking a litttle memory in aspp_queue.c :( XXX\n"); + } + } else { ql = q; + } q = qn; } ast_mutex_unlock(&qlock); @@ -1887,6 +2168,9 @@ if ((!queue_show && argc != 2) || (queue_show && argc != 3)) return RESULT_SHOWUSAGE; ast_mutex_lock(&qlock); +#if USEPOSTGRESQUEUES + _reload_queues_from_database(); +#endif q = queues; if (!q) { ast_mutex_unlock(&qlock); @@ -1970,6 +2254,9 @@ int which=0; ast_mutex_lock(&qlock); +#if USEPOSTGRESQUEUES + _reload_queues_from_database(); +#endif q = queues; while(q) { if (!strncasecmp(word, q->name, strlen(word))) { @@ -2004,6 +2291,9 @@ astman_send_ack(s, m, "Queue status will follow"); time(&now); ast_mutex_lock(&qlock); +#if USEPOSTGRESQUEUES + _reload_queues_from_database(); +#endif q = queues; if (id && !ast_strlen_zero(id)) { snprintf(idText,256,"ActionID: %s\r\n",id); @@ -2087,12 +2377,23 @@ ast_manager_unregister( "QueueStatus" ); ast_unregister_application(app_aqm); ast_unregister_application(app_rqm); +#ifdef USEPOSTGRESQUEUES + if( _dbhandler ) { + PQfinish(_dbhandler); + _dbhandler = 0; + } +#endif return ast_unregister_application(app); } int load_module(void) { int res; +#if USEPOSTGRESQUEUES + _dbhandler = 0; + _dboption[0] = 0; + _db_getqueue_query[0] = 0; +#endif res = ast_register_application(app, queue_exec, synopsis, descrip); if (!res) { ast_cli_register(&cli_show_queue);