worker.h File Reference


Detailed Description

CDiameterPeer Worker Procedures.

Author:
Dragos Vingarzan vingarzan -at- fokus dot fraunhofer dot de

Definition in file worker.h.

#include "peer.h"
#include "diameter.h"
#include "utils.h"

Go to the source code of this file.

Data Structures

struct  _task_t
 task element More...
struct  task_queue_t
 task queue More...
struct  _cdp_cb_t
 callback element for message processing More...
struct  cdp_cb_list_t
 list of callback elements for message processing More...

Typedefs

typedef int(*) worker_init_function (int rank)
 function to be called on worker initialization
typedef _task_t task_t
 task element
typedef int(*) cdp_cb_f (peer *p, AAAMessage *msg, void *ptr)
 callback function to be called on message processing
typedef _cdp_cb_t cdp_cb_t
 callback element for message processing

Functions

void worker_init ()
 Initializes the worker structures, like the task queue.
void worker_destroy ()
 Destroys the worker structures.
int cb_add (cdp_cb_f cb, void *ptr)
void cb_remove (cdp_cb_t *cb)
int put_task (peer *p, AAAMessage *msg)
 Adds a message as a task to the task queue.
task_t take_task ()
 Remove and return the first task from the queue (FIFO).
void worker_poison_queue ()
 Poisons the worker queue.
void worker_process (int id)
 This is the main worker process.


Typedef Documentation

typedef int(*) worker_init_function(int rank)

function to be called on worker initialization

Definition at line 64 of file worker.h.

typedef struct _task_t task_t

task element

typedef int(*) cdp_cb_f(peer *p, AAAMessage *msg, void *ptr)

callback function to be called on message processing

Definition at line 84 of file worker.h.

typedef struct _cdp_cb_t cdp_cb_t

callback element for message processing


Function Documentation

void worker_init (  ) 

Initializes the worker structures, like the task queue.

Definition at line 126 of file worker.c.

References callbacks, cdp_lock_get(), cdp_semun_init, config, task_queue_t::empty, task_queue_t::end, errno, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, LOG_NO_MEM, task_queue_t::max, task_queue_t::queue, dp_config::queue_length, task_queue_t::start, cdp_cb_list_t::tail, and tasks.

Referenced by diameter_peer_init().

00127 {
00128     tasks = shm_malloc(sizeof(task_queue_t));
00129     
00130     tasks->lock = lock_alloc();
00131     tasks->lock = lock_init(tasks->lock);
00132     
00133     tasks->empty = semget(IPC_PRIVATE,1,0666 | IPC_CREAT );
00134     if (tasks->empty==-1){
00135         LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for empty queue > %s\n",strerror(errno));
00136     }else
00137         semctl(tasks->empty, 0, SETVAL, cdp_semun_init );
00138     tasks->full = semget(IPC_PRIVATE,1, 0666 | IPC_CREAT );
00139     if (tasks->full==-1){
00140         LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for full queue > %s\n",strerror(errno));
00141     }else
00142         semctl(tasks->full, 0, SETVAL, cdp_semun_init);
00143     
00144     tasks->start = 0;
00145     tasks->end = 0;
00146     tasks->max = config->queue_length;
00147     tasks->queue = shm_malloc(tasks->max*sizeof(task_t));
00148     if (!tasks->queue) LOG_NO_MEM("shm",tasks->max*sizeof(task_t));
00149     memset(tasks->queue,0,tasks->max*sizeof(task_t));
00150         
00151     callbacks = shm_malloc(sizeof(cdp_cb_list_t));
00152     callbacks->head = 0; 
00153     callbacks->tail = 0;
00154     
00155     cdp_lock_get(tasks->empty);
00156 //  lock_release(tasks->full);  
00157 }

void worker_destroy (  ) 

Destroys the worker structures.

Definition at line 162 of file worker.c.

References AAAFreeMessage(), callbacks, cb_remove(), cdp_semun_destroy, task_queue_t::empty, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, and tasks.

Referenced by diameter_peer_destroy().

