00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00055 #include "p_persistency.h"
00056
00057 extern persistency_mode_t pcscf_persistency_mode;
00058 extern char* pcscf_persistency_location;
00061 extern int p_dialogs_hash_size;
00062 extern p_dialog_hash_slot *p_dialogs;
00064
00065 extern db_con_t* pcscf_db;
00066 extern db_func_t pcscf_dbf;
00067 extern int* registrar_snapshot_version;
00068 extern int* registrar_step_version;
00069 extern int* dialogs_snapshot_version;
00070 extern int* dialogs_step_version;
00071 extern int* subs_snapshot_version;
00072 extern int* subs_step_version;
00073 extern gen_lock_t* db_lock;
00074 extern char* pcscf_name;
00075
00076 int p_dump(bin_data* x, char* location, char* prepend_fname, data_type_t dt);
00077 int bin_dump_to_db(bin_data *x, data_type_t dt);
00078 int bin_dump_registrar_to_table(bin_data* x, int snapshot_version, int step_version);
00079 int bin_dump_dialogs_to_table(bin_data* x, int snapshot_version, int step_version);
00080 int bin_dump_subs_to_table(bin_data* x, int snapshot_version, int step_version);
00081 int bin_bulk_dump_to_table(data_type_t dt, int snapshot_version, int step_version, bin_data *x);
00082 int bin_cache_dump_registrar_to_table(int snapshot_version, int step_version);
00083 int bin_cache_dump_dialogs_to_table(int snapshot_version, int step_version);
00084 int bin_cache_dump_subs_to_table(int snapshot_version, int step_version);
00085 int delete_older_snapshots(char* table, char* node_id, data_type_t dt, int current_snapshot);
00086
00087 int p_load(bin_data *x, char* location, char* prepend_fname, data_type_t dt);
00088 int bin_load_from_db(bin_data *x, data_type_t dt);
00089 int bin_load_registrar_from_table(bin_data *x);
00090 int bin_load_dialogs_from_table(bin_data *x);
00091 int bin_load_subscriptions_from_table(bin_data *x);
00092 int bin_bulk_load_from_table(data_type_t dt, bin_data* x);
00093 int bin_cache_load_registrar_from_table();
00094 int bin_cache_load_dialogs_from_table();
00095 int bin_cache_load_subscriptions_from_table();
00096 int db_get_last_snapshot_version(char* table, char* node_id, data_type_t dt, int* version);
00097 int set_versions(data_type_t dt, int snapshot_version, int step_version);
00098
00099
00104 int make_snapshot_dialogs()
00105 {
00106 bin_data x;
00107 p_dialog *d;
00108 int i;
00109
00110
00111 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00112 if (!bin_alloc(&x,256)) goto error;
00113 for(i=0;i<p_dialogs_hash_size;i++){
00114 d_lock(i);
00115 d = p_dialogs[i].head;
00116 while(d){
00117 if (!bin_encode_p_dialog(&x,d)) goto error;
00118 d = d->next;
00119 }
00120 d_unlock(i);
00121 }
00122
00123 }
00124 i = p_dump(&x,pcscf_persistency_location,"pdialogs", P_DIALOGS);
00125
00126 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00127 bin_free(&x);
00128 }
00129 return i;
00130 error:
00131 return 0;
00132 }
00133
00138 int load_snapshot_dialogs()
00139 {
00140 bin_data x;
00141 p_dialog *d;
00142
00143 if (!p_load(&x,pcscf_persistency_location,"pdialogs",P_DIALOGS)) goto error;
00144
00145
00146 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00147
00148 x.max=0;
00149 LOG(L_INFO,"INFO:"M_NAME":load_snapshot_dlg: max %d len %d\n",x.max,x.len);
00150 while(x.max<x.len){
00151 d = bin_decode_p_dialog(&x);
00152 if (!d) return 0;
00153 LOG(L_INFO,"INFO:"M_NAME":load_snapshot_dlg: Loaded p_dialog for <%.*s>\n",d->host.len,d->host.s);
00154 d_lock(d->hash);
00155 d->prev = p_dialogs[d->hash].tail;
00156 d->next = 0;
00157 if (p_dialogs[d->hash].tail) p_dialogs[d->hash].tail->next = d;
00158 p_dialogs[d->hash].tail = d;
00159 if (!p_dialogs[d->hash].head) p_dialogs[d->hash].head = d;
00160 d_unlock(d->hash);
00161 }
00162 bin_free(&x);
00163 }
00164 return 1;
00165 error:
00166 return 0;
00167
00168 }
00169
00170
00176 void persistency_timer_dialogs(unsigned int ticks, void* param)
00177 {
00178 make_snapshot_dialogs();
00179
00180 if(dialogs_snapshot_version) (*dialogs_snapshot_version)++;
00181 }
00182
00183
00184
00185 extern int r_hash_size;
00186 extern r_hash_slot *registrar;
00192 int make_snapshot_registrar()
00193 {
00194 bin_data x;
00195 r_contact *c;
00196 int i;
00197
00198
00199 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00200 if (!bin_alloc(&x,256)) goto error;
00201 for(i=0;i<r_hash_size;i++){
00202 r_lock(i);
00203 c = registrar[i].head;
00204 while(c){
00205 if (!bin_encode_r_contact(&x,c)) goto error;
00206 c = c->next;
00207 }
00208 r_unlock(i);
00209 }
00210
00211 }
00212 i = p_dump(&x,pcscf_persistency_location,"pregistrar", P_REGISTRAR);
00213
00214 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00215 bin_free(&x);
00216 }
00217 return i;
00218 error:
00219 return 0;
00220 }
00221
00226 int load_snapshot_registrar()
00227 {
00228 bin_data x;
00229 r_contact *c;
00230
00231 if (!p_load(&x,pcscf_persistency_location,"pregistrar",P_REGISTRAR)) goto error;
00232
00233
00234 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00235
00236 x.max=0;
00237 LOG(L_INFO,"INFO:"M_NAME":load_snapshot_registrar: max %d len %d\n",x.max,x.len);
00238 while(x.max<x.len){
00239 c = bin_decode_r_contact(&x);
00240 if (!c) return 0;
00241 LOG(L_INFO,"INFO:"M_NAME":load_snapshot_registrar: Loaded r_contact for <%.*s>\n",c->uri.len,c->uri.s);
00242 r_lock(c->hash);
00243 c->prev = registrar[c->hash].tail;
00244 c->next = 0;
00245 if (registrar[c->hash].tail) registrar[c->hash].tail->next = c;
00246 registrar[c->hash].tail = c;
00247 if (!registrar[c->hash].head) registrar[c->hash].head = c;
00248 r_unlock(c->hash);
00249 }
00250 bin_free(&x);
00251 }
00252 return 1;
00253 error:
00254 return 0;
00255
00256 }
00257
00258
00264 void persistency_timer_registrar(unsigned int ticks, void* param)
00265 {
00266 make_snapshot_registrar();
00267
00268 if(registrar_snapshot_version) (*registrar_snapshot_version)++;
00269 }
00270
00271
00272
00273
00274
00275 extern int subscriptions_hash_size;
00276 extern r_subscription_hash_slot *subscriptions;
00282 int make_snapshot_subscriptions()
00283 {
00284 bin_data x;
00285 r_subscription *s;
00286 int i;
00287
00288
00289 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00290 if (!bin_alloc(&x,256)) goto error;
00291 for(i=0;i<subscriptions_hash_size;i++){
00292 subs_lock(i);
00293 s = subscriptions[i].head;
00294 while(s){
00295 if (!bin_encode_r_subscription(&x,s)) goto error;
00296 s = s->next;
00297 }
00298 subs_unlock(i);
00299 }
00300
00301 }
00302 i = p_dump(&x,pcscf_persistency_location,"psubscriptions", P_SUBSCRIPTIONS);
00303
00304 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00305 bin_free(&x);
00306 }
00307 return i;
00308 error:
00309 return 0;
00310 }
00311
00316 int load_snapshot_subscriptions()
00317 {
00318 bin_data x;
00319 r_subscription *s;
00320
00321 if (!p_load(&x,pcscf_persistency_location,"psubscriptions",P_SUBSCRIPTIONS)) goto error;
00322
00323
00324 if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00325
00326 x.max=0;
00327 LOG(L_INFO,"INFO:"M_NAME":load_snapshot_subscriptions: max %d len %d\n",x.max,x.len);
00328 while(x.max<x.len){
00329 s = bin_decode_r_subscription(&x);
00330 if (!s) return 0;
00331 LOG(L_INFO,"INFO:"M_NAME":load_snapshot_subscriptions: Loaded r_subscription for <%.*s>\n",s->req_uri.len,s->req_uri.s);
00332 subs_lock(s->hash);
00333 s->prev = subscriptions[s->hash].tail;
00334 s->next = 0;
00335 if (subscriptions[s->hash].tail) subscriptions[s->hash].tail->next = s;
00336 subscriptions[s->hash].tail = s;
00337 if (!subscriptions[s->hash].head) subscriptions[s->hash].head = s;
00338 subs_unlock(s->hash);
00339 }
00340 bin_free(&x);
00341 }
00342 return 1;
00343 error:
00344 return 0;
00345
00346 }
00347
00348
00354 void persistency_timer_subscriptions(unsigned int ticks, void* param)
00355 {
00356 make_snapshot_subscriptions();
00357
00358 if(subs_snapshot_version) (*subs_snapshot_version)++;
00359 }
00360
00361
00362
00363
00364
00374 int p_dump(bin_data* x, char* location, char* prepend_fname, data_type_t dt){
00375 switch (pcscf_persistency_mode){
00376 case NO_PERSISTENCY:
00377 LOG(L_ERR,"ERR:"M_NAME":p_dump: Snapshot done but persistency was disabled...\n");
00378 return 0;
00379 case WITH_FILES:
00380 return bin_dump_to_file(x,location,prepend_fname);
00381 case WITH_DATABASE_BULK:
00382 return bin_dump_to_db(x, dt);
00383 case WITH_DATABASE_CACHE:
00384 return bin_dump_to_db(NULL, dt);
00385 default:
00386 LOG(L_ERR,"ERR:"M_NAME":p_dump: Snapshot done but no such mode %d\n",pcscf_persistency_mode);
00387 return 0;
00388 }
00389 }
00390
00398 int bin_dump_to_db(bin_data *x, data_type_t dt){
00399
00400 int snapshot_version;
00401 int step_version;
00402
00403 switch(dt){
00404 case P_REGISTRAR:
00405 snapshot_version=*registrar_snapshot_version;
00406 step_version=*registrar_step_version;
00407 return bin_dump_registrar_to_table(x, snapshot_version, step_version);
00408 case P_DIALOGS:
00409 snapshot_version=*dialogs_snapshot_version;
00410 step_version=*dialogs_step_version;
00411 return bin_dump_dialogs_to_table(x, snapshot_version, step_version);
00412 case P_SUBSCRIPTIONS:
00413 snapshot_version=*subs_snapshot_version;
00414 step_version=*subs_step_version;
00415 return bin_dump_subs_to_table(x, snapshot_version, step_version);
00416 default:
00417 LOG(L_ERR,"ERR:"M_NAME":bin_dump_to_db: No such information to dump %d\n", dt);
00418 return 0;
00419 }
00420 }
00421
00430 int bin_dump_registrar_to_table(bin_data *x, int snapshot_version, int step_version){
00431
00432 if(x){
00433 return bin_bulk_dump_to_table(P_REGISTRAR, snapshot_version, step_version, x);
00434 }
00435 else{
00436 return bin_cache_dump_registrar_to_table(snapshot_version, step_version);
00437 }
00438 }
00439
00448 int bin_dump_dialogs_to_table(bin_data *x, int snapshot_version, int step_version){
00449
00450 if(x){
00451 return bin_bulk_dump_to_table(P_DIALOGS, snapshot_version, step_version, x);
00452 }
00453 else{
00454 return bin_cache_dump_dialogs_to_table(snapshot_version, step_version);
00455 }
00456 }
00457
00466 int bin_dump_subs_to_table(bin_data *x, int snapshot_version, int step_version){
00467
00468 if(x){
00469 return bin_bulk_dump_to_table(P_SUBSCRIPTIONS, snapshot_version, step_version, x);
00470 }
00471 else{
00472 return bin_cache_dump_subs_to_table(snapshot_version, step_version);
00473 }
00474 }
00475
00484 int bin_bulk_dump_to_table(data_type_t dt, int snapshot_version, int step_version, bin_data *x){
00485 db_key_t keys[5];
00486 db_val_t vals[5];
00487 int len;
00488
00489
00490 keys[0] = "node_id";
00491 keys[1] = "data_type";
00492 keys[2] = "snapshot_version";
00493 keys[3] = "step_version";
00494
00495 keys[4] = "data";
00496
00497 vals[0].type = DB_STR;
00498 vals[0].nul = 0;
00499 vals[0].val.str_val.s=pcscf_name;
00500 len = strlen(pcscf_name);
00501 vals[0].val.str_val.len=MIN(len, 64);
00502
00503 vals[1].type = DB_INT;
00504 vals[1].nul = 0;
00505 vals[1].val.int_val=dt;
00506
00507 vals[2].type = DB_INT;
00508 vals[2].nul = 0;
00509 vals[2].val.int_val=snapshot_version;
00510
00511 vals[3].type = DB_INT;
00512 vals[3].nul = 0;
00513 vals[3].val.int_val=step_version;
00514
00515 str d = {x->s, x->len};
00516 vals[4].type = DB_BLOB;
00517 vals[4].nul = 0;
00518 vals[4].val.blob_val = d;
00519
00520
00521 lock_get(db_lock);
00522
00523 if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00524 LOG(L_ERR, "ERR:"M_NAME":bin_bulk_dump_to_table(): Error in use_table\n");
00525 lock_release(db_lock);
00526 return 0;
00527 }
00528
00529 if (pcscf_dbf.insert(pcscf_db, keys, vals, 5) < 0) {
00530 LOG(L_ERR, "ERR:"M_NAME":bin_bulk_dump_to_table(): Error while inserting on snapshot table\n");
00531 lock_release(db_lock);
00532 return 0;
00533 }
00534
00535
00536 if (delete_older_snapshots("snapshot", pcscf_name, dt, snapshot_version)!=1){
00537 LOG(L_ERR, "ERR:"M_NAME":bin_bulk_dump_to_table(): Error while deleting older snapshots from snapshot table\n");
00538 lock_release(db_lock);
00539 return 0;
00540 }
00541
00542
00543 lock_release(db_lock);
00544
00545 return 1;
00546 }
00547
00555 int bin_cache_dump_registrar_to_table(int snapshot_version, int step_version){
00556 bin_data x;
00557 r_contact *contact;
00558 int i;
00559 int len;
00560 char aux1[10];
00561 char aux2[10];
00562
00563
00564 lock_get(db_lock);
00565
00566 if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00567 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_registrar_to_table(): Error in use_table\n");
00568 goto error;
00569 }
00570
00571 for(i=0;i<r_hash_size;i++){
00572 r_lock(i);
00573 contact = registrar[i].head;
00574 while(contact){
00575 if (!bin_alloc(&x,128)){
00576 r_unlock(i);
00577 goto error;
00578 }
00579 if (!bin_encode_r_contact(&x,contact)){
00580 r_unlock(i);
00581 goto error;
00582 }
00583
00584 db_key_t keys[8];
00585 db_val_t vals[8];
00586
00587
00588 keys[0] = "node_id";
00589 keys[1] = "data_type";
00590 keys[2] = "snapshot_version";
00591 keys[3] = "step_version";
00592 keys[4] = "record_id_1";
00593 keys[5] = "record_id_2";
00594 keys[6] = "record_id_3";
00595
00596 keys[7] = "data";
00597
00598 vals[0].type = DB_STR;
00599 vals[0].nul = 0;
00600 vals[0].val.str_val.s=pcscf_name;
00601 len = strlen(pcscf_name);
00602 vals[0].val.str_val.len=MIN(len, 64);
00603
00604 vals[1].type = DB_INT;
00605 vals[1].nul = 0;
00606 vals[1].val.int_val=P_REGISTRAR;
00607
00608 vals[2].type = DB_INT;
00609 vals[2].nul = 0;
00610 vals[2].val.int_val=snapshot_version;
00611
00612 vals[3].type = DB_INT;
00613 vals[3].nul = 0;
00614 vals[3].val.int_val=step_version;
00615
00616 vals[4].type = DB_STR;
00617 vals[4].nul = 0;
00618 vals[4].val.str_val.s=contact->host.s;
00619 vals[4].val.str_val.len=MIN(contact->host.len, 256);
00620
00621 vals[5].type = DB_STR;
00622 vals[5].nul = 0;
00623 sprintf(aux1, "%d", contact->port);
00624 vals[5].val.str_val.s=aux1;
00625 vals[5].val.str_val.len=strlen(aux1);
00626
00627 vals[6].type = DB_STR;
00628 vals[6].nul = 0;
00629 sprintf(aux2, "%d", contact->transport);
00630 vals[6].val.str_val.s=aux2;
00631 vals[6].val.str_val.len=strlen(aux2);
00632
00633 vals[7].type = DB_BLOB;
00634 vals[7].nul = 0;
00635 str data={x.s, x.len};
00636 vals[7].val.blob_val = data;
00637
00638 if (pcscf_dbf.insert(pcscf_db, keys, vals, 8) < 0) {
00639 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_registrar_to_table(): Error while inserting on snapshot table\n");
00640 r_unlock(i);
00641 goto error;
00642 }
00643 bin_free(&x);
00644
00645 contact = contact->next;
00646 }
00647 r_unlock(i);
00648 }
00649
00650
00651 if (delete_older_snapshots("snapshot", pcscf_name, P_REGISTRAR, snapshot_version)!=1){
00652 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_registrar_to_table(): Error while deleting older snapshots from snapshot table\n");
00653 goto error;
00654 }
00655
00656
00657 lock_release(db_lock);
00658
00659 return 1;
00660
00661 error:
00662 lock_release(db_lock);
00663 return 0;
00664 }
00665
00673 int bin_cache_dump_dialogs_to_table(int snapshot_version, int step_version){
00674 bin_data x;
00675 p_dialog *dialog;
00676 int i;
00677 int len;
00678 char aux1[10];
00679 char aux2[10];
00680
00681
00682 lock_get(db_lock);
00683
00684 if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00685 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_dialogs_to_table(): Error in use_table\n");
00686 goto error;
00687 }
00688
00689 for(i=0;i<p_dialogs_hash_size;i++){
00690 d_lock(i);
00691 dialog = p_dialogs[i].head;
00692 while(dialog){
00693 if (!bin_alloc(&x,128)){
00694 d_unlock(i);
00695 goto error;
00696 }
00697 if (!bin_encode_p_dialog(&x,dialog)){
00698 d_unlock(i);
00699 goto error;
00700 }
00701
00702 db_key_t keys[9];
00703 db_val_t vals[9];
00704
00705
00706 keys[0] = "node_id";
00707 keys[1] = "data_type";
00708 keys[2] = "snapshot_version";
00709 keys[3] = "step_version";
00710 keys[4] = "record_id_1";
00711 keys[5] = "record_id_2";
00712 keys[6] = "record_id_3";
00713 keys[7] = "record_id_4";
00714 keys[8] = "data";
00715
00716 vals[0].type = DB_STR;
00717 vals[0].nul = 0;
00718 vals[0].val.str_val.s=pcscf_name;
00719 len = strlen(pcscf_name);
00720 vals[0].val.str_val.len=MIN(len, 64);
00721
00722 vals[1].type = DB_INT;
00723 vals[1].nul = 0;
00724 vals[1].val.int_val=P_DIALOGS;
00725
00726 vals[2].type = DB_INT;
00727 vals[2].nul = 0;
00728 vals[2].val.int_val=snapshot_version;
00729
00730 vals[3].type = DB_INT;
00731 vals[3].nul = 0;
00732 vals[3].val.int_val=step_version;
00733
00734 vals[4].type = DB_STR;
00735 vals[4].nul = 0;
00736 vals[4].val.str_val.s=dialog->call_id.s;
00737 vals[4].val.str_val.len=MIN(dialog->call_id.len, 256);
00738
00739 vals[5].type = DB_STR;
00740 vals[5].nul = 0;
00741 vals[5].val.str_val.s=dialog->host.s;
00742 vals[5].val.str_val.len=MIN(dialog->host.len, 256);
00743
00744 vals[6].type = DB_STR;
00745 vals[6].nul = 0;
00746 sprintf(aux1, "%d", dialog->port);
00747 vals[6].val.str_val.s=aux1;
00748 vals[6].val.str_val.len=strlen(aux1);
00749
00750 vals[7].type = DB_STR;
00751 vals[7].nul = 0;
00752 sprintf(aux2, "%d", dialog->transport);
00753 vals[7].val.str_val.s=aux2;
00754 vals[7].val.str_val.len=strlen(aux2);
00755
00756 vals[8].type = DB_BLOB;
00757 vals[8].nul = 0;
00758 str data={x.s, x.len};
00759 vals[8].val.blob_val = data;
00760
00761 if (pcscf_dbf.insert(pcscf_db, keys, vals, 9) < 0) {
00762 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_dialogs_to_table(): Error while inserting on snapshot table\n");
00763 d_unlock(i);
00764 goto error;
00765 }
00766 bin_free(&x);
00767
00768 dialog = dialog->next;
00769 }
00770 d_unlock(i);
00771 }
00772
00773
00774 if (delete_older_snapshots("snapshot", pcscf_name, P_DIALOGS, snapshot_version)!=1){
00775 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_dialogs_to_table(): Error while deleting older snapshots from snapshot table\n");
00776 goto error;
00777 }
00778
00779
00780 lock_release(db_lock);
00781
00782 return 1;
00783
00784 error:
00785 lock_release(db_lock);
00786 return 0;
00787 }
00788
00796 int bin_cache_dump_subs_to_table(int snapshot_version, int step_version){
00797 bin_data x;
00798 r_subscription *sub;
00799 int i;
00800 int len;
00801
00802
00803 lock_get(db_lock);
00804
00805 if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00806 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_subs_to_table(): Error in use_table\n");
00807 goto error;
00808 }
00809
00810 for(i=0;i<subscriptions_hash_size;i++){
00811 subs_lock(i);
00812 sub = subscriptions[i].head;
00813 while(sub){
00814 if (!bin_alloc(&x,128)){
00815 subs_unlock(i);
00816 goto error;
00817 }
00818 if (!bin_encode_r_subscription(&x,sub)){
00819 subs_unlock(i);
00820 goto error;
00821 }
00822
00823 db_key_t keys[6];
00824 db_val_t vals[6];
00825
00826
00827 keys[0] = "node_id";
00828 keys[1] = "data_type";
00829 keys[2] = "snapshot_version";
00830 keys[3] = "step_version";
00831 keys[4] = "record_id_1";
00832
00833 keys[5] = "data";
00834
00835 vals[0].type = DB_STR;
00836 vals[0].nul = 0;
00837 vals[0].val.str_val.s=pcscf_name;
00838 len = strlen(pcscf_name);
00839 vals[0].val.str_val.len=MIN(len, 64);
00840
00841 vals[1].type = DB_INT;
00842 vals[1].nul = 0;
00843 vals[1].val.int_val=P_SUBSCRIPTIONS;
00844
00845 vals[2].type = DB_INT;
00846 vals[2].nul = 0;
00847 vals[2].val.int_val=snapshot_version;
00848
00849 vals[3].type = DB_INT;
00850 vals[3].nul = 0;
00851 vals[3].val.int_val=step_version;
00852
00853 vals[4].type = DB_STR;
00854 vals[4].nul = 0;
00855 vals[4].val.str_val.s=sub->req_uri.s;
00856 vals[4].val.str_val.len=MIN(sub->req_uri.len, 256);
00857
00858 vals[5].type = DB_BLOB;
00859 vals[5].nul = 0;
00860 str data={x.s, x.len};
00861 vals[5].val.blob_val = data;
00862
00863 if (pcscf_dbf.insert(pcscf_db, keys, vals, 6) < 0) {
00864 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_subscriptions_to_table(): Error while inserting on snapshot table\n");
00865 subs_unlock(i);
00866 goto error;
00867 }
00868 bin_free(&x);
00869
00870 sub = sub->next;
00871 }
00872 subs_unlock(i);
00873 }
00874
00875
00876 if (delete_older_snapshots("snapshot", pcscf_name, P_SUBSCRIPTIONS, snapshot_version)!=1){
00877 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_subscriptions_to_table(): Error while deleting older snapshots from snapshot table\n");
00878 goto error;
00879 }
00880
00881
00882 lock_release(db_lock);
00883
00884 return 1;
00885
00886 error:
00887 lock_release(db_lock);
00888 return 0;
00889 }
00890
00891
/*
 * Offset used by delete_older_snapshots(): rows whose snapshot_version is
 * less than or equal to (current snapshot - bin_db_keep_count) are deleted.
 */
