worker.c File Reference


Detailed Description

CDiameterPeer Worker Procedures.

This the process pool representation that is used for processing incoming messages.

Author:
Dragos Vingarzan vingarzan -at- fokus dot fraunhofer dot de

Definition in file worker.c.

#include <time.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include "utils.h"
#include "globals.h"
#include "config.h"
#include "worker.h"
#include "diameter_api.h"

Go to the source code of this file.

Data Structures

union  semun

Functions

int dp_add_pid (pid_t pid)
 Add a pid to the local process list.
void dp_del_pid (pid_t pid)
 Delete a pid from the process list.
static void cdp_lock_get (int sid)
 Gets the lock on a semaphore and blocks until it is available.
static void cdp_lock_release (int sid)
 Releases the lock on a sempahore.
void worker_init ()
 Initializes the worker structures, like the task queue.
void worker_destroy ()
 Destroys the worker structures.
int cb_add (cdp_cb_f cb, void *ptr)
void cb_remove (cdp_cb_t *cb)
int put_task (peer *p, AAAMessage *msg)
 Adds a message as a task to the task queue.
task_t take_task ()
 Remove and return the first task from the queue (FIFO).
void worker_poison_queue ()
 Poisons the worker queue.
void worker_process (int id)
 This is the main worker process.

Variables

dp_configconfig
 Configuration for this diameter peer.
task_queue_ttasks
 queue of tasks
cdp_cb_list_tcallbacks
 list of callbacks for message processing
sembuf cdp_sem_lock = { 0, -1, 0}
 sembuf structure to lock a semaphore
sembuf cdp_sem_unlock = { 0, +1, 0}
 sembuf structure to unlock a semaphore
semun cdp_semun_lock
semun cdp_semun_unlock
semun cdp_semun_init
semun cdp_semun_destroy


Function Documentation

int dp_add_pid ( pid_t  pid  )  [inline]

Add a pid to the local process list.

Parameters:
pid newly forked pid
Returns:
1 on success or 0 on error

Definition at line 95 of file diameter_peer.c.

References pid_list_head_t::head, LOG_NO_MEM, _pid_list_t::next, _pid_list_t::pid, pid_list, pid_list_lock, _pid_list_t::prev, and pid_list_head_t::tail.

00096 {
00097     pid_list_t *n;
00098     lock_get(pid_list_lock);
00099     n = shm_malloc(sizeof(pid_list_t));
00100     if (!n){
00101         LOG_NO_MEM("shm",sizeof(pid_list_t));
00102         lock_release(pid_list_lock);
00103         return 0;
00104     }
00105     n->pid = pid;
00106     n->next = 0;
00107     n->prev = pid_list->tail;
00108     if (!pid_list->head) pid_list->head = n;
00109     if (pid_list->tail) pid_list->tail->next = n;
00110     pid_list->tail = n;
00111     lock_release(pid_list_lock);
00112     return 1;
00113 }

void dp_del_pid ( pid_t  pid  )  [inline]

Delete a pid from the process list.

Parameters:
pid - the pid to remove

Definition at line 132 of file diameter_peer.c.

References pid_list_head_t::head, _pid_list_t::next, _pid_list_t::pid, pid_list, pid_list_lock, _pid_list_t::prev, and pid_list_head_t::tail.

00133 {   
00134     pid_list_t *i;
00135     lock_get(pid_list_lock);
00136     i = pid_list->head;
00137     if (!i) {
00138         lock_release(pid_list_lock);
00139         return;
00140     }
00141     while(i && i->pid!=pid) i = i->next;
00142     if (i){
00143         if (i->prev) i->prev->next = i->next;
00144         else pid_list->head = i->next;
00145         if (i->next) i->next->prev = i->prev;
00146         else pid_list->tail = i->prev;
00147         shm_free(i);
00148     }
00149     lock_release(pid_list_lock);
00150 }

static void cdp_lock_get ( int  sid  )  [inline, static]

Gets the lock on a semaphore and blocks until it is available.

This procedures does not consume CPU cycles as a busy-waiting would and it is used for blocking on the task queue without a big impact on performance.

Parameters:
sid - semaphore id
Returns:
when the sempahore is aquired or shutdown

Definition at line 98 of file worker.c.

References cdp_sem_lock, errno, and shutdownx.

Referenced by put_task(), take_task(), and worker_init().

