00001
00056 #include <time.h>
00057 #include <stdlib.h>
00058 #include <sys/types.h>
00059 #include <unistd.h>
00060 #include <sys/ipc.h>
00061 #include <sys/sem.h>
00062
00063 #include "utils.h"
00064 #include "globals.h"
00065 #include "config.h"
00066
00067 #include "worker.h"
00068 #include "diameter_api.h"
00069
00070
00071 int dp_add_pid(pid_t pid);
00072 void dp_del_pid(pid_t pid);
00073
00074 extern dp_config *config;
00076 task_queue_t *tasks;
00078 cdp_cb_list_t *callbacks;
00080 struct sembuf cdp_sem_lock= { 0, -1, 0};
00081 struct sembuf cdp_sem_unlock={ 0, +1, 0};
00083 union semun {int val;struct semid_ds *buf;ushort *array;}
00084 cdp_semun_lock = {0},
00085 cdp_semun_unlock = {1},
00086 cdp_semun_init = {0666|IPC_CREAT},
00087 cdp_semun_destroy= {0};
00088
00089
00090
00098 static inline void cdp_lock_get(int sid)
00099 {
00100 if((semop(sid, &cdp_sem_lock, 1)) == -1)
00101 {
00102 if (shutdownx&&(*shutdownx)) return;
00103 LOG(L_INFO,"ERROR:cdp_lock_get(): Error on semop > %s\n",strerror(errno));
00104 }
00105 }
00106
00111 static inline void cdp_lock_release(int sid)
00112 {
00113 if( semctl(sid, 0, SETVAL, cdp_semun_unlock) == -1
00114 )
00115 {
00116 if (shutdownx&&(*shutdownx)) return;
00117 LOG(L_INFO,"ERROR:cdp_lock_release(): Error on semop %s > %d: %s Q[%2d/%2d]\n",
00118 sid==tasks->full?"full":"empty",errno,strerror(errno),
00119 tasks->start,tasks->end);
00120 }
00121 }
00122
00126 void worker_init()
00127 {
00128 tasks = shm_malloc(sizeof(task_queue_t));
00129
00130 tasks->lock = lock_alloc();
00131 tasks->lock = lock_init(tasks->lock);
00132
00133 tasks->empty = semget(IPC_PRIVATE,1,0666 | IPC_CREAT );
00134 if (tasks->empty==-1){
00135 LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for empty queue > %s\n",strerror(errno));
00136 }else
00137 semctl(tasks->empty, 0, SETVAL, cdp_semun_init );
00138 tasks->full = semget(IPC_PRIVATE,1, 0666 | IPC_CREAT );
00139 if (tasks->full==-1){
00140 LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for full queue > %s\n",strerror(errno));
00141 }else
00142 semctl(tasks->full, 0, SETVAL, cdp_semun_init);
00143
00144 tasks->start = 0;
00145 tasks->end = 0;
00146 tasks->max = config->queue_length;
00147 tasks->queue = shm_malloc(tasks->max*sizeof(task_t));
00148 if (!tasks->queue) LOG_NO_MEM("shm",tasks->max*sizeof(task_t));
00149 memset(tasks->queue,0,tasks->max*sizeof(task_t));
00150
00151 callbacks = shm_malloc(sizeof(cdp_cb_list_t));
00152 callbacks->head = 0;
00153 callbacks->tail = 0;
00154
00155 cdp_lock_get(tasks->empty);
00156
00157 }
00158
00162 void worker_destroy()
00163 {
00164 int i;
00165
00166
00167 for(i=0;i<tasks->max;i++){
00168 if (tasks->queue[i].msg) AAAFreeMessage(&(tasks->queue[i].msg));
00169 }
00170
00171 shm_free(tasks->queue);
00172 lock_destroy(tasks->lock);
00173 lock_dealloc((void*)tasks->lock);
00174
00175
00176
00177 semctl(tasks->empty, 0, IPC_RMID, cdp_semun_destroy);
00178
00179
00180 semctl(tasks->full, 0, IPC_RMID, cdp_semun_destroy);
00181
00182 shm_free(tasks);
00183
00184 while(callbacks->head)
00185 cb_remove(callbacks->head);
00186 shm_free(callbacks);
00187 }
00188
00189
00190 int cb_add(cdp_cb_f cb,void *ptr)
00191 {
00192 cdp_cb_t *x;
00193 x = shm_malloc(sizeof(cdp_cb_t));
00194 if (!x){
00195 LOG_NO_MEM("shm",sizeof(cdp_cb_t));
00196 return 0;
00197 }
00198 x->cb = cb;
00199 x->ptr = shm_malloc(sizeof(void*));
00200 if (!x->ptr){
00201 LOG_NO_MEM("shm",sizeof(void*));
00202 return 0;
00203 }
00204 *(x->ptr) = ptr;
00205 x->next = 0;
00206 x->prev = callbacks->tail;
00207 if (callbacks->tail) callbacks->tail->next = x;
00208 callbacks->tail = x;
00209 if (!callbacks->head) callbacks->head = x;
00210 return 1;
00211 }
00212
00213
00214 void cb_remove(cdp_cb_t *cb)
00215 {
00216 cdp_cb_t *x;
00217 x = callbacks->head;
00218 while(x && x!=cb) x = x->next;
00219 if (!x) return;
00220 if (x->prev) x->prev->next = x->next;
00221 else callbacks->head = x->next;
00222 if (x->next) x->next->prev = x->prev;
00223 else callbacks->tail = x->prev;
00224
00225 if (x->ptr) shm_free(x->ptr);
00226 shm_free(x);
00227 }
00228
00236 int put_task(peer *p,AAAMessage *msg)
00237 {
00238
00239 lock_get(tasks->lock);
00240
00241 while ((tasks->end+1)%tasks->max == tasks->start){
00242
00243 lock_release(tasks->lock);
00244
00245 if (*shutdownx) {
00246 cdp_lock_release(tasks->full);
00247 return 0;
00248 }
00249
00250 cdp_lock_get(tasks->full);
00251
00252 lock_get(tasks->lock);
00253 }
00254 tasks->queue[tasks->end].p = p;
00255 tasks->queue[tasks->end].msg = msg;
00256 tasks->end = (tasks->end+1) % tasks->max;
00257 cdp_lock_release(tasks->empty);
00258 lock_release(tasks->lock);
00259 return 1;
00260 }
00261
00267 task_t take_task()
00268 {
00269 task_t t={0,0};
00270
00271 lock_get(tasks->lock);
00272
00273 while(tasks->start == tasks->end){
00274
00275 lock_release(tasks->lock);
00276
00277 if (*shutdownx) {
00278 cdp_lock_release(tasks->empty);
00279 return t;
00280 }
00281
00282 cdp_lock_get(tasks->empty);
00283
00284
00285 lock_get(tasks->lock);
00286
00287 }
00288
00289
00290 t = tasks->queue[tasks->start];
00291 tasks->queue[tasks->start].msg = 0;
00292 tasks->start = (tasks->start+1) % tasks->max;
00293 cdp_lock_release(tasks->full);
00294 lock_release(tasks->lock);
00295 return t;
00296 }
00297
00303 void worker_poison_queue()
00304 {
00305
00306
00307 cdp_lock_release(tasks->empty);
00308 }
00309
00316 void worker_process(int id)
00317 {
00318 task_t t;
00319 cdp_cb_t *cb;
00320 int r;
00321 LOG(L_INFO,"INFO:[%d] Worker process started...\n",id);
00322
00323 while(1){
00324 if (shutdownx&&(*shutdownx)) break;
00325 t = take_task();
00326 if (!t.msg) {
00327 if (shutdownx&&(*shutdownx)) break;
00328 LOG(L_INFO,"INFO:worker_process(): [%d] got empty task Q(%d/%d)\n",id,tasks->start,tasks->end);
00329 continue;
00330 }
00331 LOG(L_DBG,"DBG:worker_process(): [%d] got task Q(%d/%d)\n",id,tasks->start,tasks->end);
00332 r = is_req(t.msg);
00333 for(cb = callbacks->head;cb;cb = cb->next)
00334 (*(cb->cb))(t.p,t.msg,*(cb->ptr));
00335
00336 if (r){
00337 AAAFreeMessage(&(t.msg));
00338 }else{
00339
00340
00341 }
00342 }
00343 worker_poison_queue();
00344 LOG(L_INFO,"INFO:[%d]... Worker process finished\n",id);
00345 #ifdef CDP_FOR_SER
00346 #else
00347 #ifdef PKG_MALLOC
00348 LOG(memlog, "Worker[%d] Memory status (pkg):\n",id);
00349
00350 #ifdef pkg_sums
00351 pkg_sums();
00352 #endif
00353 #endif
00354 dp_del_pid(getpid());
00355 #endif
00356 exit(0);
00357 }
00358