int bin_db_keep_count = 1;
00903 int delete_older_snapshots(char* table, char* node_id, data_type_t dt, int current_snapshot){
00904
00905 db_key_t query_cols[3];
00906 db_op_t query_ops[3];
00907 db_val_t query_vals[3];
00908 int len;
00909
00910 query_cols[0] = "snapshot_version";
00911 query_ops[0] = OP_LEQ;
00912 query_vals[0].type = DB_INT;
00913 query_vals[0].nul = 0;
00914 query_vals[0].val.int_val = current_snapshot - bin_db_keep_count;
00915
00916 query_cols[1] = "node_id";
00917 query_ops[1] = OP_EQ;
00918 query_vals[1].type = DB_STR;
00919 query_vals[1].nul = 0;
00920 len = strlen(node_id);
00921 query_vals[1].val.str_val.s=node_id;
00922 query_vals[1].val.str_val.len=MIN(len, 256);
00923
00924 query_cols[2] = "data_type";
00925 query_ops[2] = OP_EQ;
00926 query_vals[2].type = DB_INT;
00927 query_vals[2].nul = 0;
00928 query_vals[2].val.int_val = dt;
00929
00930 if (pcscf_dbf.use_table(pcscf_db, table) < 0) {
00931 LOG(L_ERR, "ERR:"M_NAME":delete_older_snapshots(): Error in use_table\n");
00932 return 0;
00933 }
00934
00935 if (pcscf_dbf.delete(pcscf_db, query_cols, query_ops, query_vals, 3) < 0) {
00936 LOG(L_ERR, "ERR:"M_NAME":delete_older_snapshots(): Error while deleting older snapshots\n");
00937 return 0;
00938 }
00939
00940 return 1;
00941 }
00942
00943
00944
00956 int p_load(bin_data *x, char* location, char* prepend_fname, data_type_t dt){
00957 switch (pcscf_persistency_mode){
00958 case NO_PERSISTENCY:
00959 LOG(L_ERR,"ERR:"M_NAME":p_load: Persistency support was disabled\n");
00960 return 0;
00961 case WITH_FILES:
00962 return bin_load_from_file(x,location,prepend_fname);
00963 case WITH_DATABASE_BULK:
00964 return bin_load_from_db(x, dt);
00965 case WITH_DATABASE_CACHE:
00966 return bin_load_from_db(NULL, dt);
00967 default:
00968 LOG(L_ERR,"ERR:"M_NAME":p_load: Can't resume because no such mode %d\n",pcscf_persistency_mode);
00969 return 0;
00970 }
00971 }
00972
00979 int bin_load_from_db(bin_data *x, data_type_t dt){
00980 switch(dt){
00981 case P_REGISTRAR:
00982 return bin_load_registrar_from_table(x);
00983 case P_DIALOGS:
00984 return bin_load_dialogs_from_table(x);
00985 case P_SUBSCRIPTIONS:
00986 return bin_load_subscriptions_from_table(x);
00987 default:
00988 LOG(L_ERR,"ERR:"M_NAME":bin_load_from_db: No such information to load %d\n", dt);
00989 return 0;
00990 }
00991 }
00992
00998 int bin_load_registrar_from_table(bin_data *x){
00999 if(x){
01000 return bin_bulk_load_from_table(P_REGISTRAR, x);
01001 }
01002 else{
01003 return bin_cache_load_registrar_from_table();
01004 }
01005 }
01006
01012 int bin_load_dialogs_from_table(bin_data *x){
01013 if(x){
01014 return bin_bulk_load_from_table(P_DIALOGS, x);
01015 }
01016 else{
01017 return bin_cache_load_dialogs_from_table();
01018 }
01019 }
01020
01026 int bin_load_subscriptions_from_table(bin_data *x){
01027 if(x){
01028 return bin_bulk_load_from_table(P_SUBSCRIPTIONS, x);
01029 }
01030 else{
01031 return bin_cache_load_subscriptions_from_table();
01032 }
01033 }
01034
01041 int bin_bulk_load_from_table(data_type_t dt, bin_data* x){
01042 int snapshot_version;
01043 int r;
01044 int len;
01045 bin_alloc(x,1024);
01046
01047
01048 lock_get(db_lock);
01049
01050 if((r=db_get_last_snapshot_version("snapshot", pcscf_name, dt, &snapshot_version))==0){
01051 LOG(L_ERR,"ERR:"M_NAME":bin_bulk_load_from_table: Error while getting snapshot version\n");
01052 lock_release(db_lock);
01053 return 0;
01054 }
01055 else if(r==-1){
01056 LOG(L_INFO,"INFO:"M_NAME":bin_bulk_load_from_table: snapshot table empty\n");
01057 lock_release(db_lock);
01058 return 1;
01059 }
01060
01061
01062 set_versions(dt, snapshot_version+1, 0);
01063
01064 db_key_t keys[3];
01065 db_val_t vals[3];
01066 db_op_t ops[3];
01067 db_key_t result_cols[1];
01068
01069 db_res_t *res = NULL;
01070
01071 keys[0] = "node_id";
01072 keys[1] = "data_type";
01073 keys[2] = "snapshot_version";
01074
01075 ops[0] = OP_EQ;
01076 ops[1] = OP_EQ;
01077 ops[2] = OP_EQ;
01078
01079 vals[0].type = DB_STR;
01080 vals[0].nul = 0;
01081 vals[0].val.str_val.s = pcscf_name;
01082 len = strlen(pcscf_name);
01083 vals[0].val.str_val.len = MIN(len, 256);
01084
01085 vals[1].type = DB_INT;
01086 vals[1].nul = 0;
01087 vals[1].va