R. J. Brown2
In many real-time programs, it is necessary to share resources that are in short supply. To help manage these situations, a priority system is often established. When several contenders are competing for the same resource, the one with the highest priority gets the resource. When the resource again becomes available, the next priority contender gets it, etc. A simple first-in/first-out queue can be thought of as a priority queue where the time a contender is enqueued is his priority, and lower numbers have higher priority.
In practice, the resource being waited for may be a physical device, such as a printer; a logical device, such as a file, or a record of a file; the CPU itself; or a timer scheduler may be implemented by using the desired dispatch time as the priority, and having the dequeing operation wait until the time of day equals the dispatch time at the head of the queue.
A priority queueing scheme in a real-time system must be able to perform the following operations on the elements, or nodes, of the queue: determine the highest priority node, add a new node, and remove a node. Removing the highest priority node is a special case of the more general operation of removing a node anywhere in the queue. This is called preempting.
Most simple implementations for priority queues result in the time to perform at least one of the above operations being directly proportional to the size of the queue. For extremely large queues, this becomes unworkable. The problem is similar to sorting, and sorting can be done in time proportional to the logarithm of the number of elements, so we should be able to do as well for a priority queue.
In fact we can do a bit better than this. While the priority queue algorithm I have implemented performs in logarithmic time, it does not keep the queue completely sorted. It defers determining the second priority element until after the top priority element is removed. While this does not change the logarithmic factor, it makes each iteration go faster.
The representation for the queue is a binary tree with LEFT, RITE, and FATHER connections. Each element also contains a sort KEY, which may be interpreted as the priority. In addition, a DIST cell is used to indicate the minimum distance from that node to a leaf, which is a connection to no element, or a terminator. The FATHER connection is not used to maintain the ordering of the queue, but is used to allow rapid removal of any node in the queue. See the listing, lines 28-37. (Note: my structure declaring words, lines 3-26, together with examples of their use, are available on the East Coast Forth Board, (703) 442-8695, as the file STRUC.ARC, and will not be described further here.)
The WORD and DATA cells, lines 36-37, are used to contain dispatching information. WORD is the address of a Forth word to PERFORM when the node is dispatched, and DATA is a word of data, typically a pointer, that is pushed on the stack before PERFORMing the WORD.
The following priority queueing algorithm was first described by Clark Allen Crane in 1971. My implementation is an extension of a revised version of Crane's algorithm that is described by Donald Knuth, 1973. The interested reader is referred to Knuth's "The Art of Computer Programming, Vol 3, Sorting and Searching" for a complete description of the algorithm and an analysis of its performance.
The central concept behind Crane's algorithm is that the operations of inserting and removing an element from the queue may be viewed as merging two separate queues. Crane's algorithm keeps the highest priority node at the root of the tree, and the subtrees follow the same pattern. Thus to dispatch the head element of the queue, lines 103-106, the left and right subtrees are pruned from the root and merged back together, leaving the root out of the result. To insert an element on the queue, lines 98-101, that element is viewed as a little priority queue in its own right, of one element. This queue is merged with the original queue and the element is thereby inserted into the original queue. To remove a node from somewhere in the middle of the queue, i.e. to preempt it, lines 108-111, the node's LEFT and RITE subtrees are cut off and merged together, forming an intermediate result. Next, the preempted node is removed by using its FATHER pointer to cut it off of the element that points to it. Finally, the original queue, less the preempted node and its two subtrees, is merged with the intermediate tree described above.
The implementation in the listing has been tested on a 10 MHz 80286 with 1 wait state on memory access and 3.2 running under LMI PC/FORTH+ 3.1 for 10,000 iterations of an UNQUE on a randomly selected element of the queue, followed by an ENQUE using a randomly chosen priority, with the following results (the time is seen to be clearly logarithmic):
#nodes milisecs 10 7.4 20 9.3 50 11.9 100 13.8 200 15.5 500 17.9 1000 19.4 2000 20.7 5000 21.9 10000 23.1
The complete source code, as a standard Forth screen file, including the benchmark test, will be available on CompuServe's DDJ Forum, and also on Jerry Shifrin's East Coast Forth Board and Ray Duncan's LMI and West Coast Forth Boards.
1 ( Timer queue implementation rjb 16:06 07/21/86 ) 2 3 ( -------------- structure declaring words ---------------- ) 4 5 : <struc> CREATE , DOES> @ + ; ( 2nd generation defining word ) 6 7 ( 'struc' is used to create the defining word for a structure ) 8 : struc CREATE , ( org struc <struc> ; creates a structure ) 9 DOES> ( size <struc> <element> ; creates an element ) 10 DUP @ DUP >R ROT + SWAP ! ( update location counter ) 11 R> <struc> ; ( make word for element ) 12 13 ( 'array' is used to create an array of structures ) 14 : array CREATE ( #slots slot-size array <array> ; makes array ) 15 DUP , * ALLOT ( save slot-size and allocate array ) 16 DOES> ( subscript <array> -- &<array>element ) 17 DUP @ ROT * + WSIZE + ; ( return pointer to the slot ) 18 19 ( 'org' is useful for doing the 4th version of a C 'union' ) 20 : org ( n org <strucname> ; re-initializes location counter ) 21 BL WORD FIND NOT ABORT" Undefined! " >BODY ! ; 22 23 ( 'sizeof' is used for declaring structures of structures ) 24 : sizeof ( sizeof <strucname> -- n ; gets the size of a struc ) 25 BL WORD FIND NOT ABORT" Undefined " >BODY @ 26 STATE @ IF [COMPILE] LITERAL THEN ; IMMEDIATE 27 28 ( ---------------- timer queue entry ---------------- ) 29 30 0 struc tq ( a node in the timer queue ) 31 4 tq key ( the sort key: dispatch time ) 32 4 tq dist ( distance to nearest leaf ) 33 4 tq left ( pointer to left sub-tree ) 34 4 tq rite ( pointer to right sub-tree ) 35 4 tq father ( pointer to father of node ) 36 4 tq word ( word to execute at dispatch ) 37 4 tq data ( pointer to data for word ) 38 39 ( ---------------- timer queue implementetion --------------- ) 40 41 ( left! & rite! set the left and rite subtrees, respectively 42 of a node, called the father. The father pointer of the son 43 node is updated to point to the pointer, left or rite, that 44 points to the son, so that it may be cleared on an unque. ) 45 46 : left! DUP 0<> IF OVER left OVER father ! ( Father Son -- ) 47 THEN SWAP left ! ; 48 49 : rite! DUP 0<> IF OVER rite OVER father ! ( Father Son -- ) 50 THEN SWAP rite ! ; 51 52 : go-rt DUP rite @ >R DUP ROT rite! R> ; ( go dn rite side ) 53 54 : dist@ DUP IF dist @ THEN ; ( P -- P=nil ? 0 : dist @ ) 55 56 ( This is step M2: from Knuth p 619, sol'n to prob. 32, p 159 ) 57 : list-merge ( P Q R -- P Q R D ; merge priques P & Q, ) 58 BEGIN ( R is Roving pointer, D is Distance to near leaf ) 59 OVER 0= IF ( Q = nil ? ) 60 2 PICK dist@ EXIT THEN ( yes, D = P->dist; done! ) 61 2 PICK 0= IF ( P = nil ? ) 62 ROT DROP OVER SWAP ( yes, P = Q ) 63 2 PICK dist@ EXIT THEN ( yes, D = P->dist; done! ) 64 2 PICK key @ 2 PICK key @ ( P->key < Q->key ? ) 65 < IF ROT go-rt ROT ROT ELSE ( yes, P moves right ) 66 SWAP go-rt SWAP THEN ( no, Q moves right ) 67 AGAIN ; ( loop until one of the trees is eliminated ) 68 69 ( This is steps M3: and M4: from Knuth p 619 ) 70 : fix-dist ( P Q R D -- P ; fixes up distance to nearest leaf ) 71 BEGIN 72 OVER 0= IF 3DROP EXIT THEN ( R = nil ? yes, done! ) 73 ROT DROP OVER rite @ ROT ROT ( Q = R->rite ) 74 OVER left @ dist@ OVER < IF ( R->left->dist < D ? ) 75 DROP DUP left @ dist@ 1+ ( D = R->left->dist + 1 ) 76 OVER DUP left @ rite! ( R->rite = R->left ) 77 OVER 4 PICK left! ELSE ( R->left = P ) 78 1+ OVER 4 PICK rite! THEN ( D++; R->rite = P ) 79 DUP 2 PICK dist ! ( R->dist = D ) 80 >R ROT DROP SWAP DUP R> ( P = R; R = Q ) 81 AGAIN ; ( spin down the right sub-tree ) 82 83 ( tq-merge is Knuth's Algorithm M from p 619 ) 84 : tq-merge 0 list-merge fix-dist ; ( P Q -- P ; merge 2 tq's ) 85 86 ( the timer queue root pointer ) VARIABLE TQ 0 TQ ! 87 88 ( TQ! updates the father pointer in the first node, & sets TQ ) 89 : TQ! ( tq -- ; sets TQ and father of tq ) 90 DUP TQ ! ( set the timer queue head ) 91 DUP 0= IF DROP EXIT THEN ( empty? yes, done! ) 92 father TQ SWAP ! ; ( no, set father of top node ) 93 94 : ?waiting DUP father @ @ = ; ( tq -- flag ) 95 96 ( ------------ timer queue package entry points ------------- ) 97 98 : tq-enque ( &tq -- ; enques to timer ) 99 0 OVER left ! 0 OVER rite ! ( nullify both sub-trees ) 100 1 OVER dist ! ( distance to a leaf is 1 ) 101 TQ @ tq-merge TQ! ; ( merge new node with old queue ) 102 103 : tq-deque ( -- &tq ; deques from timer ) 104 TQ @ DUP 0= IF EXIT THEN ( returns nil on empty queue ) 105 DUP ( save head for answer ) 106 DUP left @ SWAP rite @ tq-merge TQ! ; ( merge remains ) 107 108 : tq-unque ( &tq -- &tq ; removes from timer ) 109 DUP ?waiting NOT IF EXIT THEN 0 OVER father @ ! ( cut ) 110 DUP left @ OVER rite @ TQ @ ( both subtrees of tq & TQ ) 111 tq-merge tq-merge TQ! ; ( paste it back together ) 112 <End-Of-File>