00163 {
00164     int i;
00165 //  LOG(L_CRIT,"-1-\n");
00166 /*  lock_get(tasks->lock);*/
00167     for(i=0;i<tasks->max;i++){
00168         if (tasks->queue[i].msg) AAAFreeMessage(&(tasks->queue[i].msg));
00169     }
00170 //  LOG(L_CRIT,"-2-\n");    
00171     shm_free(tasks->queue);
00172     lock_destroy(tasks->lock);
00173     lock_dealloc((void*)tasks->lock);
00174 //  LOG(L_CRIT,"-3-\n");    
00175     
00176     //lock_release(tasks->empty);
00177     semctl(tasks->empty, 0, IPC_RMID, cdp_semun_destroy);
00178 //  LOG(L_CRIT,"-4-\n");    
00179     
00180     semctl(tasks->full, 0, IPC_RMID, cdp_semun_destroy);
00181     
00182     shm_free(tasks);
00183     
00184     while(callbacks->head)
00185         cb_remove(callbacks->head);
00186     shm_free(callbacks);
00187 }

int cb_add ( cdp_cb_f  cb,
void *  ptr 
)

Definition at line 190 of file worker.c.

References callbacks, _cdp_cb_t::cb, cdp_cb_list_t::head, LOG_NO_MEM, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.

Referenced by diameter_peer_init().

00191 {
00192     cdp_cb_t *x;
00193     x = shm_malloc(sizeof(cdp_cb_t));
00194     if (!x){
00195         LOG_NO_MEM("shm",sizeof(cdp_cb_t));
00196         return 0;
00197     }
00198     x->cb = cb;
00199     x->ptr = shm_malloc(sizeof(void*));
00200     if (!x->ptr){
00201         LOG_NO_MEM("shm",sizeof(void*));
00202         return 0;
00203     }
00204     *(x->ptr) = ptr;
00205     x->next = 0;
00206     x->prev = callbacks->tail;
00207     if (callbacks->tail) callbacks->tail->next = x;
00208     callbacks->tail = x;
00209     if (!callbacks->head) callbacks->head = x;
00210     return 1;   
00211 }

void cb_remove ( cdp_cb_t cb  ) 

Definition at line 214 of file worker.c.

References callbacks, cdp_cb_list_t::head, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.

Referenced by worker_destroy().

00215 {
00216     cdp_cb_t *x;
00217     x = callbacks->head;
00218     while(x && x!=cb) x = x->next;
00219     if (!x) return;
00220     if (x->prev) x->prev->next = x->next;
00221     else callbacks->head = x->next;
00222     if (x->next) x->next->prev = x->prev;
00223     else callbacks->tail = x->prev;
00224     
00225     if (x->ptr) shm_free(x->ptr);
00226     shm_free(x);
00227 }

int put_task ( peer p,
AAAMessage msg 
)

Adds a message as a task to the task queue.

This blocks if the task queue is full, untill there is space.

Parameters:
p - the peer that the message was received from
msg - the message
Returns:
1 on success, 0 on failure (eg. shutdown in progress)

Definition at line 236 of file worker.c.

References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, _task_t::p, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.

Referenced by Rcv_Process().

00237 {
00238 //  LOG(L_CRIT,"+1+\n");
00239     lock_get(tasks->lock);
00240 //  LOG(L_CRIT,"+2+\n");
00241     while ((tasks->end+1)%tasks->max == tasks->start){
00242 //      LOG(L_CRIT,"+3+\n");
00243         lock_release(tasks->lock);
00244 //      LOG(L_CRIT,"+4+\n");
00245         if (*shutdownx) {
00246             cdp_lock_release(tasks->full);
00247             return 0;
00248         }
00249 //      LOG(L_ERR,"+");
00250         cdp_lock_get(tasks->full);
00251 //      LOG(L_CRIT,"+5+\n");
00252         lock_get(tasks->lock);
00253     }
00254     tasks->queue[tasks->end].p = p;
00255     tasks->queue[tasks->end].msg = msg;
00256     tasks->end = (tasks->end+1) % tasks->max;
00257     cdp_lock_release(tasks->empty);
00258     lock_release(tasks->lock);
00259     return 1;
00260 }

task_t take_task (  ) 

Remove and return the first task from the queue (FIFO).

This blocks until there is something in the queue.

Returns:
the first task from the queue or an empty task on error (eg. shutdown in progress)

