Oracle introduced powerful queuing mechanisms where messages can be
exchanged between different programs. They called it Advanced Queuing AQ.
Exchanging messages and communicating between different application modules is a key
functionally becoming important as soon as we leave the database servers SQL and PL/SQL
programming domain.
If we have to do different jobs simultaneously, for instance to
communicate with external systems and evaluate complex queries at the same time, it might
be a design decision to uncouple "request for service" and "supply for service". In one
case an application module deals with all external systems and requests a certain query
by posting a message on a queue. In the other case an application gets the message,
performs the query and supplies the result back on the queue in between.
While using Oracle Advanced Queuing we do not have to install
additional middle-ware dealing with inter-process communication and transaction
monitoring. We can directly use an existing and well-known database and can benefit from
given functionalities like online backup or transaction processing. Alternatively other
simple and non queue based messaging techniques can be used like the Java RMI, which is
limited to Java. Or more complex approaches like CORBA, where the complexity lies more in
design and conceptual decisions.
The Point-to-Point Model
In a simple system we could think about two applications that like to
use one or more queues together. This approach is called the Point-to-Point
Model:
The process to put messages on a queue is called
enqueue or send whereas the opposite is called dequeue. There may be more
than two consumer applications but a single message can only be dequeued once. Consumers
may browse the queue without dequeuing messages.
In a more advanced system we may like to have different
applications that publish messages and others that subscribe to certain queues from where
they like to consume messages. There is no more strict connection between applications.
This is called the Publish-Subscribe Model and is covered by Part II of these
articles.
Queue Creation
For database queue creation we should have an AQ
administrator user with the required privileges. It can be used as object owner too. All
created queues and message object types will belong to this administrator. Afterwards we
can create as many queue users as we like or grant the required privileges to existing
users who want to access the queues. To avoid maintaining privileges for every single
user, we will create two roles in this sample. One for the AQ administrator and another
for all AQ users.
In these samples the administrator role is called
"my_aq_adm_role" and the corresponding user "aqadm". We grant Oracle's AQ role
"aq_administrator_role" to our administrator role.
CREATE ROLE my_aq_adm_role;
GRANT CONNECT, RESOURCE, aq_administrator_role
TO my_aq_adm_role;
The user role is called "my_aq_user_role" and the corresponding sample
user "aquser". Here we grant Oracle's AQ role "aq_user_role" and additional system
privileges required for basic operations.
CREATE ROLE my_aq_user_role;
GRANT CREATE SESSION, aq_user_role TO my_aq_user_role;
EXEC DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'my_aq_user_role',
admin_option => FALSE);
EXEC DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'my_aq_user_role',
admin_option => FALSE);
Now we're ready to create the AQ administration user:
CREATE USER aqadm IDENTIFIED BY aqadm
DEFAULT TABLESPACE tab
TEMPORARY TABLESPACE temp;
GRANT my_aq_adm_role TO aqadm;
And the queue user for our samples:
CREATE USER aquser IDENTIFIED BY aquser
DEFAULT TABLESPACE tab
TEMPORARY TABLESPACE temp;
GRANT my_aq_user_role TO aquser;
For our first queue we will use an object type instead of a base data
type like NUMBER or VARCHAR2 as payload. The payload is the data type and
structure used for every message. To use an object type is more realistic than sending
single numbers or strings around but a bit more complicated. In a message we might have
an identification number, a title and a message text or content.
It's time now to change to the AQ administration user where the
previous operations could be performed by any DBA.
CONNECT aqadm/aqadm;
CREATE TYPE queue_message_type AS OBJECT(
no NUMBER,
title VARCHAR2(30),
text VARCHAR2(2000) );
/
GRANT EXECUTE ON queue_message_type TO my_aq_user_role;
Let's create a queue called "message_queue" with a corresponding queue
table "queue_message_table". We start the queue so that it can be used from now on.
EXEC DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'queue_message_table',
queue_payload_type =>
aqadm.queue_message_type');
EXEC DBMS_AQADM.CREATE_QUEUE(
queue_name => 'message_queue',
queue_table => 'queue_message_table');
EXEC DBMS_AQADM.START_QUEUE(
queue_name => 'message_queue');
Now we have a complete queue that is ready to use. All the
administrative PL/SQL operations shown are available in Java too. However it's a handy
idea to do these steps in a SQL shell.
Using a Queue
with PL/SQL in a Point-to-Point Model
To work with queues we connect with our AQ sample user:
CONNECT aquser/aquser;
Now we like to enqueue a message. We have to name the queue, give some
default options and pass our message "my_message" as payload, which is made by our own
defined message. Remember, we live in a transactional environment. We must issue a final
COMMIT.
CONNECT aquser/aquser;
DECLARE
queue_options
DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_id
RAW(16);
my_message
aqadm.queue_message_type;
BEGIN
my_message := aqadm.queue_message_type(
1,
'This is a sample
message',
'This message has been
posted on ' ||
TO_CHAR(SYSDATE,'DD.MM.YYYY HH24:MI:SS'));
DBMS_AQ.ENQUEUE(
queue_name => 'aqadm.message_queue',
enqueue_options => queue_options,
message_properties =>
message_properties,
payload => my_message,
msgid => message_id);
COMMIT;
END;
/
We can dequeue the recently enqueued message. The
DBMS_AQ.DEQUEUE statement waits until there is a message to dequeue. The shown
code looks very similar to the one above.
SET SERVEROUTPUT ON;
DECLARE
queue_options
DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_id
RAW(2000);
my_message
aqadm.queue_message_type;
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'aqadm.message_queue',
dequeue_options => queue_options,
message_properties =>
message_properties,
payload => my_message,
msgid => message_id );
COMMIT;
DBMS_OUTPUT.PUT_LINE(
'Dequeued no: ' || my_message.no);
DBMS_OUTPUT.PUT_LINE(
'Dequeued title: ' || my_message.title);
DBMS_OUTPUT.PUT_LINE(
'Dequeued text: ' || my_message.text);
END;
/
The PL/SQL samples were easy and straightforward. Not a lot to do.
Every kind of application and programming environment could use it like this, assuming
they are able to connect to the database and execute PL/SQL stored procedures. However,
it is more convenient and is better practise to use the programming languages' own way to
deal with messages. That's the point where we should have a look what Java offers...
Introducing the Java
Samples
While using Java, it's not only the different programming syntax we use
but also the way we design programs. We leave the procedural area and enter into the
object oriented world. In these samples we use an abstract base class
AQApplication to hide all the steps we must perform before we are able to start
working with queues.
This UML diagram shows our sample class AQDequeue derived from
AQApplication. We will focus down to those statements we must know about, in order
to work with queues. It's not necessarily required to understand the whole object
oriented concept.
Using
JPublisher to Prepare an Oracle Object Type for Java
When we created our queue we created also an Oracle object type to be
used for messages. Because we cannot use Oracle data types in Java we must have a Java
class to fill the dequeued message in. JPublisher can do this job for us. It connects to
the database and creates a Java class matching the specified Oracle object type.
set CLASSPATH=
D:\Oracle\Product\8.1.7\jdbc\lib\classes12.zip;
D:\Oracle\Product\8.1.7\sqlj\lib\translator.zip;
D:\Oracle\Product\8.1.7\sqlj\lib\runtime.zip
jpub -user=aqadm/aqadm
-sql=QUEUE_MESSAGE_TYPE
-usertypes=oracle
-methods=false
-user=aqadm/aqadm
Object owner and password to which the to be translated objects
belong.
-sql=QUEUE_MESSAGE_TYPE
One or more object types and packages that you want JPublisher to
translate. Use commas for separation.
-usertypes=oracle
The oracle mapping maps Oracle datatypes to their corresponding Java
classes.
-methods=false
If true, JPublisher generates SQLJ classes for PL/SQL packages and
wrapper methods for methods in packages and object types. SQLJ wraps static SQL
operations in Java code. We do not use SQLJ here, thus we pass false.
JPublisher connects to the database and creates a Java class
QUEUE_MESSAGE_TYPE for us. We can use this class now as a Java data type to receive
messages posted by another Java or PL/SQL client.
Dequeue a
Point-to-Point Message with Oracle's
Native AQ Interface for Java
We use the previously shown PL/SQL sample and replace the dequeue
functionality by Java to show the similarities. Additionally it can be used to show
message exchanging between PL/SQL and Java. Enqueuing in Java is very similar again to
dequeuing and is therefore not shown here.
Before we can start using Oracle's Native AQ Interface for Java we must
connect to the database via JDBC. As a connection string we use a host name for HOST and
an Oracle database SID for SID. Between these two values the listener port address must
be specified, e.g. 1521.
Class.forName(
"oracle.jdbc.driver.OracleDriver");
aq.connection = DriverManager.getConnection(
"jdbc:oracle:thin:@HOST:1521:SID,
"aquser", "aquser");
aq.connection.setAutoCommit(false);
Afterwards we create a so-called AQ session passing the AQ
connection:
Class.forName("oracle.AQ.AQOracleDriver");
aq.session = AQDriverManager.createAQSession(aq.connection);
Now we're ready to get a reference to the queue we like to use. To do
so, we pass the queue owner and the queue name:
AQQueue queue = aq.session.getQueue(
"aqadm", "MESSAGE_QUEUE");
For dequeuing we create default options and pass them along with an
instance of JPublisher's created message data type QUEUE_MESSAGE_TYPE.
AQDequeueOption dequeueOption = new AQDequeueOption();
System.out.println("Waiting for message to dequeue...");
AQMessage message = ((AQOracleQueue)queue).dequeue(
dequeueOption,
QUEUE_MESSAGE_TYPE.getFactory());
To get the message content we convert the raw payload into our message
type.
AQObjectPayload payload = message.getObjectPayload();
QUEUE_MESSAGE_TYPE messageData =
(QUEUE_MESSAGE_TYPE) payload.getPayloadData();
aq.connection.commit();
System.out.println("Dequeued no: " +
messageData.getNo());
System.out.println("Dequeued title: " +
messageData.getTitle());
System.out.println("Dequeued text: " +
messageData.getText());
Like in PL/SQL, we need a final COMMIT.
Conclusion
Oracle Advanced Queuing is a powerful and rather simple way of working
with queues. The libraries available for Java offer a smart way to enqueue and dequeue
messages without too much programming overhead.
In more sophisticated projects Advanced Queuing's whole functionality
can be taken into account. Oracle's Advanced Queuing developer's guide exceeds thousand
pages. This might be seen as an indication that in advanced projects a reasonable amount
of time is required to understand the different concepts and possibilities. This should
not be seen as a disadvantage for Oracle's Advanced Queuing, the same is true for other
similar communication or queuing technologies.
Only essential issues and aspects have been covered by this article.
Part II of these articles will cover somewhat more sophisticated concepts such as
publishing and subscribing a message with Oracle's Java Message Service JMS Interface to
AQ.
Oracle Advanced Queuing AQ is a powerful queuing mechanism for message
exchanging between different applications. Part I of these articles introduced AQ and
explained how to create queues in the database, use PL/SQL in a Point-to-Point Model,
JPublisher and Oracle's Native AQ Interface for Java.
In Part II complete samples are available, showing all the required
statements, environment variables, imports, JAR files, error processing's along with
Windows batch files to compile and run etc.
The Publish-Subscribe Model
In a more sophisticated system we may like to have different
applications that publish messages and others that subscribe to certain queues from where
they like to consume messages. The Publish-Subscribe Model uses multi-consumer
queues:
Publisher applications propagate messages to queues which are called
topics here. These messages can be either addressed for specific applications or they
will be received by all destinations. Applications receiving messages are also called
agents. We talk about broadcast if the message can be consumed by all applications and
about multicast if a subscription is required for consummation.
To explain broadcast and multicast more clearly the following parallel
cases are often used: Broadcast is similar to radio and TV broadcasting received by
everybody. Multicast can be seen like a newspaper where you need a subscription. Many
people have a subscription but not everybody.
The Java Message Service JMS
Different enterprise messaging vendors lead by Sun Microsystems, Inc.
defined a common API for reliable and flexible message exchange in distributed systems
throughout an enterprise. Oracle is one of the companies that implemented JMS and decided
to do it by their own Advanced Queuing feature. Other companies implement JMS using
another technology. The underlying technology remains exchangeable and developers do not
need to learn always proprietary messaging API's.
We will use Oracle's JMS Interface to AQ that implements an interface
for Advanced Queuing.
Other Words for Same Things
If we talk about queues while using Advanced Queuing, we call the same
Topics if we are in the world of JMS. The same is true for enqueue and dequeue.
They're called Publish and Receive in JMS. Additionally applications are
often called Agents in JMS.
Queue Creation
Required database users creation and granting of needed privileges has
been described in Part I of these articles. We created an AQ administration user "aqadm"
and a AQ application user called "aquser" for the samples.
A queue table "multi_message_table" with a special object type
AQ$_JMS_OBJECT_MESSAGE will be created now. This object type is a reference only and does
not yet define the message structure. It gives us the freedom to define later the payload
of our messages we like to transfer. The payload is the data type and structure used for
every message.
Creating and starting the queue "multi_queue" works in the same way as
for the Point-to-Point connection, except that the parameter "multiple_consumers" is set
to true.
We use the AQ administration user "aqadm":
CONNECT aqadm/aqadm;
EXEC DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'multi_message_table',
queue_payload_type => 'SYS.AQ$_JMS_OBJECT_MESSAGE',
multiple_consumers => TRUE);
EXEC DBMS_AQADM.CREATE_QUEUE(
queue_name => 'multi_queue',
queue_table => 'multi_message_table');
EXEC DBMS_AQADM.START_QUEUE(
queue_name => 'multi_queue');
Introducing the Java
Samples
When using Java we leave the procedural area and step into the object
oriented world. The abstract base class AQApplication offers all required steps we must
perform before we are able to start working with queues.
The UML diagram shows our sample classes AQJmsPublisher and
AQJmsSubscriber derived from AQApplication. They will be used to act as publishers and
subscribers.
Both classes instantiate the payload class AQJmsMultiQueueItem as
message data type.
We will focus down to those statements we must know about, in order to
work with queues. It's not necessarily required to understand the whole object oriented
concept.
Create Class for Message
Content
In our sample application the following class is used as payload for
message content. We are free in choosing the member variables and methods. However, the
class must implement the Serializable interface.
The Serializable interface in the java.io library is used for object
serialisation. Serialisation means, exchanging objects between programs on the same
machine and between remote computers. The objects are transferred via streams and
networks conserving their current states and data. They are restored by receivers, become
alive again and continue to work.
public class AQJmsMultiQueueItem implements Serializable {
private int _no;
private String _title;
private String _text;
public AQJmsMultiQueueItem(int no, String
title,
String text) {
_no = no;
_title = title;
_text = text;
}
public int getNo() { return _no; }
public String getTitle() { return _title; }
public String getText() { return _text; }
}
Publish a
Message with Oracle's JMS Interface to AQ
To establish a connection to a topic we need to create a connection
factory using JDBC. As connection string we use a host name for HOST and an Oracle
database SID for SID. Between these two values the listener port address must be
specified, e.g. 1521. We use a Properties object to pass the AQ user name and
password.
Properties info = new Properties();
info.put("aquser", "aquser");
TopicConnectionFactory
topicConnectionFactory =
AQjmsFactory.getTopicConnectionFactory(
"jdbc:oracle:thin:@HOST:1521:SID",info);
With the factory we can get now two things: An AQ topic connection and
an AQ topic session. We pass true for a transactional session and request client
acknowledges. This simply means that we like a transactional behaviour and that clients
perform ROLLBACKs and COMMITs.
aq.connection =
topicConnectionFactory.createTopicConnection(
"aquser", "aquser");
aq.session =
aq.connection.createTopicSession(
true,Session.CLIENT_ACKNOWLEDGE);
We can start the connection and create a publisher afterwards. We could
pass a topic to the publisher at the place where we pass null now. But without specifying
a topic here we can work with more than one topic using the same publisher. Afterwards we
get a reference to the topic we like to use now. The topic is owned by "aqadm" and is
called "MULTI_QUEUE".
aq.connection.start();
TopicPublisher publisher = aq.session.createPublisher(null);
Topic topic = ((AQjmsSession) aq.session).getTopic(
"aqadm", "MULTI_QUEUE");
Lets make an instance of the recently created payload class
AQJmsMultiQueueItem. This object is converted into a JMS object message.
AQJmsMultiQueueItem messageData = new AQJmsMultiQueueItem(
0,
"Published message title",
"This is the message text");
ObjectMessage objectMessage =
aq.session.createObjectMessage(
messageData);
We don't want to send and broadcast this message to everybody. Instead,
we prepare a list of agents for multicasting. These recipients are identified by
subscription names, e.g. "SUBSCRIPTION1" and "SUBSCRIPTION2". The null parameter could be
used to pass an address identifying agents on remote machines.
AQjmsAgent[] recipientList = new AQjmsAgent[2];
recipientList[0] = new AQjmsAgent("SUBSCRIPTION1", null);
recipientList[1] = new AQjmsAgent("SUBSCRIPTION2", null);
Finally we publish the message to the topic along with the recipient
list, commit the whole thing and close the session and connection.
((AQjmsTopicPublisher) publisher).publish(
topic, objectMessage, recipientList);
aq.session.commit();
aq.session.close();
aq.connection.close();
Subscribe to a Topic
and Receive a Message
We can create a subscriber agent on a topic and call it for example
"SUBSCRIPTION1". The null parameter could be replaced by a message selector that filters
some of the received messages. Here we like to get all.
TopicReceiver subscriber =
((AQjmsSession) aq.session).createTopicReceiver(
topic,
"SUBSCRIPTION1",
null);
When calling the receive method the program waits until a message
appears on the topic. After 60 seconds it runs into a time-out, the program continues and
the "objectMessage" stays null, if no message appears. The time-out is specified in
milliseconds.
System.out.println(
"Waiting 60 seconds for message");
ObjectMessage objectMessage =
(ObjectMessage) subscriber.receive(60000);
We read back our payload stored in the object message.
if (objectMessage != null) {
AQJmsMultiQueueItem messageData = (AQJmsMultiQueueItem)
objectMessage.getObject();
System.out.println("Received no: " +
messageData.getNo());
System.out.println("Received title: " +
messageData.getTitle());
System.out.println("Received text: " +
messageData.getText());
}
Again we need a final commit and we close the session and
connection.
aq.session.commit();
aq.session.close();
aq.connection.close();
Some Words About Persistence
The queues and all messages inside are persistent. Persistency means
that sending and receiving messages is a transaction controlled operation. The familiar
statements ROLLBACK and COMMIT can do this. Messages survive even system crashes. We may
use our samples to send a message, shutdown the database, restart it and receive the
message.
Oracle Message Broker OMB
Outlook
While working with Advanced Queuing and JMS we often come across the
Oracle Message Broker OMB in literature and in browsing the web. The question arises if
we need OMB and for what purpose it is.
The Oracle Message Broker provides a platform-independent messaging
mechanism. The complexity of different underlying messaging technologies should be
hidden. The Java Message Service JMS is the foundation of the message broker. Many
different drivers from several vendors are supported. Advanced Queuing is one of them. In
these articles we use directly Native AQ and JMS without any message broker.
If we need to connect many different platforms the complexity could be
reduced by using a common interface and broker mechanism. To describe OMB is not part of
this article. See for instance "Oracle Message Broker Administration Guide" for
details.
Conclusion and Prospects
Oracle Advanced Queuing is a rather simple but powerful way to work
with messages and PL/SQL, Native AQ or the Java Message Service JMS. Both articles
Parts I and II speak only about the main functionality and the essential operations that
are required to produce functioning programs. There are a lot more features available
such as message prioritisation, message grouping, rule-based subscription, message
scheduling, message histories and many more ...
In distributed systems the Java Message Service JMS can be a good
decision for a platform independent messaging implementation. On different connected
platforms, several JMS implementations can be used and can even be replaced if necessary.
Using Oracle's JMS Interface to AQ on nodes where an Oracle database server is already
installed can be very efficient and cost effective. No third-party messaging solution
must be evaluated and because we are familiar with Oracle less learning effort is
required.
How to Use the Samples
For the samples Oracle 8i (8.1.7) and Java 2 (1.2) have been
chosen. These are currently the most common releases of both products. The samples are
for demonstration only, can be used with other Oracle and Java versions and integrated in
your own applications. You may download Java 2 from http://java.sun.com/j2se/
First of all, run the following script to install the DB part with
users and queues. Modify this file to match your own DB connection SID, user name and
password prior execution.
\init\install.bat
Dequeue a Point-to-Point message with Oracle's Native AQ Interface for
Java similar to the PL/SQL dequeue sample (Part I).
You may like to open the PL/SQL sample file \plsql\sample.sql
and enqueue and dequeue messages. Start several sqlplus shells and use them as
different clients. Copy the necessary statements into these shells to send and receive
messages between clients and see how it works.
Prepare and compile
Java samples (Part I + II)
To run the Java samples adopt all path strings in stub.bat,
compile.bat and run.bat within the \java sub directory. Some constants
at the top of AQApplication.java need to be changed to your own settings too.
-
Execute stub.bat to get a Java class matching the specified
Oracle object type.
-
Compile all Java sources using compile.bat.
Dequeue a Point-to-Point message with Oracle's Native AQ Interface for
Java similar to the PL/SQL dequeue sample (Part I).
Start rundequeue.bat to dequeue messages enqueued by PL/SQL.
Publish and Subscribe
a Message with JMS Interface to AQ (Part II)
Execute runpublisher that publishes:
message 0 to SUBSCRIPTION0
message 1 to SUBSCRIPTION1
message 2 to SUBSCRIPTION2
message 3 to SUBSCRIPTION1 and SUBSCRIPTION2
To get the messages run...
runsubscriber 0
runsubscriber 1
runsubscriber 2
To get message number 3 still waiting on SUBSCRIPTION1 and
SUBSCRIPTION2 run again...
runsubscriber 1
runsubscriber 2
Sample Files and Directories
\init
|
Folder including all DB initialization scripts for roles,
privileges, users and queues
|
\init\create_admin.sql
|
Create AQ administration user
|
\init\create_privilege.sql
|
Create all roles and grants privileges
|
\init\create_queue.sql
|
Create queues
|
\init\create_user.sql
|
Create AQ application user
|
\init\install.bat
|
Windows batch file to install the above listed SQL files
|
\plsql
|
|
\plsql\sample.sql
|
PL/SQL sample to enqueue and dequeue messages in a Point-to-Point
Model
|
\java
|
|
\java\AQApplication.java
|
Base class for all AQ applications
|
\java\AQDequeue.java
|
Sample class that dequeues messages using the Native AQ
Interface
|
\java\AQJmsMultiQueueItem.java
|
This is a sample message item class for the JMS Interface to AQ
|
\java\AQJmsPublisher.java
|
This is a sample class that publishes messages using the JMS
Interface to AQ
|
\java\AQJmsSubscriber.java
|
This is a sample message receiver class using the JMS Interface to
AQ
|
\java\stub.bat
|
Creates stub classes out of the database using JPublisher
|
\java\compile.bat
|
Compiles all Java files
|
\java\run.bat
|
Runs a Java class file that is passed as argument, used by the
following batch files...
|
\java\rundequeue.bat
|
Runs the Java Point-to-Point dequeue sample
|
\java\runpublisher.bat
|
Publishes messages in a Publish-Subscribe Model using Java
Message Service JMS
|
\java\runsubscriber.bat <id>
|
Subscribes to topics and receives previously published messages
with JMS
You may pass a subscription identification <id> to create
different subscribers. For our samples pass 0, 1 and 2!
The passed characters will be added to the word SUBSCRIPTION. For
instance if you pass 2 you get SUBSCRIPTION2 or for ABC you get SUBSCRIPTIONABC.
If you do not pass any identification you get SUBSCRIPTION0.
|
Java Message Service JMS
http://java.sun.com/products/jms
Oracle Documentation
-
Application Developer's Guide - Advanced Queuing
-
Supplied PL/SQL Packages Reference
-
Supplied Java Packages Reference
Sample Files as
ZIP Sample Files as
tar.gz Part I as
PDF Part II as
PDF
|