00099 {
00100     if((semop(sid, &cdp_sem_lock, 1)) == -1)
00101     {
00102         if (shutdownx&&(*shutdownx)) return;
00103         LOG(L_INFO,"ERROR:cdp_lock_get(): Error on semop > %s\n",strerror(errno));
00104     }
00105 }

static void cdp_lock_release ( int  sid  )  [inline, static]

Releases the lock on a sempahore.

Parameters:
sid - the semaphore id

Definition at line 111 of file worker.c.

References cdp_semun_unlock, task_queue_t::end, errno, task_queue_t::full, shutdownx, task_queue_t::start, and tasks.

Referenced by put_task(), take_task(), and worker_poison_queue().

00112 {
00113     if( semctl(sid, 0, SETVAL, cdp_semun_unlock) == -1
00114     /*semop(sid, &cdp_sem_unlock, 1) == -1*/)
00115     {
00116         if (shutdownx&&(*shutdownx)) return;
00117         LOG(L_INFO,"ERROR:cdp_lock_release(): Error on semop %s > %d: %s Q[%2d/%2d]\n",
00118             sid==tasks->full?"full":"empty",errno,strerror(errno),
00119             tasks->start,tasks->end);
00120     }
00121 }

void worker_init (  ) 

Initializes the worker structures, like the task queue.

Definition at line 126 of file worker.c.

References callbacks, cdp_lock_get(), cdp_semun_init, config, task_queue_t::empty, task_queue_t::end, errno, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, LOG_NO_MEM, task_queue_t::max, task_queue_t::queue, dp_config::queue_length, task_queue_t::start, cdp_cb_list_t::tail, and tasks.

Referenced by diameter_peer_init().

00127 {
00128     tasks = shm_malloc(sizeof(task_queue_t));
00129     
00130     tasks->lock = lock_alloc();
00131     tasks->lock = lock_init(tasks->lock);
00132     
00133     tasks->empty = semget(IPC_PRIVATE,1,0666 | IPC_CREAT );
00134     if (tasks->empty==-1){
00135         LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for empty queue > %s\n",strerror(errno));
00136     }else
00137         semctl(tasks->empty, 0, SETVAL, cdp_semun_init );
00138     tasks->full = semget(IPC_PRIVATE,1, 0666 | IPC_CREAT );
00139     if (tasks->full==-1){
00140         LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for full queue > %s\n",strerror(errno));
00141     }else
00142         semctl(tasks->full, 0, SETVAL, cdp_semun_init);
00143     
00144     tasks->start = 0;
00145     tasks->end = 0;
00146     tasks->max = config->queue_length;
00147     tasks->queue = shm_malloc(tasks->max*sizeof(task_t));
00148     if (!tasks->queue) LOG_NO_MEM("shm",tasks->max*sizeof(task_t));
00149     memset(tasks->queue,0,tasks->max*sizeof(task_t));
00150         
00151     callbacks = shm_malloc(sizeof(cdp_cb_list_t));
00152     callbacks->head = 0; 
00153     callbacks->tail = 0;
00154     
00155     cdp_lock_get(tasks->empty);
00156 //  lock_release(tasks->full);  
00157 }

void worker_destroy (  ) 

Destroys the worker structures.

Definition at line 162 of file worker.c.

References AAAFreeMessage(), callbacks, cb_remove(), cdp_semun_destroy, task_queue_t::empty, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, and tasks.

Referenced by diameter_peer_destroy().

00163 {
00164     int i;
00165 //  LOG(L_CRIT,"-1-\n");
00166 /*  lock_get(tasks->lock);*/
00167     for(i=0;i<tasks->max;i++){
00168         if (tasks->queue[i].msg) AAAFreeMessage(&(tasks->queue[i].msg));
00169     }
00170 //  LOG(L_CRIT,"-2-\n");    
00171     shm_free(tasks->queue);
00172     lock_destroy(tasks->lock);
00173     lock_dealloc((void*)tasks->lock);
00174 //  LOG(L_CRIT,"-3-\n");    
00175     
00176     //lock_release(tasks->empty);
00177     semctl(tasks->empty, 0, IPC_RMID, cdp_semun_destroy);
00178 //  LOG(L_CRIT,"-4-\n");    
00179     
00180     semctl(tasks->full, 0, IPC_RMID, cdp_semun_destroy);
00181     
00182     shm_free(tasks);
00183     
00184     while(callbacks->head)
00185         cb_remove(callbacks->head);
00186     shm_free(callbacks);
00187 }

