worker.c

Go to the documentation of this file.
00001 
00056 #include <time.h> 
00057 #include <stdlib.h>
00058 #include <sys/types.h> 
00059 #include <unistd.h> 
00060 #include <sys/ipc.h>
00061 #include <sys/sem.h>
00062 
00063 #include "utils.h"
00064 #include "globals.h"
00065 #include "config.h"
00066 
00067 #include "worker.h"
00068 #include "diameter_api.h"
00069 
00070 /* defined in ../diameter_peer.c */
00071 int dp_add_pid(pid_t pid);
00072 void dp_del_pid(pid_t pid);
00073 
00074 extern dp_config *config;       
00076 task_queue_t *tasks;            
00078 cdp_cb_list_t *callbacks;       
00080 struct sembuf cdp_sem_lock=  { 0, -1, 0};   
00081 struct sembuf cdp_sem_unlock={ 0, +1, 0};   
00083 union semun {int val;struct semid_ds *buf;ushort *array;} 
00084     cdp_semun_lock   = {0}, 
00085     cdp_semun_unlock = {1},
00086     cdp_semun_init   = {0666|IPC_CREAT},
00087     cdp_semun_destroy= {0};
00088     
00089     
00090     
00098 static inline void cdp_lock_get(int sid)
00099 {
00100     if((semop(sid, &cdp_sem_lock, 1)) == -1)
00101     {
00102         if (shutdownx&&(*shutdownx)) return;
00103         LOG(L_INFO,"ERROR:cdp_lock_get(): Error on semop > %s\n",strerror(errno));
00104     }
00105 }
00106 
00111 static inline void cdp_lock_release(int sid)
00112 {
00113     if( semctl(sid, 0, SETVAL, cdp_semun_unlock) == -1
00114     /*semop(sid, &cdp_sem_unlock, 1) == -1*/)
00115     {
00116         if (shutdownx&&(*shutdownx)) return;
00117         LOG(L_INFO,"ERROR:cdp_lock_release(): Error on semop %s > %d: %s Q[%2d/%2d]\n",
00118             sid==tasks->full?"full":"empty",errno,strerror(errno),
00119             tasks->start,tasks->end);
00120     }
00121 }
00122 
00126 void worker_init()
00127 {
00128     tasks = shm_malloc(sizeof(task_queue_t));
00129     
00130     tasks->lock = lock_alloc();
00131     tasks->lock = lock_init(tasks->lock);
00132     
00133     tasks->empty = semget(IPC_PRIVATE,1,0666 | IPC_CREAT );
00134     if (tasks->empty==-1){
00135         LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for empty queue > %s\n",strerror(errno));
00136     }else
00137         semctl(tasks->empty, 0, SETVAL, cdp_semun_init );
00138     tasks->full = semget(IPC_PRIVATE,1, 0666 | IPC_CREAT );
00139     if (tasks->full==-1){
00140         LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for full queue > %s\n",strerror(errno));
00141     }else
00142         semctl(tasks->full, 0, SETVAL, cdp_semun_init);
00143     
00144     tasks->start = 0;
00145     tasks->end = 0;
00146     tasks->max = config->queue_length;
00147     tasks->queue = shm_malloc(tasks->max*sizeof(task_t));
00148     if (!tasks->queue) LOG_NO_MEM("shm",tasks->max*sizeof(task_t));
00149     memset(tasks->queue,0,tasks->max*sizeof(task_t));
00150         
00151     callbacks = shm_malloc(sizeof(cdp_cb_list_t));
00152     callbacks->head = 0; 
00153     callbacks->tail = 0;
00154     
00155     cdp_lock_get(tasks->empty);
00156 //  lock_release(tasks->full);  
00157 }
00158 
00162 void worker_destroy()
00163 {
00164     int i;
00165 //  LOG(L_CRIT,"-1-\n");
00166 /*  lock_get(tasks->lock);*/
00167     for(i=0;i<tasks->max;i++){
00168         if (tasks->queue[i].msg) AAAFreeMessage(&(tasks->queue[i].msg));
00169     }
00170 //  LOG(L_CRIT,"-2-\n");    
00171     shm_free(tasks->queue);
00172     lock_destroy(tasks->lock);
00173     lock_dealloc((void*)tasks->lock);
00174 //  LOG(L_CRIT,"-3-\n");    
00175     
00176     //lock_release(tasks->empty);
00177     semctl(tasks->empty, 0, IPC_RMID, cdp_semun_destroy);
00178 //  LOG(L_CRIT,"-4-\n");    
00179     
00180     semctl(tasks->full, 0, IPC_RMID, cdp_semun_destroy);
00181     
00182     shm_free(tasks);
00183     
00184     while(callbacks->head)
00185         cb_remove(callbacks->head);
00186     shm_free(callbacks);
00187 }
00188 
00189 /*unsafe*/
00190 int cb_add(cdp_cb_f cb,void *ptr)
00191 {
00192     cdp_cb_t *x;
00193     x = shm_malloc(sizeof(cdp_cb_t));
00194     if (!x){
00195         LOG_NO_MEM("shm",sizeof(cdp_cb_t));
00196         return 0;
00197     }
00198     x->cb = cb;
00199     x->ptr = shm_malloc(sizeof(void*));
00200     if (!x->ptr){
00201         LOG_NO_MEM("shm",sizeof(void*));
00202         return 0;
00203     }
00204     *(x->ptr) = ptr;
00205     x->next = 0;
00206     x->prev = callbacks->tail;
00207     if (callbacks->tail) callbacks->tail->next = x;
00208     callbacks->tail = x;
00209     if (!callbacks->head) callbacks->head = x;
00210     return 1;   
00211 }
00212 
00213 /*unsafe*/
00214 void cb_remove(cdp_cb_t *cb)
00215 {
00216     cdp_cb_t *x;
00217     x = callbacks->head;
00218     while(x && x!=cb) x = x->next;
00219     if (!x) return;
00220     if (x->prev) x->prev->next = x->next;
00221     else callbacks->head = x->next;
00222     if (x->next) x->next->prev = x->prev;
00223     else callbacks->tail = x->prev;
00224     
00225     if (x->ptr) shm_free(x->ptr);
00226     shm_free(x);
00227 }
00228 
00236 int put_task(peer *p,AAAMessage *msg)
00237 {
00238 //  LOG(L_CRIT,"+1+\n");
00239     lock_get(tasks->lock);
00240 //  LOG(L_CRIT,"+2+\n");
00241     while ((tasks->end+1)%tasks->max == tasks->start){
00242 //      LOG(L_CRIT,"+3+\n");
00243         lock_release(tasks->lock);
00244 //      LOG(L_CRIT,"+4+\n");
00245         if (*shutdownx) {
00246             cdp_lock_release(tasks->full);
00247             return 0;
00248         }
00249 //      LOG(L_ERR,"+");
00250         cdp_lock_get(tasks->full);
00251 //      LOG(L_CRIT,"+5+\n");
00252         lock_get(tasks->lock);
00253     }
00254     tasks->queue[tasks->end].p = p;
00255     tasks->queue[tasks->end].msg = msg;
00256     tasks->end = (tasks->end+1) % tasks->max;
00257     cdp_lock_release(tasks->empty);
00258     lock_release(tasks->lock);
00259     return 1;
00260 }
00261 
00267 task_t take_task()
00268 {
00269     task_t t={0,0};
00270 //  LOG(L_CRIT,"-1-\n");
00271     lock_get(tasks->lock);
00272 //  LOG(L_CRIT,"-2-\n");
00273     while(tasks->start == tasks->end){
00274 //      LOG(L_CRIT,"-3-\n");
00275         lock_release(tasks->lock);
00276 //      LOG(L_CRIT,"-4-\n");
00277         if (*shutdownx) {
00278             cdp_lock_release(tasks->empty);
00279             return t;
00280         }
00281 //      LOG(L_ERR,"-");
00282         cdp_lock_get(tasks->empty);
00283 //      LOG(L_CRIT,"-5-\n");
00284         
00285         lock_get(tasks->lock);
00286 //      LOG(L_CRIT,"-6-\n");
00287     }
00288 //  LOG(L_CRIT,"-7-\n");
00289     
00290     t = tasks->queue[tasks->start];
00291     tasks->queue[tasks->start].msg = 0;
00292     tasks->start = (tasks->start+1) % tasks->max;
00293     cdp_lock_release(tasks->full);
00294     lock_release(tasks->lock);
00295     return t;
00296 }
00297 
00303 void worker_poison_queue()
00304 {
00305 //  int i;
00306 //  for(i=0;i<config->workers;i++)
00307     cdp_lock_release(tasks->empty);
00308 }
00309 
00316 void worker_process(int id)
00317 {
00318     task_t t;
00319     cdp_cb_t *cb;
00320     int r;
00321     LOG(L_INFO,"INFO:[%d] Worker process started...\n",id); 
00322     /* init the application level for this child */
00323     while(1){
00324         if (shutdownx&&(*shutdownx)) break;
00325         t = take_task();
00326         if (!t.msg) {
00327             if (shutdownx&&(*shutdownx)) break;
00328             LOG(L_INFO,"INFO:worker_process(): [%d] got empty task Q(%d/%d)\n",id,tasks->start,tasks->end);
00329             continue;
00330         }       
00331         LOG(L_DBG,"DBG:worker_process(): [%d] got task Q(%d/%d)\n",id,tasks->start,tasks->end);
00332         r = is_req(t.msg);
00333         for(cb = callbacks->head;cb;cb = cb->next)
00334             (*(cb->cb))(t.p,t.msg,*(cb->ptr));
00335         
00336         if (r){
00337             AAAFreeMessage(&(t.msg));
00338         }else{
00339             /* will be freed by the user in upper api */
00340             /*AAAFreeMessage(&(t.msg));*/
00341         }
00342     }
00343     worker_poison_queue();
00344     LOG(L_INFO,"INFO:[%d]... Worker process finished\n",id);    
00345 #ifdef CDP_FOR_SER
00346 #else
00347     #ifdef PKG_MALLOC
00348         LOG(memlog, "Worker[%d] Memory status (pkg):\n",id);
00349         //pkg_status();
00350         #ifdef pkg_sums
00351             pkg_sums();
00352         #endif 
00353     #endif
00354     dp_del_pid(getpid());   
00355 #endif
00356     exit(0);
00357 }
00358 

Generated on Tue Jul 29 04:19:11 2008 for Open IMS Core CSCFs by  doxygen 1.5.2