Title
Introduction to Advanced Queues (AQ)

Date
2008-05-21

Summary
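This note is a hands-on introduction to Oracle Advanced Queuing (AQ). It walks through a minimal working example: creating a queue table and queue, adding a subscriber, enqueuing a message, and dequeuing it.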

Details
1. Log in as SYSDBA, create a database user, grant it the required privileges, and then connect as that user:
-- Run as SYSDBA: create the demo schema owner and give it what AQ needs.
-- dbms_aq covers enqueue/dequeue; dbms_aqadm covers queue administration.
CREATE USER ahmed IDENTIFIED BY ahmed;
GRANT connect, resource TO ahmed;
GRANT EXECUTE ON sys.dbms_aq TO ahmed;
GRANT EXECUTE ON sys.dbms_aqadm TO ahmed;

-- Every remaining step in this note runs as the new user.
CONNECT ahmed/ahmed;

2. Create a type:
-- Payload type carried by every message on the queue.
-- Both attributes are limited to 20 characters.
CREATE TYPE iti_message_type AS OBJECT (
  subject VARCHAR2(20),
  text    VARCHAR2(20)
);
/

3. Create a queue table, create the queue, and start the queue:
-- Create the queue table, create the queue inside it, and start the queue.
-- Written as one anonymous block because SQL*Plus EXECUTE must fit on a
-- single line (or use "-" continuation), which a multi-line call does not.
BEGIN
  -- multiple_consumers => TRUE allows more than one consumer per message;
  -- sort_list orders messages by priority, then by enqueue time.
  dbms_aqadm.create_queue_table(
    queue_table        => 'iti_queue_table',
    sort_list          => 'PRIORITY,ENQ_TIME',
    queue_payload_type => 'iti_message_type',
    multiple_consumers => TRUE
  );
  dbms_aqadm.create_queue(
    queue_name  => 'iti_queue',
    queue_table => 'iti_queue_table'
  );
  dbms_aqadm.start_queue(queue_name => 'iti_queue');
END;
/

4. Create a subscriber on the queue:
-- Register the agent ITI_IN_USER as a subscriber of iti_queue.
DECLARE
  l_subscriber sys.aq$_agent;
BEGIN
  -- Agent arguments are (name, address, protocol); only the name is set here.
  l_subscriber := sys.aq$_agent('ITI_IN_USER', NULL, NULL);
  dbms_aqadm.add_subscriber(
    queue_name => 'iti_queue',
    subscriber => l_subscriber
  );
END;
/

5. Enqueue a message:
-- Enqueue one iti_message_type message on ITI_QUEUE, addressed to ITI_IN_USER.
DECLARE
  c_queue_name CONSTANT VARCHAR2(200) := 'ITI_QUEUE';
  c_consumer   CONSTANT VARCHAR2(200) := 'ITI_IN_USER';

  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message            iti_message_type;
  -- msgid is an OUT parameter: AQ generates the identifier and returns it
  -- as a RAW value, so nothing is assigned to it before the call.
  msg_id             RAW(16);
BEGIN
  -- Address the message explicitly to a single recipient.
  recipients(1) := sys.aq$_agent(c_consumer, NULL, NULL);
  message_properties.recipient_list := recipients;

  message := iti_message_type('Some data here', 'More data here');

  dbms_aq.enqueue(
    queue_name         => c_queue_name,
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => msg_id
  );
  COMMIT;
END;
/

6. Dequeue the message. Note that if the queue is empty, the call blocks until a message is enqueued, because the default wait setting is to wait indefinitely.
-- Needed so that dbms_output.put_line below is displayed by SQL*Plus.
SET SERVEROUTPUT ON;

-- Dequeue the first message available to ITI_IN_USER from ITI_QUEUE,
-- remove it from the queue, and print its subject.
DECLARE
  c_queue_name CONSTANT VARCHAR2(200) := 'ITI_QUEUE';
  c_consumer   CONSTANT VARCHAR2(200) := 'ITI_IN_USER';

  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message            iti_message_type;
  -- msgid is an OUT parameter holding the RAW identifier of the message
  -- that was dequeued.
  msg_id             RAW(16);
BEGIN
  dequeue_options.consumer_name := c_consumer;
  dequeue_options.navigation    := dbms_aq.first_message;
  -- REMOVE deletes the message from the queue once it has been read.
  dequeue_options.dequeue_mode  := dbms_aq.remove;
  -- Optional: uncomment the next line to return immediately with an error
  -- instead of blocking when no message is available.
  --dequeue_options.wait := DBMS_AQ.NO_WAIT;

  dbms_aq.dequeue(
    queue_name         => c_queue_name,
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => msg_id
  );
  dbms_output.put_line(message.subject);
  COMMIT;
END;
/

7. Here are some useful queries and commands:
-- List every object in the current schema, including the objects that AQ
-- created to support the queue table
SELECT object_name, object_type
FROM user_objects
ORDER BY object_type, object_name;

-- See how many messages are currently in the queue table
-- (aq$iti_queue_table is the view AQ provides over iti_queue_table)
SELECT COUNT(*) FROM aq$iti_queue_table;

-- Query the queue table for actual message content
SELECT queue, msg_id, consumer_name, user_data
FROM aq$iti_queue_table;

-- Drop the queue table and all related objects
-- (force => TRUE also stops and drops the queues it contains)
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'ITI_QUEUE_TABLE',force=>TRUE);

Applicable Versions
Oracle Database 9i
Oracle Database 10g

Author
Ahmed Aboulnaga

.com .com