Index: include/asterisk/jabber.h =================================================================== --- include/asterisk/jabber.h (revision 366739) +++ include/asterisk/jabber.h (working copy) @@ -73,6 +73,7 @@ #define AJI_MAX_JIDLEN 3071 #define AJI_MAX_RESJIDLEN 1023 #define AJI_MAX_ATTRLEN 256 +#define AJI_MAX_SUBID 256 #define MUC_NS "http://jabber.org/protocol/muc" @@ -101,6 +102,11 @@ AJI_UTRANS = 2, }; +enum aji_pubsub_node { + AJI_DEVICE_STATE = 0, + AJI_MESSAGE_WAITING = 1, +}; + struct aji_version { char version[50]; int jingle; @@ -158,6 +164,8 @@ char sid[10]; /* Session ID */ char mid[6]; /* Message ID */ char context[AST_MAX_CONTEXT]; + char device_state_subid[AJI_MAX_SUBID]; /* Device State Subscription ID */ + char message_waiting_subid[AJI_MAX_SUBID]; /* Message Waiting Subscription ID */ iksid *jid; iksparser *p; iksfilter *f; Index: res/res_jabber.c =================================================================== --- res/res_jabber.c (revision 366739) +++ res/res_jabber.c (working copy) @@ -354,7 +354,9 @@ const char *device_state); static int aji_handle_pubsub_error(void *data, ikspak *pak); static int aji_handle_pubsub_event(void *data, ikspak *pak); +static int aji_handle_pubsub_result(void *data, ikspak *pak); static void aji_pubsub_subscribe(struct aji_client *client, const char *node); +static void aji_pubsub_unsubscribe(struct aji_client *client, const char *node, const char *subid); static void aji_delete_pubsub_node(struct aji_client *client, const char *node_name); static iks* aji_build_node_request(struct aji_client *client, const char *collection); static int aji_delete_node_list(void *data, ikspak* pak); @@ -3295,11 +3297,50 @@ aji_pubsub_subscribe(client, "message_waiting"); iks_filter_add_rule(client->f, aji_handle_pubsub_event, client, IKS_RULE_TYPE, IKS_PAK_MESSAGE, IKS_RULE_FROM, client->pubsub_node, IKS_RULE_DONE); + iks_filter_add_rule(client->f, aji_handle_pubsub_result, client, IKS_RULE_TYPE, + IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_FROM, client->pubsub_node, IKS_RULE_DONE); iks_filter_add_rule(client->f, aji_handle_pubsub_error, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_ERROR, IKS_RULE_DONE); } +static void aji_pubsub_remove_extra_subscriptions(void *data, enum aji_pubsub_node node, ikspak *pak) +{ + iks *header, *content; + char *valid_sub_id = NULL; + char *node_name = NULL; + struct aji_client *client = (struct aji_client *)data; + + if (node == AJI_DEVICE_STATE) { + valid_sub_id = client->device_state_subid; + node_name = "device_state"; + } else if (node == AJI_MESSAGE_WAITING) { + valid_sub_id = client->message_waiting_subid; + node_name = "message_waiting"; + } + + if (valid_sub_id != NULL && node_name != NULL) { + /* Enumerate through all the subscription IDs. Remove any that + * do not actually pertain to the active, valid subscription id + * for this node. These don't always get cleaned up at shutdown. + */ + header = iks_child(iks_find(pak->x, "headers")); + while (header) { + char *header_name = iks_find_attrib(header, "name"); + if (header_name && !strcasecmp(header_name, "SubID")) { + content = iks_child(header); + char *sub_id = iks_cdata(content); + if (sub_id && strcasecmp(valid_sub_id, sub_id) != 0) { + aji_pubsub_unsubscribe(client, node_name, sub_id); + } + } + header = iks_next_tag(header); + } + } else { + ast_log(LOG_NOTICE, "Could not determine pubsub node type when removing extra subscriptions.\n"); + } +} + /*! * \brief Callback for handling PubSub events * \param data void pointer to aji_client structure @@ -3311,7 +3352,7 @@ int oldmsgs, newmsgs; iks *item, *item_content; struct ast_eid pubsub_eid; - struct ast_event *event; + struct ast_event *event = NULL; item = iks_find(iks_find(iks_find(pak->x, "event"), "items"), "item"); if (!item) { ast_log(LOG_ERROR, "Could not parse incoming PubSub event\n"); @@ -3325,6 +3366,7 @@ return IKS_FILTER_EAT; } if (!strcasecmp(iks_name(item_content), "state")) { + aji_pubsub_remove_extra_subscriptions(data, AJI_DEVICE_STATE, pak); device_state = iks_find_cdata(item, "state"); if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE, AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_STATE, @@ -3334,6 +3376,7 @@ return IKS_FILTER_EAT; } } else if (!strcasecmp(iks_name(item_content), "mailbox")) { + aji_pubsub_remove_extra_subscriptions(data, AJI_MESSAGE_WAITING, pak); context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs); @@ -3350,11 +3393,49 @@ iks_name(item_content)); return IKS_FILTER_EAT; } - ast_event_queue_and_cache(event); + if (event != NULL) + { + ast_event_queue_and_cache(event); + } return IKS_FILTER_EAT; } /*! + * \brief Callback for handling PubSub results + * \param data void pointer to aji_client structure + * \return IKS_FILTER_PASS + */ +static int aji_handle_pubsub_result(void *data, ikspak *pak) +{ + char *sub_id = NULL; + char *node = NULL; + iks *subscription; + + subscription = iks_find(iks_find(pak->x, "pubsub"), "subscription"); + if (subscription) { + struct aji_client *client = (struct aji_client *)data; + sub_id = iks_find_attrib(subscription, "subid"); + if (sub_id) { + node = iks_find_attrib(subscription, "node"); + if (node) { + if (!strcasecmp(node, "device_state")) { + strncpy(client->device_state_subid, sub_id, AJI_MAX_SUBID); + } else if (!strcasecmp(node, "message_waiting")) { + strncpy(client->message_waiting_subid, sub_id, AJI_MAX_SUBID); + } else { + ast_log(LOG_ERROR, "Received unrecognized subscription node: %s\n", node); + } + } else { + ast_log(LOG_ERROR, "Could not find node in PubSub result\n"); + } + } else { + ast_log(LOG_ERROR, "Could not find subscription ID in PubSub result\n"); + } + } + return IKS_FILTER_PASS; +} + +/*! * \brief Add Owner affiliations for pubsub node * \param client the configured XMPP client we use to connect to a XMPP server * \param node the name of the node to which to add affiliations @@ -3417,6 +3498,21 @@ iks_delete(request); } +static void aji_pubsub_unsubscribe(struct aji_client *client, const char *node, const char *subid) +{ + iks *request = aji_pubsub_iq_create(client, "set"); + iks *pubsub, *unsubscribe; + + pubsub = iks_insert(request, "pubsub"); + iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub"); + unsubscribe = iks_insert(pubsub, "unsubscribe"); + iks_insert_attrib(unsubscribe, "jid", client->jid->partial); + iks_insert_attrib(unsubscribe, "node", node); + iks_insert_attrib(unsubscribe, "subid", subid); + ast_aji_send(client, request); + iks_delete(request); +} + /*! * \brief Build the skeleton of a publish * \param client the configured XMPP client we use to connect to a XMPP server