This the process pool representation that is used for processing incoming messages.
Definition in file worker.c.
#include <time.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include "utils.h"
#include "globals.h"
#include "config.h"
#include "worker.h"
#include "diameter_api.h"
Go to the source code of this file.
Data Structures | |
| union | semun |
Functions | |
| int | dp_add_pid (pid_t pid) |
| Add a pid to the local process list. | |
| void | dp_del_pid (pid_t pid) |
| Delete a pid from the process list. | |
| static void | cdp_lock_get (int sid) |
| Gets the lock on a semaphore and blocks until it is available. | |
| static void | cdp_lock_release (int sid) |
| Releases the lock on a sempahore. | |
| void | worker_init () |
| Initializes the worker structures, like the task queue. | |
| void | worker_destroy () |
| Destroys the worker structures. | |
| int | cb_add (cdp_cb_f cb, void *ptr) |
| void | cb_remove (cdp_cb_t *cb) |
| int | put_task (peer *p, AAAMessage *msg) |
| Adds a message as a task to the task queue. | |
| task_t | take_task () |
| Remove and return the first task from the queue (FIFO). | |
| void | worker_poison_queue () |
| Poisons the worker queue. | |
| void | worker_process (int id) |
| This is the main worker process. | |
Variables | |
| dp_config * | config |
| Configuration for this diameter peer. | |
| task_queue_t * | tasks |
| queue of tasks | |
| cdp_cb_list_t * | callbacks |
| list of callbacks for message processing | |
| sembuf | cdp_sem_lock = { 0, -1, 0} |
| sembuf structure to lock a semaphore | |
| sembuf | cdp_sem_unlock = { 0, +1, 0} |
| sembuf structure to unlock a semaphore | |
| semun | cdp_semun_lock |
| semun | cdp_semun_unlock |
| semun | cdp_semun_init |
| semun | cdp_semun_destroy |
| int dp_add_pid | ( | pid_t | pid | ) | [inline] |
Add a pid to the local process list.
| pid | newly forked pid |
Definition at line 95 of file diameter_peer.c.
References pid_list_head_t::head, LOG_NO_MEM, _pid_list_t::next, _pid_list_t::pid, pid_list, pid_list_lock, _pid_list_t::prev, and pid_list_head_t::tail.
00096 { 00097 pid_list_t *n; 00098 lock_get(pid_list_lock); 00099 n = shm_malloc(sizeof(pid_list_t)); 00100 if (!n){ 00101 LOG_NO_MEM("shm",sizeof(pid_list_t)); 00102 lock_release(pid_list_lock); 00103 return 0; 00104 } 00105 n->pid = pid; 00106 n->next = 0; 00107 n->prev = pid_list->tail; 00108 if (!pid_list->head) pid_list->head = n; 00109 if (pid_list->tail) pid_list->tail->next = n; 00110 pid_list->tail = n; 00111 lock_release(pid_list_lock); 00112 return 1; 00113 }
| void dp_del_pid | ( | pid_t | pid | ) | [inline] |
Delete a pid from the process list.
| pid | - the pid to remove |
Definition at line 132 of file diameter_peer.c.
References pid_list_head_t::head, _pid_list_t::next, _pid_list_t::pid, pid_list, pid_list_lock, _pid_list_t::prev, and pid_list_head_t::tail.
00133 { 00134 pid_list_t *i; 00135 lock_get(pid_list_lock); 00136 i = pid_list->head; 00137 if (!i) { 00138 lock_release(pid_list_lock); 00139 return; 00140 } 00141 while(i && i->pid!=pid) i = i->next; 00142 if (i){ 00143 if (i->prev) i->prev->next = i->next; 00144 else pid_list->head = i->next; 00145 if (i->next) i->next->prev = i->prev; 00146 else pid_list->tail = i->prev; 00147 shm_free(i); 00148 } 00149 lock_release(pid_list_lock); 00150 }
| static void cdp_lock_get | ( | int | sid | ) | [inline, static] |
Gets the lock on a semaphore and blocks until it is available.
This procedures does not consume CPU cycles as a busy-waiting would and it is used for blocking on the task queue without a big impact on performance.
| sid | - semaphore id |
Definition at line 98 of file worker.c.
References cdp_sem_lock, errno, and shutdownx.
Referenced by put_task(), take_task(), and worker_init().
00099 { 00100 if((semop(sid, &cdp_sem_lock, 1)) == -1) 00101 { 00102 if (shutdownx&&(*shutdownx)) return; 00103 LOG(L_INFO,"ERROR:cdp_lock_get(): Error on semop > %s\n",strerror(errno)); 00104 } 00105 }
| static void cdp_lock_release | ( | int | sid | ) | [inline, static] |
Releases the lock on a sempahore.
| sid | - the semaphore id |
Definition at line 111 of file worker.c.
References cdp_semun_unlock, task_queue_t::end, errno, task_queue_t::full, shutdownx, task_queue_t::start, and tasks.
Referenced by put_task(), take_task(), and worker_poison_queue().
00112 { 00113 if( semctl(sid, 0, SETVAL, cdp_semun_unlock) == -1 00114 /*semop(sid, &cdp_sem_unlock, 1) == -1*/) 00115 { 00116 if (shutdownx&&(*shutdownx)) return; 00117 LOG(L_INFO,"ERROR:cdp_lock_release(): Error on semop %s > %d: %s Q[%2d/%2d]\n", 00118 sid==tasks->full?"full":"empty",errno,strerror(errno), 00119 tasks->start,tasks->end); 00120 } 00121 }
| void worker_init | ( | ) |
Initializes the worker structures, like the task queue.
Definition at line 126 of file worker.c.
References callbacks, cdp_lock_get(), cdp_semun_init, config, task_queue_t::empty, task_queue_t::end, errno, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, LOG_NO_MEM, task_queue_t::max, task_queue_t::queue, dp_config::queue_length, task_queue_t::start, cdp_cb_list_t::tail, and tasks.
Referenced by diameter_peer_init().
00127 { 00128 tasks = shm_malloc(sizeof(task_queue_t)); 00129 00130 tasks->lock = lock_alloc(); 00131 tasks->lock = lock_init(tasks->lock); 00132 00133 tasks->empty = semget(IPC_PRIVATE,1,0666 | IPC_CREAT ); 00134 if (tasks->empty==-1){ 00135 LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for empty queue > %s\n",strerror(errno)); 00136 }else 00137 semctl(tasks->empty, 0, SETVAL, cdp_semun_init ); 00138 tasks->full = semget(IPC_PRIVATE,1, 0666 | IPC_CREAT ); 00139 if (tasks->full==-1){ 00140 LOG(L_ERR,"ERROR:worker_init(): Error creating semaphore for full queue > %s\n",strerror(errno)); 00141 }else 00142 semctl(tasks->full, 0, SETVAL, cdp_semun_init); 00143 00144 tasks->start = 0; 00145 tasks->end = 0; 00146 tasks->max = config->queue_length; 00147 tasks->queue = shm_malloc(tasks->max*sizeof(task_t)); 00148 if (!tasks->queue) LOG_NO_MEM("shm",tasks->max*sizeof(task_t)); 00149 memset(tasks->queue,0,tasks->max*sizeof(task_t)); 00150 00151 callbacks = shm_malloc(sizeof(cdp_cb_list_t)); 00152 callbacks->head = 0; 00153 callbacks->tail = 0; 00154 00155 cdp_lock_get(tasks->empty); 00156 // lock_release(tasks->full); 00157 }
| void worker_destroy | ( | ) |
Destroys the worker structures.
Definition at line 162 of file worker.c.
References AAAFreeMessage(), callbacks, cb_remove(), cdp_semun_destroy, task_queue_t::empty, task_queue_t::full, cdp_cb_list_t::head, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, and tasks.
Referenced by diameter_peer_destroy().
00163 { 00164 int i; 00165 // LOG(L_CRIT,"-1-\n"); 00166 /* lock_get(tasks->lock);*/ 00167 for(i=0;i<tasks->max;i++){ 00168 if (tasks->queue[i].msg) AAAFreeMessage(&(tasks->queue[i].msg)); 00169 } 00170 // LOG(L_CRIT,"-2-\n"); 00171 shm_free(tasks->queue); 00172 lock_destroy(tasks->lock); 00173 lock_dealloc((void*)tasks->lock); 00174 // LOG(L_CRIT,"-3-\n"); 00175 00176 //lock_release(tasks->empty); 00177 semctl(tasks->empty, 0, IPC_RMID, cdp_semun_destroy); 00178 // LOG(L_CRIT,"-4-\n"); 00179 00180 semctl(tasks->full, 0, IPC_RMID, cdp_semun_destroy); 00181 00182 shm_free(tasks); 00183 00184 while(callbacks->head) 00185 cb_remove(callbacks->head); 00186 shm_free(callbacks); 00187 }
| int cb_add | ( | cdp_cb_f | cb, | |
| void * | ptr | |||
| ) |
Definition at line 190 of file worker.c.
References callbacks, _cdp_cb_t::cb, cdp_cb_list_t::head, LOG_NO_MEM, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.
Referenced by diameter_peer_init().
00191 { 00192 cdp_cb_t *x; 00193 x = shm_malloc(sizeof(cdp_cb_t)); 00194 if (!x){ 00195 LOG_NO_MEM("shm",sizeof(cdp_cb_t)); 00196 return 0; 00197 } 00198 x->cb = cb; 00199 x->ptr = shm_malloc(sizeof(void*)); 00200 if (!x->ptr){ 00201 LOG_NO_MEM("shm",sizeof(void*)); 00202 return 0; 00203 } 00204 *(x->ptr) = ptr; 00205 x->next = 0; 00206 x->prev = callbacks->tail; 00207 if (callbacks->tail) callbacks->tail->next = x; 00208 callbacks->tail = x; 00209 if (!callbacks->head) callbacks->head = x; 00210 return 1; 00211 }
| void cb_remove | ( | cdp_cb_t * | cb | ) |
Definition at line 214 of file worker.c.
References callbacks, cdp_cb_list_t::head, _cdp_cb_t::next, _cdp_cb_t::prev, _cdp_cb_t::ptr, and cdp_cb_list_t::tail.
Referenced by worker_destroy().
00215 { 00216 cdp_cb_t *x; 00217 x = callbacks->head; 00218 while(x && x!=cb) x = x->next; 00219 if (!x) return; 00220 if (x->prev) x->prev->next = x->next; 00221 else callbacks->head = x->next; 00222 if (x->next) x->next->prev = x->prev; 00223 else callbacks->tail = x->prev; 00224 00225 if (x->ptr) shm_free(x->ptr); 00226 shm_free(x); 00227 }
| int put_task | ( | peer * | p, | |
| AAAMessage * | msg | |||
| ) |
Adds a message as a task to the task queue.
This blocks if the task queue is full, untill there is space.
| p | - the peer that the message was received from | |
| msg | - the message |
Definition at line 236 of file worker.c.
References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, _task_t::p, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.
Referenced by Rcv_Process().
00237 { 00238 // LOG(L_CRIT,"+1+\n"); 00239 lock_get(tasks->lock); 00240 // LOG(L_CRIT,"+2+\n"); 00241 while ((tasks->end+1)%tasks->max == tasks->start){ 00242 // LOG(L_CRIT,"+3+\n"); 00243 lock_release(tasks->lock); 00244 // LOG(L_CRIT,"+4+\n"); 00245 if (*shutdownx) { 00246 cdp_lock_release(tasks->full); 00247 return 0; 00248 } 00249 // LOG(L_ERR,"+"); 00250 cdp_lock_get(tasks->full); 00251 // LOG(L_CRIT,"+5+\n"); 00252 lock_get(tasks->lock); 00253 } 00254 tasks->queue[tasks->end].p = p; 00255 tasks->queue[tasks->end].msg = msg; 00256 tasks->end = (tasks->end+1) % tasks->max; 00257 cdp_lock_release(tasks->empty); 00258 lock_release(tasks->lock); 00259 return 1; 00260 }
| task_t take_task | ( | ) |
Remove and return the first task from the queue (FIFO).
This blocks until there is something in the queue.
Definition at line 267 of file worker.c.
References cdp_lock_get(), cdp_lock_release(), task_queue_t::empty, task_queue_t::end, task_queue_t::full, task_queue_t::lock, task_queue_t::max, _task_t::msg, task_queue_t::queue, shutdownx, task_queue_t::start, and tasks.
Referenced by worker_process().
00268 { 00269 task_t t={0,0}; 00270 // LOG(L_CRIT,"-1-\n"); 00271 lock_get(tasks->lock); 00272 // LOG(L_CRIT,"-2-\n"); 00273 while(tasks->start == tasks->end){ 00274 // LOG(L_CRIT,"-3-\n"); 00275 lock_release(tasks->lock); 00276 // LOG(L_CRIT,"-4-\n"); 00277 if (*shutdownx) { 00278 cdp_lock_release(tasks->empty); 00279 return t; 00280 } 00281 // LOG(L_ERR,"-"); 00282 cdp_lock_get(tasks->empty); 00283 // LOG(L_CRIT,"-5-\n"); 00284 00285 lock_get(tasks->lock); 00286 // LOG(L_CRIT,"-6-\n"); 00287 } 00288 // LOG(L_CRIT,"-7-\n"); 00289 00290 t = tasks->queue[tasks->start]; 00291 tasks->queue[tasks->start].msg = 0; 00292 tasks->start = (tasks->start+1) % tasks->max; 00293 cdp_lock_release(tasks->full); 00294 lock_release(tasks->lock); 00295 return t; 00296 }
| void worker_poison_queue | ( | ) |
Poisons the worker queue.
Actually it just releases the task queue locks so that the workers get to evaluate if a shutdown is in process and exit.
Definition at line 303 of file worker.c.
References cdp_lock_release(), task_queue_t::empty, and tasks.
Referenced by diameter_peer_destroy(), and worker_process().
00304 { 00305 // int i; 00306 // for(i=0;i<config->workers;i++) 00307 cdp_lock_release(tasks->empty); 00308 }
| void worker_process | ( | int | id | ) |
This is the main worker process.
Takes tasks from the queue in a loop and processes them by calling the registered callbacks.
| id | - id of the worker |
Definition at line 316 of file worker.c.
References AAAFreeMessage(), callbacks, _cdp_cb_t::cb, dp_del_pid(), task_queue_t::end, cdp_cb_list_t::head, is_req, memlog, _task_t::msg, _cdp_cb_t::next, _task_t::p, _cdp_cb_t::ptr, shutdownx, task_queue_t::start, take_task(), tasks, and worker_poison_queue().
Referenced by diameter_peer_start().
00317 { 00318 task_t t; 00319 cdp_cb_t *cb; 00320 int r; 00321 LOG(L_INFO,"INFO:[%d] Worker process started...\n",id); 00322 /* init the application level for this child */ 00323 while(1){ 00324 if (shutdownx&&(*shutdownx)) break; 00325 t = take_task(); 00326 if (!t.msg) { 00327 if (shutdownx&&(*shutdownx)) break; 00328 LOG(L_INFO,"INFO:worker_process(): [%d] got empty task Q(%d/%d)\n",id,tasks->start,tasks->end); 00329 continue; 00330 } 00331 LOG(L_DBG,"DBG:worker_process(): [%d] got task Q(%d/%d)\n",id,tasks->start,tasks->end); 00332 r = is_req(t.msg); 00333 for(cb = callbacks->head;cb;cb = cb->next) 00334 (*(cb->cb))(t.p,t.msg,*(cb->ptr)); 00335 00336 if (r){ 00337 AAAFreeMessage(&(t.msg)); 00338 }else{ 00339 /* will be freed by the user in upper api */ 00340 /*AAAFreeMessage(&(t.msg));*/ 00341 } 00342 } 00343 worker_poison_queue(); 00344 LOG(L_INFO,"INFO:[%d]... Worker process finished\n",id); 00345 #ifdef CDP_FOR_SER 00346 #else 00347 #ifdef PKG_MALLOC 00348 LOG(memlog, "Worker[%d] Memory status (pkg):\n",id); 00349 //pkg_status(); 00350 #ifdef pkg_sums 00351 pkg_sums(); 00352 #endif 00353 #endif 00354 dp_del_pid(getpid()); 00355 #endif 00356 exit(0); 00357 }
queue of tasks
Definition at line 76 of file worker.c.
Referenced by cdp_lock_release(), put_task(), take_task(), worker_destroy(), worker_init(), worker_poison_queue(), and worker_process().
list of callbacks for message processing
Definition at line 78 of file worker.c.
Referenced by cb_add(), cb_remove(), worker_destroy(), worker_init(), and worker_process().
| struct sembuf cdp_sem_lock = { 0, -1, 0} |
sembuf structure to lock a semaphore
Definition at line 80 of file worker.c.
Referenced by cdp_lock_get().
| struct sembuf cdp_sem_unlock = { 0, +1, 0} |
| union semun cdp_semun_lock |
| union semun cdp_semun_unlock |
Referenced by cdp_lock_release().
| union semun cdp_semun_init |
Referenced by worker_init().
| union semun cdp_semun_destroy |
Referenced by worker_destroy().
1.5.2