int cb_add ( cdp_cb_f  cb,
void *  ptr 
)

Definition at line 190 of file worker.c.

References callbacks, _cdp_cb_t::cb, cdp_cb_list_t::head, LOG_NO_MEM, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.

Referenced by diameter_peer_init().

00191 {
00192     cdp_cb_t *x;
00193     x = shm_malloc(sizeof(cdp_cb_t));
00194     if (!x){
00195         LOG_NO_MEM("shm",sizeof(cdp_cb_t));
00196         return 0;
00197     }
00198     x->cb = cb;
00199     x->ptr = shm_malloc(sizeof(void*));
00200     if (!x->ptr){
00201         LOG_NO_MEM("shm",sizeof(void*));
00202         return 0;
00203     }
00204     *(x->ptr) = ptr;
00205     x->next = 0;
00206     x->prev = callbacks->tail;
00207     if (callbacks->tail) callbacks->tail->next = x;
00208     callbacks->tail = x;
00209     if (!callbacks->head) callbacks->head = x;
00210     return 1;   
00211 }

void cb_remove ( cdp_cb_t cb  ) 

Definition at line 214 of file worker.c.

References callbacks, cdp_cb_list_t::head, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.

Referenced by worker_destroy().

00215 {
00216     cdp_cb_t *x;
00217     x = callbacks->head;
00218     while(x && x!=cb) x = x->next;
00219     if (!x) return;
00220     if (x->prev) x->prev->next = x->next;
00221     else callbacks->head = x->next;
00222     if (x->next) x->next->prev = x->prev;
00223     else callbacks->tail = x->prev;
00224     
00225     if (x->ptr) shm_free(x->ptr);
00226     shm_free(x);
00227 }

int put_task ( peer p,
AAAMessage msg 
)

Adds a message as a task to the task queue.

This blocks if the task queue is full, untill there is space.

Parameters:
p - the peer that the message was received from
msg - the message
Returns:
1 on success, 0 on failure (eg. shutdown in progress)

Definition at line 236 of file worker.c.

References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, _task_t::p, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.

Referenced by Rcv_Process().

00237 {
00238 //  LOG(L_CRIT,"+1+\n");
00239     lock_get(tasks->lock);
00240 //  LOG(L_CRIT,"+2+\n");
00241     while ((tasks->end+1)%tasks->max == tasks->start){
00242 //      LOG(L_CRIT,"+3+\n");
00243         lock_release(tasks->lock);
00244 //      LOG(L_CRIT,"+4+\n");
00245         if (*shutdownx) {
00246             cdp_lock_release(tasks->full);
00247             return 0;
00248         }
00249 //      LOG(L_ERR,"+");
00250         cdp_lock_get(tasks->full);
00251 //      LOG(L_CRIT,"+5+\n");
00252         lock_get(tasks->lock);
00253     }
00254     tasks->queue[tasks->end].p = p;
00255     tasks->queue[tasks->end].msg = msg;
00256     tasks->end = (tasks->end+1) % tasks->max;
00257     cdp_lock_release(tasks->empty);
00258     lock_release(tasks->lock);
00259     return 1;
00260 }

task_t take_task (  ) 

Remove and return the first task from the queue (FIFO).

This blocks until there is something in the queue.

Returns:
the first task from the queue or an empty task on error (eg. shutdown in progress)

Definition at line 267 of file worker.c.

References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.

Referenced by worker_process().

00268 {
00269     task_t t={0,0};
00270 //  LOG(L_CRIT,"-1-\n");
00271     lock_get(tasks->lock);
00272 //  LOG(L_CRIT,"-2-\n");
00273     while(tasks->start == tasks->end){
00274 //      LOG(L_CRIT,"-3-\n");
00275         lock_release(tasks->lock);
00276 //      LOG(L_CRIT,"-4-\n");
00277         if (*shutdownx) {
00278             cdp_lock_release(tasks->empty);
00279             return t;
00280         }
00281 //      LOG(L_ERR,"-");
00282         cdp_lock_get(tasks->empty);
00283 //      LOG(L_CRIT,"-5-\n");
00284         
00285         lock_get(tasks->lock);
00286 //      LOG(L_CRIT,"-6-\n");
00287     }
00288 //  LOG(L_CRIT,"-7-\n");
00289     
00290     t = tasks->queue[tasks->start];
00291     tasks->queue[tasks->start].msg = 0;
00292     tasks->start = (tasks->start+1) % tasks->max;
00293     cdp_lock_release(tasks->full);
00294     lock_release(tasks->lock);
00295     return t;
00296 }