Definition at line 267 of file worker.c.

References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.

Referenced by worker_process().

00268 {
00269     task_t t={0,0};
00270 //  LOG(L_CRIT,"-1-\n");
00271     lock_get(tasks->lock);
00272 //  LOG(L_CRIT,"-2-\n");
00273     while(tasks->start == tasks->end){
00274 //      LOG(L_CRIT,"-3-\n");
00275         lock_release(tasks->lock);
00276 //      LOG(L_CRIT,"-4-\n");
00277         if (*shutdownx) {
00278             cdp_lock_release(tasks->empty);
00279             return t;
00280         }
00281 //      LOG(L_ERR,"-");
00282         cdp_lock_get(tasks->empty);
00283 //      LOG(L_CRIT,"-5-\n");
00284         
00285         lock_get(tasks->lock);
00286 //      LOG(L_CRIT,"-6-\n");
00287     }
00288 //  LOG(L_CRIT,"-7-\n");
00289     
00290     t = tasks->queue[tasks->start];
00291     tasks->queue[tasks->start].msg = 0;
00292     tasks->start = (tasks->start+1) % tasks->max;
00293     cdp_lock_release(tasks->full);
00294     lock_release(tasks->lock);
00295     return t;
00296 }

void worker_poison_queue (  ) 

Poisons the worker queue.

Actually it just releases the task queue locks so that the workers get to evaluate if a shutdown is in process and exit.

Definition at line 303 of file worker.c.

References cdp_lock_release(), task_queue_t::empty, and tasks.

Referenced by diameter_peer_destroy(), and worker_process().

00304 {
00305 //  int i;
00306 //  for(i=0;i<config->workers;i++)
00307     cdp_lock_release(tasks->empty);
00308 }

void worker_process ( int  id  ) 

This is the main worker process.

Takes tasks from the queue in a loop and processes them by calling the registered callbacks.

Parameters:
id - id of the worker
Returns:
never, exits on shutdown.

Definition at line 316 of file worker.c.

References AAAFreeMessage(), callbacks, _cdp_cb_t::cb, dp_del_pid(), task_queue_t::end, cdp_cb_list_t::head, is_req, memlog, _task_t::msg, _cdp_cb_t::next, _task_t::p, _cdp_cb_t::ptr, shutdownx, task_queue_t::start, take_task(), tasks, and worker_poison_queue().

Referenced by diameter_peer_start().

00317 {
00318     task_t t;
00319     cdp_cb_t *cb;
00320     int r;
00321     LOG(L_INFO,"INFO:[%d] Worker process started...\n",id); 
00322     /* init the application level for this child */
00323     while(1){
00324         if (shutdownx&&(*shutdownx)) break;
00325         t = take_task();
00326         if (!t.msg) {
00327             if (shutdownx&&(*shutdownx)) break;
00328             LOG(L_INFO,"INFO:worker_process(): [%d] got empty task Q(%d/%d)\n",id,tasks->start,tasks->end);
00329             continue;
00330         }       
00331         LOG(L_DBG,"DBG:worker_process(): [%d] got task Q(%d/%d)\n",id,tasks->start,tasks->end);
00332         r = is_req(t.msg);
00333         for(cb = callbacks->head;cb;cb = cb->next)
00334             (*(cb->cb))(t.p,t.msg,*(cb->ptr));
00335         
00336         if (r){
00337             AAAFreeMessage(&(t.msg));
00338         }else{
00339             /* will be freed by the user in upper api */
00340             /*AAAFreeMessage(&(t.msg));*/
00341         }
00342     }
00343     worker_poison_queue();
00344     LOG(L_INFO,"INFO:[%d]... Worker process finished\n",id);    
00345 #ifdef CDP_FOR_SER
00346 #else
00347     #ifdef PKG_MALLOC
00348         LOG(memlog, "Worker[%d] Memory status (pkg):\n",id);
00349         //pkg_status();
00350         #ifdef pkg_sums
00351             pkg_sums();
00352         #endif 
00353     #endif
00354     dp_del_pid(getpid());   
00355 #endif
00356     exit(0);
00357 }


Generated on Thu Oct 23 04:14:40 2008 for Open IMS Core CSCFs by  doxygen 1.5.2