Definition in file worker.h.
#include "peer.h"
#include "diameter.h"
#include "utils.h"
Go to the source code of this file.
Data Structures | |
| struct | _task_t |
| task element More... | |
| struct | task_queue_t |
| task queue More... | |
| struct | _cdp_cb_t |
| callback element for message processing More... | |
| struct | cdp_cb_list_t |
| list of callback elements for message processing More... | |
Typedefs | |
| typedef int(*) | worker_init_function (int rank) |
| function to be called on worker initialization | |
| typedef _task_t | task_t |
| task element | |
| typedef int(*) | cdp_cb_f (peer *p, AAAMessage *msg, void *ptr) |
| callback function to be called on message processing | |
| typedef _cdp_cb_t | cdp_cb_t |
| callback element for message processing | |
Functions | |
| void | worker_init () |
| Initializes the worker structures, like the task queue. | |
| void | worker_destroy () |
| Destroys the worker structures. | |
| int | cb_add (cdp_cb_f cb, void *ptr) |
| void | cb_remove (cdp_cb_t *cb) |
| int | put_task (peer *p, AAAMessage *msg) |
| Adds a message as a task to the task queue. | |
| task_t | take_task () |
| Remove and return the first task from the queue (FIFO). | |
| void | worker_poison_queue () |
| Poisons the worker queue. | |
| void | worker_process (int id) |
| This is the main worker process. | |
| typedef int(*) worker_init_function(int rank) |
| typedef int(*) cdp_cb_f(peer *p, AAAMessage *msg, void *ptr) |
| void worker_init | ( | ) |
Initializes the worker structures, like the task queue.
Definition at line 126 of file worker.c.
References callbacks, cdp_lock_get(), cdp_semun_init, config, task_queue_t::empty, task_queue_t::end, errno, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, LOG_NO_MEM, task_queue_t::max, task_queue_t::queue, dp_config::queue_length, task_queue_t::start, cdp_cb_list_t::tail, and tasks.
Referenced by diameter_peer_init().
00127 { 00128 tasks = shm_malloc(sizeof(task_queue_t)); 00129 00130 tasks->lock = lock_alloc(); 00131 tasks->lock = lock_init(tasks->lock); 00132 00133 tasks->empty = semget(IPC_PRIVATE,1,0666 | IPC_CREAT ); 00134 if (tasks->empty==-1){ 00135 LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for empty queue > %s\n",strerror(errno)); 00136 }else 00137 semctl(tasks->empty, 0, SETVAL, cdp_semun_init ); 00138 tasks->full = semget(IPC_PRIVATE,1, 0666 | IPC_CREAT ); 00139 if (tasks->full==-1){ 00140 LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for full queue > %s\n",strerror(errno)); 00141 }else 00142 semctl(tasks->full, 0, SETVAL, cdp_semun_init); 00143 00144 tasks->start = 0; 00145 tasks->end = 0; 00146 tasks->max = config->queue_length; 00147 tasks->queue = shm_malloc(tasks->max*sizeof(task_t)); 00148 if (!tasks->queue) LOG_NO_MEM("shm",tasks->max*sizeof(task_t)); 00149 memset(tasks->queue,0,tasks->max*sizeof(task_t)); 00150 00151 callbacks = shm_malloc(sizeof(cdp_cb_list_t)); 00152 callbacks->head = 0; 00153 callbacks->tail = 0; 00154 00155 cdp_lock_get(tasks->empty); 00156 // lock_release(tasks->full); 00157 }
| void worker_destroy | ( | ) |
Destroys the worker structures.
Definition at line 162 of file worker.c.
References AAAFreeMessage(), callbacks, cb_remove(), cdp_semun_destroy, task_queue_t::empty, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, and tasks.
Referenced by diameter_peer_destroy().
00163 { 00164 int i; 00165 // LOG(L_CRIT,"-1-\n"); 00166 /* lock_get(tasks->lock);*/ 00167 for(i=0;i<tasks->max;i++){ 00168 if (tasks->queue[i].msg) AAAFreeMessage(&(tasks->queue[i].msg)); 00169 } 00170 // LOG(L_CRIT,"-2-\n"); 00171 shm_free(tasks->queue); 00172 lock_destroy(tasks->lock); 00173 lock_dealloc((void*)tasks->lock); 00174 // LOG(L_CRIT,"-3-\n"); 00175 00176 //lock_release(tasks->empty); 00177 semctl(tasks->empty, 0, IPC_RMID, cdp_semun_destroy); 00178 // LOG(L_CRIT,"-4-\n"); 00179 00180 semctl(tasks->full, 0, IPC_RMID, cdp_semun_destroy); 00181 00182 shm_free(tasks); 00183 00184 while(callbacks->head) 00185 cb_remove(callbacks->head); 00186 shm_free(callbacks); 00187 }
| int cb_add | ( | cdp_cb_f | cb, | |
| void * | ptr | |||
| ) |
Definition at line 190 of file worker.c.
References callbacks, _cdp_cb_t::cb, cdp_cb_list_t::head, LOG_NO_MEM, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.
Referenced by diameter_peer_init().
00191 { 00192 cdp_cb_t *x; 00193 x = shm_malloc(sizeof(cdp_cb_t)); 00194 if (!x){ 00195 LOG_NO_MEM("shm",sizeof(cdp_cb_t)); 00196 return 0; 00197 } 00198 x->cb = cb; 00199 x->ptr = shm_malloc(sizeof(void*)); 00200 if (!x->ptr){ 00201 LOG_NO_MEM("shm",sizeof(void*)); 00202 return 0; 00203 } 00204 *(x->ptr) = ptr; 00205 x->next = 0; 00206 x->prev = callbacks->tail; 00207 if (callbacks->tail) callbacks->tail->next = x; 00208 callbacks->tail = x; 00209 if (!callbacks->head) callbacks->head = x; 00210 return 1; 00211 }
| void cb_remove | ( | cdp_cb_t * | cb | ) |
Definition at line 214 of file worker.c.
References callbacks, cdp_cb_list_t::head, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.
Referenced by worker_destroy().
00215 { 00216 cdp_cb_t *x; 00217 x = callbacks->head; 00218 while(x && x!=cb) x = x->next; 00219 if (!x) return; 00220 if (x->prev) x->prev->next = x->next; 00221 else callbacks->head = x->next; 00222 if (x->next) x->next->prev = x->prev; 00223 else callbacks->tail = x->prev; 00224 00225 if (x->ptr) shm_free(x->ptr); 00226 shm_free(x); 00227 }
| int put_task | ( | peer * | p, | |
| AAAMessage * | msg | |||
| ) |
Adds a message as a task to the task queue.
This blocks if the task queue is full, untill there is space.
| p | - the peer that the message was received from | |
| msg | - the message |
Definition at line 236 of file worker.c.
References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, _task_t::p, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.
Referenced by Rcv_Process().
00237 { 00238 // LOG(L_CRIT,"+1+\n"); 00239 lock_get(tasks->lock); 00240 // LOG(L_CRIT,"+2+\n"); 00241 while ((tasks->end+1)%tasks->max == tasks->start){ 00242 // LOG(L_CRIT,"+3+\n"); 00243 lock_release(tasks->lock); 00244 // LOG(L_CRIT,"+4+\n"); 00245 if (*shutdownx) { 00246 cdp_lock_release(tasks->full); 00247 return 0; 00248 } 00249 // LOG(L_ERR,"+"); 00250 cdp_lock_get(tasks->full); 00251 // LOG(L_CRIT,"+5+\n"); 00252 lock_get(tasks->lock); 00253 } 00254 tasks->queue[tasks->end].p = p; 00255 tasks->queue[tasks->end].msg = msg; 00256 tasks->end = (tasks->end+1) % tasks->max; 00257 cdp_lock_release(tasks->empty); 00258 lock_release(tasks->lock); 00259 return 1; 00260 }
| task_t take_task | ( | ) |
Remove and return the first task from the queue (FIFO).
This blocks until there is something in the queue.
Definition at line 267 of file worker.c.
References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.
Referenced by worker_process().
00268 { 00269 task_t t={0,0}; 00270 // LOG(L_CRIT,"-1-\n"); 00271 lock_get(tasks->lock); 00272 // LOG(L_CRIT,"-2-\n"); 00273 while(tasks->start == tasks->end){ 00274 // LOG(L_CRIT,"-3-\n"); 00275 lock_release(tasks->lock); 00276 // LOG(L_CRIT,"-4-\n"); 00277 if (*shutdownx) { 00278 cdp_lock_release(tasks->empty); 00279 return t; 00280 } 00281 // LOG(L_ERR,"-"); 00282 cdp_lock_get(tasks->empty); 00283 // LOG(L_CRIT,"-5-\n"); 00284 00285 lock_get(tasks->lock); 00286 // LOG(L_CRIT,"-6-\n"); 00287 } 00288 // LOG(L_CRIT,"-7-\n"); 00289 00290 t = tasks->queue[tasks->start]; 00291 tasks->queue[tasks->start].msg = 0; 00292 tasks->start = (tasks->start+1) % tasks->max; 00293 cdp_lock_release(tasks->full); 00294 lock_release(tasks->lock); 00295 return t; 00296 }
| void worker_poison_queue | ( | ) |
Poisons the worker queue.
Actually it just releases the task queue locks so that the workers get to evaluate if a shutdown is in process and exit.
Definition at line 303 of file worker.c.
References cdp_lock_release(), task_queue_t::empty, and tasks.
Referenced by diameter_peer_destroy(), and worker_process().
00304 { 00305 // int i; 00306 // for(i=0;i<config->workers;i++) 00307 cdp_lock_release(tasks->empty); 00308 }
| void worker_process | ( | int | id | ) |
This is the main worker process.
Takes tasks from the queue in a loop and processes them by calling the registered callbacks.
| id | - id of the worker |
Definition at line 316 of file worker.c.
References AAAFreeMessage(), callbacks, _cdp_cb_t::cb, dp_del_pid(), task_queue_t::end, cdp_cb_list_t::head, is_req, memlog, _task_t::msg, _cdp_cb_t::next, _task_t::p, _cdp_cb_t::ptr, shutdownx, task_queue_t::start, take_task(), tasks, and worker_poison_queue().
Referenced by diameter_peer_start().
00317 { 00318 task_t t; 00319 cdp_cb_t *cb; 00320 int r; 00321 LOG(L_INFO,"INFO:[%d] Worker process started...\n",id); 00322 /* init the application level for this child */ 00323 while(1){ 00324 if (shutdownx&&(*shutdownx)) break; 00325 t = take_task(); 00326 if (!t.msg) { 00327 if (shutdownx&&(*shutdownx)) break; 00328 LOG(L_INFO,"INFO:worker_process(): [%d] got empty task Q(%d/%d)\n",id,tasks->start,tasks->end); 00329 continue; 00330 } 00331 LOG(L_DBG,"DBG:worker_process(): [%d] got task Q(%d/%d)\n",id,tasks->start,tasks->end); 00332 r = is_req(t.msg); 00333 for(cb = callbacks->head;cb;cb = cb->next) 00334 (*(cb->cb))(t.p,t.msg,*(cb->ptr)); 00335 00336 if (r){ 00337 AAAFreeMessage(&(t.msg)); 00338 }else{ 00339 /* will be freed by the user in upper api */ 00340 /*AAAFreeMessage(&(t.msg));*/ 00341 } 00342 } 00343 worker_poison_queue(); 00344 LOG(L_INFO,"INFO:[%d]... Worker process finished\n",id); 00345 #ifdef CDP_FOR_SER 00346 #else 00347 #ifdef PKG_MALLOC 00348 LOG(memlog, "Worker[%d] Memory status (pkg):\n",id); 00349 //pkg_status(); 00350 #ifdef pkg_sums 00351 pkg_sums(); 00352 #endif 00353 #endif 00354 dp_del_pid(getpid()); 00355 #endif 00356 exit(0); 00357 }
1.5.2