void worker_poison_queue (  ) 

Poisons the worker queue.

Actually it just releases the task queue locks so that the workers get to evaluate if a shutdown is in process and exit.

Definition at line 303 of file worker.c.

References cdp_lock_release(), task_queue_t::empty, and tasks.

Referenced by diameter_peer_destroy(), and worker_process().

00304 {
00305 //  int i;
00306 //  for(i=0;i<config->workers;i++)
00307     cdp_lock_release(tasks->empty);
00308 }

void worker_process ( int  id  ) 

This is the main worker process.

Takes tasks from the queue in a loop and processes them by calling the registered callbacks.

Parameters:
id - id of the worker
Returns:
never, exits on shutdown.

Definition at line 316 of file worker.c.

References AAAFreeMessage(), callbacks, _cdp_cb_t::cb, dp_del_pid(), task_queue_t::end, cdp_cb_list_t::head, is_req, memlog, _task_t::msg, _cdp_cb_t::next, _task_t::p, _cdp_cb_t::ptr, shutdownx, task_queue_t::start, take_task(), tasks, and worker_poison_queue().

Referenced by diameter_peer_start().

00317 {
00318     task_t t;
00319     cdp_cb_t *cb;
00320     int r;
00321     LOG(L_INFO,"INFO:[%d] Worker process started...\n",id); 
00322     /* init the application level for this child */
00323     while(1){
00324         if (shutdownx&&(*shutdownx)) break;
00325         t = take_task();
00326         if (!t.msg) {
00327             if (shutdownx&&(*shutdownx)) break;
00328             LOG(L_INFO,"INFO:worker_process(): [%d] got empty task Q(%d/%d)\n",id,tasks->start,tasks->end);
00329             continue;
00330         }       
00331         LOG(L_DBG,"DBG:worker_process(): [%d] got task Q(%d/%d)\n",id,tasks->start,tasks->end);
00332         r = is_req(t.msg);
00333         for(cb = callbacks->head;cb;cb = cb->next)
00334             (*(cb->cb))(t.p,t.msg,*(cb->ptr));
00335         
00336         if (r){
00337             AAAFreeMessage(&(t.msg));
00338         }else{
00339             /* will be freed by the user in upper api */
00340             /*AAAFreeMessage(&(t.msg));*/
00341         }
00342     }
00343     worker_poison_queue();
00344     LOG(L_INFO,"INFO:[%d]... Worker process finished\n",id);    
00345 #ifdef CDP_FOR_SER
00346 #else
00347     #ifdef PKG_MALLOC
00348         LOG(memlog, "Worker[%d] Memory status (pkg):\n",id);
00349         //pkg_status();
00350         #ifdef pkg_sums
00351             pkg_sums();
00352         #endif 
00353     #endif
00354     dp_del_pid(getpid());   
00355 #endif
00356     exit(0);
00357 }


Variable Documentation

dp_config* config

Configuration for this diameter peer.

Definition at line 76 of file diameter_peer.c.

task_queue_t* tasks

queue of tasks

Definition at line 76 of file worker.c.

Referenced by cdp_lock_release(), put_task(), take_task(), worker_destroy(), worker_init(), worker_poison_queue(), and worker_process().

cdp_cb_list_t* callbacks

list of callbacks for message processing

Definition at line 78 of file worker.c.

Referenced by cb_add(), cb_remove(), worker_destroy(), worker_init(), and worker_process().

struct sembuf cdp_sem_lock = { 0, -1, 0}

sembuf structure to lock a semaphore

Definition at line 80 of file worker.c.

Referenced by cdp_lock_get().

struct sembuf cdp_sem_unlock = { 0, +1, 0}

sembuf structure to unlock a semaphore

Definition at line 81 of file worker.c.

union semun cdp_semun_lock

union semun cdp_semun_unlock

Referenced by cdp_lock_release().

union semun cdp_semun_init

Referenced by worker_init().

union semun cdp_semun_destroy

Referenced by worker_destroy().


Generated on Fri Jul 18 04:14:02 2008 for Open IMS Core CSCFs by  doxygen 1.5.2