p_persistency.c

Go to the documentation of this file.
00001 /*
00002  * $Id: p_persistency.c 236 2007-04-18 12:53:40Z vingarzan $
00003  *
00004  * Copyright (C) 2004-2007 FhG Fokus
00005  *
00006  * This file is part of Open IMS Core - an open source IMS CSCFs & HSS
00007  * implementation
00008  *
00009  * Open IMS Core is free software; you can redistribute it and/or modify
00010  * it under the terms of the GNU General Public License as published by
00011  * the Free Software Foundation; either version 2 of the License, or
00012  * (at your option) any later version.
00013  *
00014  * For a license to use the Open IMS Core software under conditions
00015  * other than those described here, or to purchase support for this
00016  * software, please contact Fraunhofer FOKUS by e-mail at the following
00017  * addresses:
00018  *     info@open-ims.org
00019  *
00020  * Open IMS Core is distributed in the hope that it will be useful,
00021  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00022  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00023  * GNU General Public License for more details.
00024  *
00025  * It has to be noted that this Open Source IMS Core System is not
00026  * intended to become or act as a product in a commercial context! Its
00027  * sole purpose is to provide an IMS core reference implementation for
00028  * IMS technology testing and IMS application prototyping for research
00029  * purposes, typically performed in IMS test-beds.
00030  *
00031  * Users of the Open Source IMS Core System have to be aware that IMS
00032  * technology may be subject of patents and licence terms, as being
00033  * specified within the various IMS-related IETF, ITU-T, ETSI, and 3GPP
00034  * standards. Thus all Open IMS Core users have to take notice of this
00035  * fact and have to agree to check out carefully before installing,
00036  * using and extending the Open Source IMS Core System, if related
00037  * patents and licences may become applicable to the intended usage
00038  * context.
00039  *
00040  * You should have received a copy of the GNU General Public License
00041  * along with this program; if not, write to the Free Software
00042  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00043  *
00044  */
00045 
00055 #include "p_persistency.h"
00056 
00057 extern persistency_mode_t pcscf_persistency_mode;
00058 extern char* pcscf_persistency_location;        
00061 extern int p_dialogs_hash_size;                     
00062 extern p_dialog_hash_slot *p_dialogs;               
00064 /*****  DB related stuff *****/
00065 extern db_con_t* pcscf_db; 
00066 extern db_func_t pcscf_dbf; 
00067 extern int* registrar_snapshot_version;
00068 extern int* registrar_step_version;
00069 extern int* dialogs_snapshot_version;
00070 extern int* dialogs_step_version;
00071 extern int* subs_snapshot_version;
00072 extern int* subs_step_version;
00073 extern gen_lock_t* db_lock;
00074 extern char* pcscf_name;
00075 
00076 int p_dump(bin_data* x, char* location, char* prepend_fname, data_type_t dt);
00077 int bin_dump_to_db(bin_data *x, data_type_t dt);
00078 int bin_dump_registrar_to_table(bin_data* x, int snapshot_version, int step_version);
00079 int bin_dump_dialogs_to_table(bin_data* x, int snapshot_version, int step_version);
00080 int bin_dump_subs_to_table(bin_data* x, int snapshot_version, int step_version);
00081 int bin_bulk_dump_to_table(data_type_t dt, int snapshot_version, int step_version, bin_data *x);
00082 int bin_cache_dump_registrar_to_table(int snapshot_version, int step_version);
00083 int bin_cache_dump_dialogs_to_table(int snapshot_version, int step_version);
00084 int bin_cache_dump_subs_to_table(int snapshot_version, int step_version);
00085 int delete_older_snapshots(char* table, char* node_id, data_type_t dt, int current_snapshot);
00086  
00087 int p_load(bin_data *x, char* location, char* prepend_fname, data_type_t dt);
00088 int bin_load_from_db(bin_data *x, data_type_t dt);
00089 int bin_load_registrar_from_table(bin_data *x);
00090 int bin_load_dialogs_from_table(bin_data *x);
00091 int bin_load_subscriptions_from_table(bin_data *x);
00092 int bin_bulk_load_from_table(data_type_t dt, bin_data* x);
00093 int bin_cache_load_registrar_from_table();
00094 int bin_cache_load_dialogs_from_table();
00095 int bin_cache_load_subscriptions_from_table();
00096 int db_get_last_snapshot_version(char* table, char* node_id, data_type_t dt, int* version);
00097 int set_versions(data_type_t dt, int snapshot_version, int step_version);
00098 /*********************/
00099 
00104 int make_snapshot_dialogs()
00105 {
00106     bin_data x;
00107     p_dialog *d;
00108     int i;  
00109     
00110     /*In WITH_DATABASE_CACHE mode, serialize each hashtable element separately */
00111     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00112         if (!bin_alloc(&x,256)) goto error;     
00113         for(i=0;i<p_dialogs_hash_size;i++){
00114             d_lock(i);
00115             d = p_dialogs[i].head;
00116             while(d){
00117                 if (!bin_encode_p_dialog(&x,d)) goto error;
00118                 d = d->next;
00119             }
00120             d_unlock(i);
00121         }
00122         //bin_print(&x);
00123     }
00124     i = p_dump(&x,pcscf_persistency_location,"pdialogs", P_DIALOGS);
00125     //i = bin_dump(&x,pcscf_persistency_mode,pcscf_persistency_location,"pdialogs");
00126     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){        
00127         bin_free(&x);
00128     }
00129     return i;
00130 error:
00131     return 0;
00132 }  
00133 
00138 int load_snapshot_dialogs()
00139 {
00140     bin_data x;
00141     p_dialog *d;
00142     
00143     if (!p_load(&x,pcscf_persistency_location,"pdialogs",P_DIALOGS)) goto error;
00144     //if (!bin_load(&x,pcscf_persistency_mode,pcscf_persistency_location,"pdialogs")) goto error;
00145     
00146     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00147         //bin_print(&x);
00148         x.max=0;
00149         LOG(L_INFO,"INFO:"M_NAME":load_snapshot_dlg: max %d len %d\n",x.max,x.len);
00150         while(x.max<x.len){
00151             d = bin_decode_p_dialog(&x);
00152             if (!d) return 0;
00153             LOG(L_INFO,"INFO:"M_NAME":load_snapshot_dlg: Loaded p_dialog for <%.*s>\n",d->host.len,d->host.s);
00154             d_lock(d->hash);
00155             d->prev = p_dialogs[d->hash].tail;
00156             d->next = 0;
00157             if (p_dialogs[d->hash].tail) p_dialogs[d->hash].tail->next = d;
00158             p_dialogs[d->hash].tail = d;
00159             if (!p_dialogs[d->hash].head) p_dialogs[d->hash].head = d;
00160             d_unlock(d->hash);
00161         }
00162         bin_free(&x);
00163     }
00164     return 1;
00165 error:
00166     return 0;
00167     
00168 }
00169 
00170 
00176 void persistency_timer_dialogs(unsigned int ticks, void* param)
00177 {
00178     make_snapshot_dialogs();
00179     
00180     if(dialogs_snapshot_version) (*dialogs_snapshot_version)++;         
00181 }
00182 
00183 
00184 
00185 extern int r_hash_size;                     
00186 extern r_hash_slot *registrar;              
00192 int make_snapshot_registrar()
00193 {
00194     bin_data x;
00195     r_contact *c;
00196     int i;  
00197     
00198     /*In WITH_DATABASE_CACHE mode, serialize each hashtable element separately */
00199     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00200         if (!bin_alloc(&x,256)) goto error;     
00201         for(i=0;i<r_hash_size;i++){
00202             r_lock(i);
00203             c = registrar[i].head;
00204             while(c){
00205                 if (!bin_encode_r_contact(&x,c)) goto error;
00206                 c = c->next;
00207             }
00208             r_unlock(i);
00209         }
00210         //bin_print(&x);
00211     }
00212     i = p_dump(&x,pcscf_persistency_location,"pregistrar", P_REGISTRAR);
00213     //i = bin_dump(&x,pcscf_persistency_mode,pcscf_persistency_location,"pregistrar");
00214     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){        
00215         bin_free(&x);
00216     }
00217     return i;
00218 error:
00219     return 0;
00220 }  
00221 
00226 int load_snapshot_registrar()
00227 {
00228     bin_data x;
00229     r_contact *c;
00230     
00231     if (!p_load(&x,pcscf_persistency_location,"pregistrar",P_REGISTRAR)) goto error;
00232     //if (!bin_load(&x,pcscf_persistency_mode,pcscf_persistency_location,"pregistrar")) goto error;
00233     
00234     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00235         //bin_print(&x);
00236         x.max=0;
00237         LOG(L_INFO,"INFO:"M_NAME":load_snapshot_registrar: max %d len %d\n",x.max,x.len);
00238         while(x.max<x.len){
00239             c = bin_decode_r_contact(&x);
00240             if (!c) return 0;
00241             LOG(L_INFO,"INFO:"M_NAME":load_snapshot_registrar: Loaded r_contact for <%.*s>\n",c->uri.len,c->uri.s);
00242             r_lock(c->hash);
00243             c->prev = registrar[c->hash].tail;
00244             c->next = 0;
00245             if (registrar[c->hash].tail) registrar[c->hash].tail->next = c;
00246             registrar[c->hash].tail = c;
00247             if (!registrar[c->hash].head) registrar[c->hash].head = c;
00248             r_unlock(c->hash);
00249         }
00250         bin_free(&x);
00251     }
00252     return 1;
00253 error:
00254     return 0;
00255     
00256 }
00257 
00258 
00264 void persistency_timer_registrar(unsigned int ticks, void* param)
00265 {
00266     make_snapshot_registrar();   
00267     
00268     if(registrar_snapshot_version) (*registrar_snapshot_version)++; 
00269 }
00270 
00271 
00272 
00273 
00274 
00275 extern int subscriptions_hash_size;                     
00276 extern r_subscription_hash_slot *subscriptions;             
00282 int make_snapshot_subscriptions()
00283 {
00284     bin_data x;
00285     r_subscription *s;
00286     int i;  
00287     
00288     /*In WITH_DATABASE_CACHE mode, serialize each hashtable element separately */
00289     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00290         if (!bin_alloc(&x,256)) goto error;     
00291         for(i=0;i<subscriptions_hash_size;i++){
00292             subs_lock(i);
00293             s = subscriptions[i].head;
00294             while(s){
00295                 if (!bin_encode_r_subscription(&x,s)) goto error;
00296                 s = s->next;
00297             }
00298             subs_unlock(i);
00299         }
00300         //bin_print(&x);
00301     }
00302     i = p_dump(&x,pcscf_persistency_location,"psubscriptions", P_SUBSCRIPTIONS);
00303     //i = bin_dump(&x,pcscf_persistency_mode,pcscf_persistency_location,"psubscriptions");      
00304     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00305         bin_free(&x);
00306     }
00307     return i;
00308 error:
00309     return 0;
00310 }  
00311 
00316 int load_snapshot_subscriptions()
00317 {
00318     bin_data x;
00319     r_subscription *s;
00320     
00321     if (!p_load(&x,pcscf_persistency_location,"psubscriptions",P_SUBSCRIPTIONS)) goto error;
00322     //if (!bin_load(&x,pcscf_persistency_mode,pcscf_persistency_location,"psubscriptions")) goto error;
00323     
00324     if(pcscf_persistency_mode!=WITH_DATABASE_CACHE){
00325         //bin_print(&x);
00326         x.max=0;
00327         LOG(L_INFO,"INFO:"M_NAME":load_snapshot_subscriptions: max %d len %d\n",x.max,x.len);
00328         while(x.max<x.len){
00329             s = bin_decode_r_subscription(&x);
00330             if (!s) return 0;
00331             LOG(L_INFO,"INFO:"M_NAME":load_snapshot_subscriptions: Loaded r_subscription for <%.*s>\n",s->req_uri.len,s->req_uri.s);
00332             subs_lock(s->hash);
00333             s->prev = subscriptions[s->hash].tail;
00334             s->next = 0;
00335             if (subscriptions[s->hash].tail) subscriptions[s->hash].tail->next = s;
00336             subscriptions[s->hash].tail = s;
00337             if (!subscriptions[s->hash].head) subscriptions[s->hash].head = s;
00338             subs_unlock(s->hash);
00339         }   
00340         bin_free(&x);
00341     }
00342     return 1;
00343 error:
00344     return 0;
00345     
00346 }
00347 
00348 
00354 void persistency_timer_subscriptions(unsigned int ticks, void* param)
00355 {
00356     make_snapshot_subscriptions();   
00357     
00358     if(subs_snapshot_version) (*subs_snapshot_version)++;   
00359 }
00360 
00361 /******  DB related functions  *****/
00362 
00363     /* Dump related functions*/
00364     
00374 int p_dump(bin_data* x, char* location, char* prepend_fname, data_type_t dt){
00375     switch (pcscf_persistency_mode){
00376         case NO_PERSISTENCY:
00377             LOG(L_ERR,"ERR:"M_NAME":p_dump: Snapshot done but persistency was disabled...\n");
00378             return 0;
00379         case WITH_FILES:
00380             return bin_dump_to_file(x,location,prepend_fname);
00381         case WITH_DATABASE_BULK:
00382             return bin_dump_to_db(x, dt);
00383         case WITH_DATABASE_CACHE:
00384             return bin_dump_to_db(NULL, dt); //ignore x, x is empty
00385         default:
00386             LOG(L_ERR,"ERR:"M_NAME":p_dump: Snapshot done but no such mode %d\n",pcscf_persistency_mode);
00387             return 0;
00388     }
00389 }
00390 
00398 int bin_dump_to_db(bin_data *x, data_type_t dt){
00399 
00400     int snapshot_version;
00401     int step_version;
00402     
00403     switch(dt){
00404         case P_REGISTRAR:
00405             snapshot_version=*registrar_snapshot_version;
00406             step_version=*registrar_step_version;
00407             return bin_dump_registrar_to_table(x, snapshot_version, step_version);
00408         case P_DIALOGS:
00409             snapshot_version=*dialogs_snapshot_version;
00410             step_version=*dialogs_step_version;
00411             return bin_dump_dialogs_to_table(x, snapshot_version, step_version);
00412         case P_SUBSCRIPTIONS:
00413             snapshot_version=*subs_snapshot_version;
00414             step_version=*subs_step_version;
00415             return bin_dump_subs_to_table(x, snapshot_version, step_version);
00416         default:
00417             LOG(L_ERR,"ERR:"M_NAME":bin_dump_to_db: No such information to dump %d\n", dt);
00418             return 0;
00419     }
00420 }
00421 
00430 int bin_dump_registrar_to_table(bin_data *x, int snapshot_version, int step_version){
00431     
00432     if(x){//whole hashtable serialized to x
00433         return bin_bulk_dump_to_table(P_REGISTRAR, snapshot_version, step_version, x);
00434     }
00435     else{//serialize and dump each hashtable element separately
00436         return bin_cache_dump_registrar_to_table(snapshot_version, step_version);
00437     }
00438 }
00439 
00448 int bin_dump_dialogs_to_table(bin_data *x, int snapshot_version, int step_version){
00449     
00450     if(x){//whole hashtable serialized to x
00451         return bin_bulk_dump_to_table(P_DIALOGS, snapshot_version, step_version, x);
00452     }
00453     else{//serialize and dump each hashtable element separately
00454         return bin_cache_dump_dialogs_to_table(snapshot_version, step_version);
00455     }
00456 }
00457 
00466 int bin_dump_subs_to_table(bin_data *x, int snapshot_version, int step_version){
00467     
00468     if(x){//whole hashtable serialized to x
00469         return bin_bulk_dump_to_table(P_SUBSCRIPTIONS, snapshot_version, step_version, x);
00470     }
00471     else{//serialize and dump each hashtable element separately
00472         return bin_cache_dump_subs_to_table(snapshot_version, step_version);
00473     }
00474 }
00475 
00484 int bin_bulk_dump_to_table(data_type_t dt, int snapshot_version, int step_version, bin_data *x){
00485     db_key_t keys[5];
00486     db_val_t vals[5];
00487     int len;
00488 
00489     /* id auto incremented */
00490     keys[0] = "node_id";
00491     keys[1] = "data_type";
00492     keys[2] = "snapshot_version";
00493     keys[3] = "step_version";
00494     /* record_id_1/2/3/4=NULL */
00495     keys[4] = "data";
00496     
00497     vals[0].type = DB_STR;
00498     vals[0].nul = 0;
00499     vals[0].val.str_val.s=pcscf_name;
00500     len = strlen(pcscf_name);
00501     vals[0].val.str_val.len=MIN(len, 64);
00502 
00503     vals[1].type = DB_INT;
00504     vals[1].nul = 0;
00505     vals[1].val.int_val=dt;
00506 
00507     vals[2].type = DB_INT;
00508     vals[2].nul = 0;
00509     vals[2].val.int_val=snapshot_version;
00510     
00511     vals[3].type = DB_INT;
00512     vals[3].nul = 0;
00513     vals[3].val.int_val=step_version;
00514 
00515     str d = {x->s, x->len};
00516     vals[4].type = DB_BLOB;
00517     vals[4].nul = 0;
00518     vals[4].val.blob_val = d;
00519 
00520     //lock
00521     lock_get(db_lock);
00522 
00523     if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00524         LOG(L_ERR, "ERR:"M_NAME":bin_bulk_dump_to_table(): Error in use_table\n");
00525         lock_release(db_lock);//unlock
00526         return 0;
00527     }
00528 
00529     if (pcscf_dbf.insert(pcscf_db, keys, vals, 5) < 0) {
00530         LOG(L_ERR, "ERR:"M_NAME":bin_bulk_dump_to_table(): Error while inserting on snapshot table\n");
00531         lock_release(db_lock);//unlock
00532         return 0;
00533     }
00534     
00535     //delete older snapshots
00536     if (delete_older_snapshots("snapshot", pcscf_name, dt, snapshot_version)!=1){
00537         LOG(L_ERR, "ERR:"M_NAME":bin_bulk_dump_to_table(): Error while deleting older snapshots from snapshot table\n");
00538         lock_release(db_lock);//unlock
00539         return 0;
00540     }
00541     
00542     //unlock
00543     lock_release(db_lock);
00544     
00545     return 1;
00546 }
00547 
00555 int bin_cache_dump_registrar_to_table(int snapshot_version, int step_version){
00556     bin_data x;
00557     r_contact *contact;
00558     int i;
00559     int len;
00560     char aux1[10];
00561     char aux2[10];
00562     
00563     //lock
00564     lock_get(db_lock);
00565 
00566     if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00567         LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_registrar_to_table(): Error in use_table\n");
00568         goto error;
00569     }
00570     
00571     for(i=0;i<r_hash_size;i++){
00572         r_lock(i);
00573         contact = registrar[i].head;
00574         while(contact){
00575             if (!bin_alloc(&x,128)){
00576                 r_unlock(i);
00577                 goto error;
00578             }
00579             if (!bin_encode_r_contact(&x,contact)){
00580                 r_unlock(i);
00581                 goto error;
00582             }
00583             
00584             db_key_t keys[8];
00585             db_val_t vals[8];
00586             
00587             /* id auto incremented */
00588             keys[0] = "node_id";
00589             keys[1] = "data_type";
00590             keys[2] = "snapshot_version";
00591             keys[3] = "step_version";
00592             keys[4] = "record_id_1"; /* host */
00593             keys[5] = "record_id_2"; /* port */
00594             keys[6] = "record_id_3"; /* transport */
00595             /* record_id_4=NULL */
00596             keys[7] = "data";
00597             
00598             vals[0].type = DB_STR;
00599             vals[0].nul = 0;
00600             vals[0].val.str_val.s=pcscf_name;
00601             len = strlen(pcscf_name);
00602             vals[0].val.str_val.len=MIN(len, 64);
00603 
00604             vals[1].type = DB_INT;
00605             vals[1].nul = 0;
00606             vals[1].val.int_val=P_REGISTRAR;
00607 
00608             vals[2].type = DB_INT;
00609             vals[2].nul = 0;
00610             vals[2].val.int_val=snapshot_version;
00611     
00612             vals[3].type = DB_INT;
00613             vals[3].nul = 0;
00614             vals[3].val.int_val=step_version;
00615     
00616             vals[4].type = DB_STR;
00617             vals[4].nul = 0;
00618             vals[4].val.str_val.s=contact->host.s;
00619             vals[4].val.str_val.len=MIN(contact->host.len, 256);
00620             
00621             vals[5].type = DB_STR;
00622             vals[5].nul = 0;
00623             sprintf(aux1, "%d", contact->port);
00624             vals[5].val.str_val.s=aux1;
00625             vals[5].val.str_val.len=strlen(aux1);
00626     
00627             vals[6].type = DB_STR;
00628             vals[6].nul = 0;
00629             sprintf(aux2, "%d", contact->transport);
00630             vals[6].val.str_val.s=aux2;
00631             vals[6].val.str_val.len=strlen(aux2);
00632         
00633             vals[7].type = DB_BLOB;
00634             vals[7].nul = 0;
00635             str data={x.s, x.len};
00636             vals[7].val.blob_val = data;
00637             
00638             if (pcscf_dbf.insert(pcscf_db, keys, vals, 8) < 0) {
00639                 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_registrar_to_table(): Error while inserting on snapshot table\n");
00640                 r_unlock(i);
00641                 goto error;
00642             }
00643             bin_free(&x);
00644             
00645             contact = contact->next;
00646         }
00647         r_unlock(i);
00648     }   
00649     
00650     //delete older snapshots
00651     if (delete_older_snapshots("snapshot", pcscf_name, P_REGISTRAR, snapshot_version)!=1){
00652         LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_registrar_to_table(): Error while deleting older snapshots from snapshot table\n");
00653         goto error;
00654     }
00655     
00656     //unlock
00657     lock_release(db_lock);
00658     
00659     return 1;
00660 
00661 error:
00662     lock_release(db_lock);//unlock
00663     return 0;
00664 }
00665 
00673 int bin_cache_dump_dialogs_to_table(int snapshot_version, int step_version){
00674     bin_data x;
00675     p_dialog *dialog;
00676     int i;
00677     int len;
00678     char aux1[10];
00679     char aux2[10];
00680     
00681     //lock
00682     lock_get(db_lock);
00683 
00684     if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00685         LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_dialogs_to_table(): Error in use_table\n");
00686         goto error;
00687     }
00688     
00689     for(i=0;i<p_dialogs_hash_size;i++){
00690         d_lock(i);
00691         dialog = p_dialogs[i].head;
00692         while(dialog){
00693             if (!bin_alloc(&x,128)){
00694                 d_unlock(i);
00695                 goto error;
00696             }
00697             if (!bin_encode_p_dialog(&x,dialog)){
00698                 d_unlock(i);
00699                 goto error;
00700             }
00701             
00702             db_key_t keys[9];
00703             db_val_t vals[9];
00704             
00705             /* id auto incremented */
00706             keys[0] = "node_id";
00707             keys[1] = "data_type";
00708             keys[2] = "snapshot_version";
00709             keys[3] = "step_version";
00710             keys[4] = "record_id_1"; /* call-id */
00711             keys[5] = "record_id_2"; /* host */
00712             keys[6] = "record_id_3"; /* port */
00713             keys[7] = "record_id_4"; /* transport */
00714             keys[8] = "data";
00715             
00716             vals[0].type = DB_STR;
00717             vals[0].nul = 0;
00718             vals[0].val.str_val.s=pcscf_name;
00719             len = strlen(pcscf_name);
00720             vals[0].val.str_val.len=MIN(len, 64);
00721 
00722             vals[1].type = DB_INT;
00723             vals[1].nul = 0;
00724             vals[1].val.int_val=P_DIALOGS;
00725 
00726             vals[2].type = DB_INT;
00727             vals[2].nul = 0;
00728             vals[2].val.int_val=snapshot_version;
00729     
00730             vals[3].type = DB_INT;
00731             vals[3].nul = 0;
00732             vals[3].val.int_val=step_version;
00733     
00734             vals[4].type = DB_STR;
00735             vals[4].nul = 0;
00736             vals[4].val.str_val.s=dialog->call_id.s;
00737             vals[4].val.str_val.len=MIN(dialog->call_id.len, 256);
00738     
00739             vals[5].type = DB_STR;
00740             vals[5].nul = 0;
00741             vals[5].val.str_val.s=dialog->host.s;
00742             vals[5].val.str_val.len=MIN(dialog->host.len, 256);
00743             
00744             vals[6].type = DB_STR;
00745             vals[6].nul = 0;
00746             sprintf(aux1, "%d", dialog->port);
00747             vals[6].val.str_val.s=aux1;
00748             vals[6].val.str_val.len=strlen(aux1);
00749     
00750             vals[7].type = DB_STR;
00751             vals[7].nul = 0;
00752             sprintf(aux2, "%d", dialog->transport);
00753             vals[7].val.str_val.s=aux2;
00754             vals[7].val.str_val.len=strlen(aux2);
00755     
00756             vals[8].type = DB_BLOB;
00757             vals[8].nul = 0;
00758             str data={x.s, x.len};
00759             vals[8].val.blob_val = data;
00760             
00761             if (pcscf_dbf.insert(pcscf_db, keys, vals, 9) < 0) {
00762                 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_dialogs_to_table(): Error while inserting on snapshot table\n");
00763                 d_unlock(i);
00764                 goto error;
00765             }
00766             bin_free(&x);
00767             
00768             dialog = dialog->next;
00769         }
00770         d_unlock(i);
00771     }   
00772     
00773     //delete older snapshots
00774     if (delete_older_snapshots("snapshot", pcscf_name, P_DIALOGS, snapshot_version)!=1){
00775         LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_dialogs_to_table(): Error while deleting older snapshots from snapshot table\n");
00776         goto error;
00777     }
00778     
00779     //unlock
00780     lock_release(db_lock);
00781     
00782     return 1;
00783 
00784 error:
00785     lock_release(db_lock);//unlock
00786     return 0;
00787 }
00788 
00796 int bin_cache_dump_subs_to_table(int snapshot_version, int step_version){
00797     bin_data x;
00798     r_subscription *sub;
00799     int i;
00800     int len;
00801     
00802     //lock
00803     lock_get(db_lock);
00804 
00805     if (pcscf_dbf.use_table(pcscf_db, "snapshot") < 0) {
00806         LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_subs_to_table(): Error in use_table\n");
00807         goto error;
00808     }
00809     
00810     for(i=0;i<subscriptions_hash_size;i++){
00811         subs_lock(i);
00812         sub = subscriptions[i].head;
00813         while(sub){
00814             if (!bin_alloc(&x,128)){
00815                 subs_unlock(i);
00816                 goto error;
00817             }
00818             if (!bin_encode_r_subscription(&x,sub)){
00819                 subs_unlock(i);
00820                 goto error;
00821             }
00822             
00823             db_key_t keys[6];
00824             db_val_t vals[6];
00825             
00826             /* id auto incremented */
00827             keys[0] = "node_id";
00828             keys[1] = "data_type";
00829             keys[2] = "snapshot_version";
00830             keys[3] = "step_version";
00831             keys[4] = "record_id_1"; /* req_uri */
00832             /* record_id_2/3/4=NULL*/
00833             keys[5] = "data";
00834             
00835             vals[0].type = DB_STR;
00836             vals[0].nul = 0;
00837             vals[0].val.str_val.s=pcscf_name;
00838             len = strlen(pcscf_name);
00839             vals[0].val.str_val.len=MIN(len, 64);
00840 
00841             vals[1].type = DB_INT;
00842             vals[1].nul = 0;
00843             vals[1].val.int_val=P_SUBSCRIPTIONS;
00844 
00845             vals[2].type = DB_INT;
00846             vals[2].nul = 0;
00847             vals[2].val.int_val=snapshot_version;
00848     
00849             vals[3].type = DB_INT;
00850             vals[3].nul = 0;
00851             vals[3].val.int_val=step_version;
00852     
00853             vals[4].type = DB_STR;
00854             vals[4].nul = 0;
00855             vals[4].val.str_val.s=sub->req_uri.s;
00856             vals[4].val.str_val.len=MIN(sub->req_uri.len, 256);
00857     
00858             vals[5].type = DB_BLOB;
00859             vals[5].nul = 0;
00860             str data={x.s, x.len};
00861             vals[5].val.blob_val = data;
00862             
00863             if (pcscf_dbf.insert(pcscf_db, keys, vals, 6) < 0) {
00864                 LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_subscriptions_to_table(): Error while inserting on snapshot table\n");
00865                 subs_unlock(i);
00866                 goto error;
00867             }
00868             bin_free(&x);
00869             
00870             sub = sub->next;
00871         }
00872         subs_unlock(i);
00873     }   
00874     
00875     //delete older snapshots
00876     if (delete_older_snapshots("snapshot", pcscf_name, P_SUBSCRIPTIONS, snapshot_version)!=1){
00877         LOG(L_ERR, "ERR:"M_NAME":bin_cache_dump_subscriptions_to_table(): Error while deleting older snapshots from snapshot table\n");
00878         goto error;
00879     }
00880     
00881     //unlock
00882     lock_release(db_lock);
00883     
00884     return 1;
00885 
00886 error:
00887     lock_release(db_lock);//unlock
00888     return 0;
00889 }
00890 
00891 
00892 int bin_db_keep_count=1; 
00903 int delete_older_snapshots(char* table, char* node_id, data_type_t dt, int current_snapshot){
00904 
00905     db_key_t query_cols[3];
00906     db_op_t  query_ops[3];
00907     db_val_t query_vals[3];
00908     int len;
00909 
00910     query_cols[0] = "snapshot_version";
00911     query_ops[0] = OP_LEQ;
00912     query_vals[0].type = DB_INT;
00913     query_vals[0].nul = 0;
00914     query_vals[0].val.int_val = current_snapshot - bin_db_keep_count;
00915     
00916     query_cols[1] = "node_id";
00917     query_ops[1] = OP_EQ;
00918     query_vals[1].type = DB_STR;
00919     query_vals[1].nul = 0;
00920     len = strlen(node_id);
00921     query_vals[1].val.str_val.s=node_id;
00922     query_vals[1].val.str_val.len=MIN(len, 256);
00923     
00924     query_cols[2] = "data_type";
00925     query_ops[2] = OP_EQ;
00926     query_vals[2].type = DB_INT;
00927     query_vals[2].nul = 0;
00928     query_vals[2].val.int_val = dt;
00929 
00930     if (pcscf_dbf.use_table(pcscf_db, table) < 0) {
00931         LOG(L_ERR, "ERR:"M_NAME":delete_older_snapshots(): Error in use_table\n");
00932         return 0;
00933     }
00934 
00935     if (pcscf_dbf.delete(pcscf_db, query_cols, query_ops, query_vals, 3) < 0) {
00936         LOG(L_ERR, "ERR:"M_NAME":delete_older_snapshots(): Error while deleting older snapshots\n");
00937         return 0;
00938     }
00939     
00940     return 1;
00941 }
00942 
00943     /* Load related functions*/
00944 
00956 int p_load(bin_data *x, char* location, char* prepend_fname, data_type_t dt){
00957     switch (pcscf_persistency_mode){
00958         case NO_PERSISTENCY:
00959             LOG(L_ERR,"ERR:"M_NAME":p_load: Persistency support was disabled\n");
00960             return 0;
00961         case WITH_FILES:
00962             return bin_load_from_file(x,location,prepend_fname);        
00963         case WITH_DATABASE_BULK:
00964             return bin_load_from_db(x, dt);
00965         case WITH_DATABASE_CACHE:
00966             return bin_load_from_db(NULL, dt); //ignore x, x is empty
00967         default:
00968             LOG(L_ERR,"ERR:"M_NAME":p_load: Can't resume because no such mode %d\n",pcscf_persistency_mode);
00969             return 0;
00970     }
00971 }
00972 
00979 int bin_load_from_db(bin_data *x, data_type_t dt){
00980     switch(dt){
00981         case P_REGISTRAR:
00982             return bin_load_registrar_from_table(x);
00983         case P_DIALOGS:
00984             return bin_load_dialogs_from_table(x);
00985         case P_SUBSCRIPTIONS:
00986             return bin_load_subscriptions_from_table(x);
00987         default:
00988             LOG(L_ERR,"ERR:"M_NAME":bin_load_from_db: No such information to load %d\n", dt);
00989             return 0;
00990     }
00991 }
00992 
00998 int bin_load_registrar_from_table(bin_data *x){
00999     if(x){//whole hashtable dumped to db
01000         return bin_bulk_load_from_table(P_REGISTRAR, x);
01001     }
01002     else{//each hashtable element dumped to DB separately
01003         return bin_cache_load_registrar_from_table();
01004     }
01005 }
01006 
01012 int bin_load_dialogs_from_table(bin_data *x){
01013     if(x){//whole hashtable dumped to db
01014         return bin_bulk_load_from_table(P_DIALOGS, x);
01015     }
01016     else{//each hashtable element dumped to DB separately
01017         return bin_cache_load_dialogs_from_table();
01018     }
01019 }
01020 
01026 int bin_load_subscriptions_from_table(bin_data *x){
01027     if(x){//whole hashtable dumped to db
01028         return bin_bulk_load_from_table(P_SUBSCRIPTIONS, x);
01029     }
01030     else{//each hashtable element dumped to DB separately
01031         return bin_cache_load_subscriptions_from_table();
01032     }
01033 }
01034 
01041 int bin_bulk_load_from_table(data_type_t dt, bin_data* x){
01042     int snapshot_version;
01043     int r;
01044     int len;
01045     bin_alloc(x,1024);
01046     
01047     //lock
01048     lock_get(db_lock);
01049     
01050     if((r=db_get_last_snapshot_version("snapshot", pcscf_name, dt, &snapshot_version))==0){
01051         LOG(L_ERR,"ERR:"M_NAME":bin_bulk_load_from_table: Error while getting snapshot version\n");
01052         lock_release(db_lock);//unlock
01053         return 0;
01054     }
01055     else if(r==-1){//empty table, nothing to load
01056             LOG(L_INFO,"INFO:"M_NAME":bin_bulk_load_from_table: snapshot table empty\n");
01057             lock_release(db_lock);//unlock
01058             return 1;
01059         }
01060         
01061     //set snapshot/step versions
01062     set_versions(dt, snapshot_version+1, 0);
01063         
01064     db_key_t keys[3];
01065     db_val_t vals[3];
01066     db_op_t ops[3];
01067     db_key_t result_cols[1];
01068     
01069     db_res_t *res = NULL;
01070     
01071     keys[0] = "node_id";
01072     keys[1] = "data_type";
01073     keys[2] = "snapshot_version";
01074     
01075     ops[0] = OP_EQ;
01076     ops[1] = OP_EQ;
01077     ops[2] = OP_EQ;
01078 
01079     vals[0].type = DB_STR;
01080     vals[0].nul = 0;
01081     vals[0].val.str_val.s = pcscf_name;
01082     len = strlen(pcscf_name);
01083     vals[0].val.str_val.len = MIN(len, 256);
01084     
01085     vals[1].type = DB_INT;
01086     vals[1].nul = 0;
01087     vals[1].va