Index: funcs/func_odbc.c
===================================================================
--- funcs/func_odbc.c	(revision 66703)
+++ funcs/func_odbc.c	(working copy)
@@ -57,6 +57,7 @@
 
 enum {
 	OPT_ESCAPECOMMAS = (1 << 0),
+	OPT_MULTIROW = (1 << 1),	/* Set by mode=multirow: rows are stored for ODBC_FETCH */
 } odbc_option_flags;
 
 struct acf_odbc_query {
@@ -66,11 +67,53 @@
 	char sql_read[2048];
 	char sql_write[2048];
 	unsigned int flags;
+	int rowlimit;	/* Maximum rows stored in multirow mode; 0 is treated as INT_MAX */
 	struct ast_custom_function *acf;
 };
 
+static void odbc_datastore_free(void *data);
+
+/* Datastore type under which multirow result sets are attached to a channel */
+struct ast_datastore_info odbc_info = {
+	.type = "FUNC_ODBC",
+	.destroy = odbc_datastore_free,
+};
+
+/* For storing each result row; the row text is copied right after the list entry */
+struct odbc_datastore_row {
+	AST_LIST_ENTRY(odbc_datastore_row) list;
+	char data[0];
+};
+
+/* For storing each result set; the column-name list is copied right after the list head */
+struct odbc_datastore {
+	AST_LIST_HEAD(, odbc_datastore_row);
+	char names[0];
+};
+
 AST_LIST_HEAD_STATIC(queries, acf_odbc_query);
 
+/* Counter from which result-ids are generated; incremented under resultlock */
+static int resultcount = 0;
+AST_MUTEX_DEFINE_STATIC(resultlock);
+
+/*
+ * Destroy callback registered in odbc_info: frees every row still queued in
+ * the result set, then destroys the list head and frees the result set.
+ */
+static void odbc_datastore_free(void *data)
+{
+	struct odbc_datastore *result = data;
+	struct odbc_datastore_row *row;
+	AST_LIST_LOCK(result);
+	while ((row = AST_LIST_REMOVE_HEAD(result, list))) {
+		ast_free(row);
+	}
+	AST_LIST_UNLOCK(result);
+	AST_LIST_HEAD_DESTROY(result);
+	ast_free(result);
+}
+
 static SQLHSTMT generic_prepare(struct odbc_obj *obj, void *data)
 {
 	int res;
@@ -200,8 +243,8 @@
 {
 	struct odbc_obj *obj = NULL;
 	struct acf_odbc_query *query;
-	char sql[2048] = "", varname[15], colnames[2048] = "";
-	int res, x, buflen = 0, escapecommas, dsn;
+	char sql[2048] = "", varname[15], colnames[2048] = "", rowcount[12] = "-1";	/* rowcount is published as ODBCROWS; it stays "-1" on the error returns */
+	int res, x, y, buflen = 0, escapecommas, rowlimit = 1, dsn;	/* rowlimit stays 1 unless the query is multirow */
 	AST_DECLARE_APP_ARGS(args,
 		AST_APP_ARG(field)[100];
 	);
@@ -209,6 +252,8 @@
 	SQLSMALLINT colcount=0;
 	SQLLEN indicator;
 	SQLSMALLINT collength;
+	struct odbc_datastore *resultset = NULL;	/* Non-NULL only for multirow queries */
+	struct odbc_datastore_row *row = NULL;
 
 	AST_LIST_LOCK(&queries);
 	AST_LIST_TRAVERSE(&queries, query, list) {
@@ -220,6 +265,7 @@
 	if (!query) {
 		ast_log(LOG_ERROR, "No such function '%s'\n", cmd);
 		AST_LIST_UNLOCK(&queries);
+		pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 		return -1;
 	}
 
@@ -237,9 +283,22 @@
 		pbx_builtin_setvar_helper(chan, varname, NULL);
 	}
 
-	/* Save this flag, so we can release the lock */
+	/* Save these flags, so we can release the lock */
 	escapecommas = ast_test_flag(query, OPT_ESCAPECOMMAS);
-
+	if (ast_test_flag(query, OPT_MULTIROW)) {
+		resultset = ast_calloc(1, sizeof(*resultset));
+		if (!resultset) {
+			/* Allocation failed: bail out before the list head is touched */
+			AST_LIST_UNLOCK(&queries);
+			pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
+			return -1;
+		}
+		AST_LIST_HEAD_INIT(resultset);
+		if (query->rowlimit)
+			rowlimit = query->rowlimit;
+		else
+			rowlimit = INT_MAX;	/* NOTE(review): INT_MAX needs <limits.h>; confirm it is already included */
+	}
 	AST_LIST_UNLOCK(&queries);
 
 	for (dsn = 0; dsn < 5; dsn++) {
@@ -256,6 +315,9 @@
 		ast_log(LOG_ERROR, "Unable to execute query [%s]\n", sql);
 		if (obj)
 			ast_odbc_release_obj(obj);
+		if (resultset)
+			odbc_datastore_free(resultset);	/* Do not leak the still-empty multirow result set */
+		pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 		return -1;
 	}
 
@@ -264,92 +326,158 @@
 		ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", sql);
 		SQLFreeHandle (SQL_HANDLE_STMT, stmt);
 		ast_odbc_release_obj(obj);
+		if (resultset)
+			odbc_datastore_free(resultset);	/* Do not leak the still-empty multirow result set */
+		pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 		return -1;
 	}
 
 	*buf = '\0';
 
 	res = SQLFetch(stmt);
 	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
 		int res1 = -1;
 		if (res == SQL_NO_DATA) {
-			if (option_verbose > 3) {
+			if (option_verbose > 3)
 				ast_verbose(VERBOSE_PREFIX_4 "Found no rows [%s]\n", sql);
-			}
 			res1 = 0;
-		} else if (option_verbose > 3) {
+			ast_copy_string(rowcount, "0", sizeof(rowcount));	/* An empty result is reported as ODBCROWS=0, not as an error */
+		} else {
 			ast_log(LOG_WARNING, "Error %d in FETCH [%s]\n", res, sql);
 		}
 		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
 		ast_odbc_release_obj(obj);
+		if (resultset)
+			odbc_datastore_free(resultset);	/* No row will be stored; release the empty result set */
+		pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 		return res1;
 	}
 
-	for (x = 0; x < colcount; x++) {
-		int i, namelen;
-		char coldata[256], colname[256];
+	for (y = 0; y < rowlimit; y++) {	/* One pass per row; a single pass when not multirow */
+		*buf = '\0';
+		for (x = 0; x < colcount; x++) {
+			int i;
+			char coldata[256];
 
-		res = SQLDescribeCol(stmt, x + 1, (unsigned char *)colname, sizeof(colname), &collength, NULL, NULL, NULL, NULL);
-		if (((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) || collength == 0) {
-			snprintf(colname, sizeof(colname), "field%d", x);
-		}
+			if (y == 0) {	/* Column names are collected only while reading the first row */
+				char colname[256];
+				int namelen;
 
-		if (!ast_strlen_zero(colnames))
-			strncat(colnames, ",", sizeof(colnames) - 1);
-		namelen = strlen(colnames);
+				res = SQLDescribeCol(stmt, x + 1, (unsigned char *)colname, sizeof(colname), &collength, NULL, NULL, NULL, NULL);
+				if (((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) || collength == 0) {
+					snprintf(colname, sizeof(colname), "field%d", x);
+				}
 
-		/* Copy data, encoding '\' and ',' for the argument parser */
-		for (i = 0; i < sizeof(colname); i++) {
-			if (escapecommas && (colname[i] == '\\' || colname[i] == ',')) {
-				colnames[namelen++] = '\\';
+				if (!ast_strlen_zero(colnames))
+					strncat(colnames, ",", sizeof(colnames) - 1);
+				namelen = strlen(colnames);
+
+				/* Copy data, encoding '\' and ',' for the argument parser */
+				for (i = 0; i < sizeof(colname); i++) {
+					if (escapecommas && (colname[i] == '\\' || colname[i] == ',')) {
+						colnames[namelen++] = '\\';
+					}
+					colnames[namelen++] = colname[i];
+
+					if (namelen >= sizeof(colnames) - 2) {
+						colnames[namelen >= sizeof(colnames) ? sizeof(colnames) - 1 : namelen] = '\0';
+						break;
+					}
+
+					if (colname[i] == '\0')
+						break;
+				}
+
+				if (resultset) {
+					void *tmp = ast_realloc(resultset, sizeof(*resultset) + strlen(colnames) + 1);	/* NOTE(review): this may move the list head after AST_LIST_HEAD_INIT; confirm the lock tolerates relocation */
+					if (!tmp) {
+						ast_log(LOG_ERROR, "No space for a new resultset?\n");
+						odbc_datastore_free(resultset);	/* The old block is still valid here; destroy its list head as well as freeing it */
+						SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+						ast_odbc_release_obj(obj);
+						pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
+						return -1;
+					}
+					resultset = tmp;
+					strcpy((char *)resultset + sizeof(*resultset), colnames);	/* Keep the names so far with the result set for ODBC_FETCH */
+				}
 			}
-			colnames[namelen++] = colname[i];
 
-			if (namelen >= sizeof(colnames) - 2) {
-				colnames[namelen >= sizeof(colnames) ? sizeof(colnames) - 1 : namelen] = '\0';
-				break;
+			buflen = strlen(buf);
+			res = SQLGetData(stmt, x + 1, SQL_CHAR, coldata, sizeof(coldata), &indicator);
+			if (indicator == SQL_NULL_DATA) {
+				coldata[0] = '\0';
+				res = SQL_SUCCESS;
 			}
 
-			if (colname[i] == '\0')
-				break;
-		}
+			if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+				ast_log(LOG_WARNING, "SQL Get Data error!\n[%s]\n\n", sql);
+				y = -1;	/* Published below as ODBCROWS=-1 */
+				goto end_acf_read;	/* NOTE(review): this path now ends in return 0 where the old code returned -1 - confirm intended */
+			}
 
-		buflen = strlen(buf);
-		res = SQLGetData(stmt, x + 1, SQL_CHAR, coldata, sizeof(coldata), &indicator);
-		if (indicator == SQL_NULL_DATA) {
-			coldata[0] = '\0';
-			res = SQL_SUCCESS;
-		}
+			/* Copy data, encoding '\' and ',' for the argument parser */
+			for (i = 0; i < sizeof(coldata); i++) {
+				if (escapecommas && (coldata[i] == '\\' || coldata[i] == ',')) {
+					buf[buflen++] = '\\';
+				}
+				buf[buflen++] = coldata[i];
 
-		if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
-			ast_log(LOG_WARNING, "SQL Get Data error!\n[%s]\n\n", sql);
-			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-			ast_odbc_release_obj(obj);
-			return -1;
+				if (buflen >= len - 2)
+					break;
+
+				if (coldata[i] == '\0')
+					break;
+			}
+
+			buf[buflen - 1] = ',';
+			buf[buflen] = '\0';
 		}
+		/* Trim trailing comma */
+		buf[buflen - 1] = '\0';
 
-		/* Copy data, encoding '\' and ',' for the argument parser */
-		for (i = 0; i < sizeof(coldata); i++) {
-			if (escapecommas && (coldata[i] == '\\' || coldata[i] == ',')) {
-				buf[buflen++] = '\\';
+		if (resultset) {
+			row = ast_calloc(1, sizeof(*row) + buflen);	/* Row text is copied into the space after the list entry */
+			if (!row) {
+				ast_log(LOG_ERROR, "Unable to allocate space for more rows in this resultset.\n");
+				goto end_acf_read;	/* Rows stored so far are still handed to the channel */
 			}
-			buf[buflen++] = coldata[i];
+			strcpy((char *)row + sizeof(*row), buf);
+			AST_LIST_INSERT_TAIL(resultset, row, list);
 
-			if (buflen >= len - 2)
+			/* Get next row */
+			res = SQLFetch(stmt);
+			if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+				if (res != SQL_NO_DATA)
+					ast_log(LOG_WARNING, "Error %d in FETCH [%s]\n", res, sql);
+				y++;	/* Count the row just stored; the loop increment is skipped by the break */
 				break;
-
-			if (coldata[i] == '\0')
-				break;
+			}
 		}
-
-		buf[buflen - 1] = ',';
-		buf[buflen] = '\0';
 	}
-	/* Trim trailing comma */
-	buf[buflen - 1] = '\0';
 
+end_acf_read:
+	snprintf(rowcount, sizeof(rowcount), "%d", y);	/* y is the number of rows read, or -1 after a SQLGetData failure */
+	pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 	pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", colnames);
-
+	if (resultset) {
+		int uid;
+		struct ast_datastore *odbc_store;
+		ast_mutex_lock(&resultlock);
+		uid = ++resultcount;
+		ast_mutex_unlock(&resultlock);
+		snprintf(buf, len, "%d", uid);	/* In multirow mode the function's value is the result-id, not row data */
+		odbc_store = ast_channel_datastore_alloc(&odbc_info, buf);
+		if (!odbc_store) {
+			ast_log(LOG_ERROR, "Rows retrieved, but unable to store it in the channel.  Results fail.\n");
+			odbc_datastore_free(resultset);
+			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+			ast_odbc_release_obj(obj);
+			return -1;
+		}
+		odbc_store->data = resultset;	/* From here the result set is released through odbc_info.destroy */
+		ast_channel_datastore_add(chan, odbc_store);
+	}
 	SQLFreeHandle(SQL_HANDLE_STMT, stmt);
 	ast_odbc_release_obj(obj);
 	return 0;
@@ -383,6 +511,67 @@
 	.write = NULL,
 };
 
+/*
+ * ODBC_FETCH read callback.  data is the result-id of a stored result set.
+ * Pops the oldest remaining row, copies it into buf and sets ~ODBCFIELDS~ to
+ * the stored column names.  Returns 0 when a row was copied; returns -1 when
+ * no such result set is on the channel, or when it has no rows left (the
+ * datastore is then removed from the channel and freed).
+ */
+static int acf_fetch(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
+{
+	struct ast_datastore *store;
+	struct odbc_datastore *resultset;
+	struct odbc_datastore_row *row;
+	store = ast_channel_datastore_find(chan, &odbc_info, data);
+	if (!store) {
+		return -1;
+	}
+	resultset = store->data;
+	AST_LIST_LOCK(resultset);
+	row = AST_LIST_REMOVE_HEAD(resultset, list);
+	AST_LIST_UNLOCK(resultset);
+	if (!row) {
+		/* Cleanup datastore */
+		ast_channel_datastore_remove(chan, store);
+		ast_channel_datastore_free(store);
+		return -1;
+	}
+	pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", resultset->names);
+	ast_copy_string(buf, row->data, len);
+	ast_free(row);	/* The row has been copied out and unlinked, so it is consumed */
+	return 0;
+}
+
+static struct ast_custom_function fetch_function = {
+	.name = "ODBC_FETCH",
+	.synopsis = "Fetch a row from a multirow query",
+	.syntax = "ODBC_FETCH(<result-id>)",
+	.desc =
+"For queries which are marked as mode=multirow, the original query returns a\n"
+"result-id from which results may be fetched. This function implements the\n"
+"actual fetch of the results.\n",
+	.read = acf_fetch,
+	.write = NULL,
+};
+
+static char *app_odbcfinish = "ODBCFinish";
+static char *syn_odbcfinish = "Clear the resultset of a successful multirow query";
+static char *desc_odbcfinish =
+"ODBCFinish(<result-id>)\n"
+" Clears any remaining rows of the specified resultset\n";
+
+
+/* ODBCFinish application: removes and frees the result set whose result-id is in data; always returns 0 */
+static int exec_odbcfinish(struct ast_channel *chan, void *data)
+{
+	struct ast_datastore *store = ast_channel_datastore_find(chan, &odbc_info, data);
+	if (!store) /* Already freed; no big deal. */
+		return 0;
+	ast_channel_datastore_remove(chan, store);
+	ast_channel_datastore_free(store);
+	return 0;
+}
+
 static int init_acf_query(struct ast_config *cfg, char *catg, struct acf_odbc_query **query)
 {
 	const char *tmp;
@@ -459,6 +648,13 @@
 		ast_clear_flag((*query), OPT_ESCAPECOMMAS);
 	}
 
+	if ((tmp = ast_variable_retrieve(cfg, catg, "mode"))) {
+		if (strcasecmp(tmp, "multirow") == 0)
+			ast_set_flag((*query), OPT_MULTIROW);
+		if ((tmp = ast_variable_retrieve(cfg, catg, "rowlimit")))	/* rowlimit is only read when a mode line is present */
+			sscanf(tmp, "%d", &((*query)->rowlimit));	/* NOTE(review): result unchecked and negative values are not rejected - confirm acceptable */
+	}
+
 	(*query)->acf = ast_calloc(1, sizeof(struct ast_custom_function));
 	if (! (*query)->acf) {
 		free(*query);
@@ -569,6 +765,8 @@
 	struct ast_config *cfg;
 	char *catg;
 
+	res |= ast_custom_function_register(&fetch_function);
+	res |= ast_register_application(app_odbcfinish, exec_odbcfinish, syn_odbcfinish, desc_odbcfinish);
 	AST_LIST_LOCK(&queries);
 
 	cfg = ast_config_load(config);
@@ -617,6 +815,8 @@
 	}
 
 	res |= ast_custom_function_unregister(&escape_function);
+	res |= ast_custom_function_unregister(&fetch_function);
+	res |= ast_unregister_application(app_odbcfinish);
 
 	/* Allow any threads waiting for this lock to pass (avoids a race) */
 	AST_LIST_UNLOCK(&queries);
Index: configs/func_odbc.conf.sample
===================================================================
--- configs/func_odbc.conf.sample	(revision 66703)
+++ configs/func_odbc.conf.sample	(working copy)
@@ -67,4 +67,11 @@
 ;escapecommas=no ; Normally, commas within a field are escaped such that each
                  ; field may be separated into individual variables with ARRAY.
                  ; This option turns that behavior off [default=yes].
+;mode=multirow   ; Enable multirow fetching. Instead of returning results directly,
+                 ; mode=multirow queries will return a result-id, which can be passed
+                 ; multiple times to ODBC_FETCH, and that function will return each
+                 ; row, in order. You can add to this the following parameter:
+;rowlimit=5      ; rowlimit will limit the number of rows retrieved and stored from
+                 ; the database. If not specified, all rows, up to available memory,
+                 ; will be retrieved and stored.
 