56 #define DATE_FORMAT "'%Y-%m-%d %T'"
58 #define PGSQL_MIN_VERSION_SCHEMA 70300
60 static const char name[] =
"pgsql";
61 static const char config[] =
"cdr_pgsql.conf";
63 static char *pghostname;
64 static char *pgdbname;
65 static char *pgdbuser;
66 static char *pgpassword;
67 static char *pgappname;
68 static char *pgdbport;
70 static char *encoding;
73 static int connected = 0;
75 static int maxsize = 512, maxsize2 = 512;
76 static time_t connect_time = 0;
77 static int totalrecords = 0;
85 AST_MUTEX_DEFINE_STATIC(pgsql_lock);
87 static PGconn *conn = NULL;
93 unsigned int notnull:1;
94 unsigned int hasdefault:1;
100 #define LENGTHEN_BUF(size, var_sql) \
103 if (ast_str_strlen(var_sql) + size + 1 > ast_str_size(var_sql)) { \
104 if (ast_str_make_space(&var_sql, ((ast_str_size(var_sql) + size + 3) / 512 + 1) * 512) != 0) { \
105 ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CDR '%s:%s' failed.\n", pghostname, table); \
108 AST_RWLIST_UNLOCK(&psql_columns); \
109 ast_mutex_unlock(&pgsql_lock); \
115 #define LENGTHEN_BUF1(size) \
116 LENGTHEN_BUF(size, sql);
117 #define LENGTHEN_BUF2(size) \
118 LENGTHEN_BUF(size, sql2);
125 e->
command =
"cdr show pgsql status";
127 "Usage: cdr show pgsql status\n"
128 " Shows current connection status for cdr_pgsql\n";
134 if (a->argc != e->
args)
135 return CLI_SHOWUSAGE;
139 char status2[100] =
"";
141 int ctime = time(NULL) - connect_time;
144 snprintf(status, 255,
"Connected to %s@%s, port %s", pgdbname, pghostname, pgdbport);
146 snprintf(status, 255,
"Connected to %s@%s", pgdbname, pghostname);
149 if (pgdbuser && *pgdbuser) {
150 snprintf(status2, 99,
" with username %s", pgdbuser);
152 if (table && *table) {
153 snprintf(status2, 99,
" using table %s", table);
156 snprintf(buf,
sizeof(buf),
"%s%s for ", status, status2);
159 if (records == totalrecords) {
160 ast_cli(a->fd,
" Wrote %d records since last restart.\n", totalrecords);
162 ast_cli(a->fd,
" Wrote %d records since last restart and %d records since last reconnect.\n", totalrecords, records);
165 ast_cli(a->fd,
"Not currently connected to a PgSQL server.\n");
170 static void pgsql_reconnect(
void)
174 ast_log(LOG_ERROR,
"Failed to allocate memory for connection string.\n");
183 if (!ast_strlen_zero(pghostname)) {
186 if (!ast_strlen_zero(pgdbport)) {
189 if (!ast_strlen_zero(pgdbname)) {
192 if (!ast_strlen_zero(pgdbuser)) {
195 if (!ast_strlen_zero(pgappname)) {
198 if (!ast_strlen_zero(pgpassword)) {
202 ast_log(LOG_ERROR,
"Connection string is blank.\n");
210 static int pgsql_log(
struct ast_cdr *cdr)
217 ast_mutex_lock(&pgsql_lock);
219 if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
222 if (PQstatus(conn) != CONNECTION_BAD) {
224 connect_time = time(NULL);
226 if (PQsetClientEncoding(conn, encoding)) {
227 #ifdef HAVE_PGSQL_pg_encoding_to_char
228 ast_log(LOG_WARNING,
"Failed to set encoding to '%s'. Encoding set to default '%s'\n", encoding, pg_encoding_to_char(PQclientEncoding(conn)));
230 ast_log(LOG_WARNING,
"Failed to set encoding to '%s'. Encoding set to default.\n", encoding);
234 pgerror = PQerrorMessage(conn);
235 ast_log(LOG_ERROR,
"Unable to connect to database server %s. Calls will not be logged!\n", pghostname);
236 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
246 char *escapebuf = NULL, *value;
247 char *separator =
"";
248 size_t bufsize = 513;
251 if (!escapebuf || !sql || !sql2) {
252 goto ast_log_cleanup;
259 AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
262 if (strcmp(cur->name,
"calldate") == 0 && !value) {
266 if (cur->notnull && !cur->hasdefault) {
268 LENGTHEN_BUF1(strlen(cur->name) + 2);
277 LENGTHEN_BUF1(strlen(cur->name) + 2);
280 if (strcmp(cur->name,
"start") == 0 || strcmp(cur->name,
"calldate") == 0) {
281 if (strncmp(cur->type,
"int", 3) == 0) {
283 ast_str_append(&sql2, 0,
"%s%ld", separator, (
long) cdr->start.tv_sec);
284 }
else if (strncmp(cur->type,
"float", 5) == 0) {
286 ast_str_append(&sql2, 0,
"%s%f", separator, (
double)cdr->start.tv_sec + (
double)cdr->start.tv_usec / 1000000.0);
294 }
else if (strcmp(cur->name,
"answer") == 0) {
295 if (strncmp(cur->type,
"int", 3) == 0) {
297 ast_str_append(&sql2, 0,
"%s%ld", separator, (
long) cdr->answer.tv_sec);
298 }
else if (strncmp(cur->type,
"float", 5) == 0) {
300 ast_str_append(&sql2, 0,
"%s%f", separator, (
double)cdr->answer.tv_sec + (
double)cdr->answer.tv_usec / 1000000.0);
308 }
else if (strcmp(cur->name,
"end") == 0) {
309 if (strncmp(cur->type,
"int", 3) == 0) {
311 ast_str_append(&sql2, 0,
"%s%ld", separator, (
long) cdr->end.tv_sec);
312 }
else if (strncmp(cur->type,
"float", 5) == 0) {
314 ast_str_append(&sql2, 0,
"%s%f", separator, (
double)cdr->end.tv_sec + (
double)cdr->end.tv_usec / 1000000.0);
322 }
else if (strcmp(cur->name,
"duration") == 0 || strcmp(cur->name,
"billsec") == 0) {
323 if (cur->type[0] ==
'i') {
328 }
else if (strncmp(cur->type,
"float", 5) == 0) {
329 struct timeval *when = cur->name[0] ==
'd' ? &cdr->start :
ast_tvzero(cdr->answer) ? &cdr->end : &cdr->answer;
334 struct timeval *when = cur->name[0] ==
'd' ? &cdr->start :
ast_tvzero(cdr->answer) ? &cdr->end : &cdr->answer;
338 }
else if (strcmp(cur->name,
"disposition") == 0 || strcmp(cur->name,
"amaflags") == 0) {
339 if (strncmp(cur->type,
"int", 3) == 0) {
353 if (strncmp(cur->type,
"int", 3) == 0) {
355 if (value && sscanf(value,
"%30lld", &whatever) == 1) {
362 }
else if (strncmp(cur->type,
"float", 5) == 0) {
363 long double whatever;
364 if (value && sscanf(value,
"%30Lf", &whatever) == 1) {
374 size_t required_size = strlen(value) * 2 + 1;
380 if (required_size > bufsize) {
381 char *tmpbuf =
ast_realloc(escapebuf, required_size);
385 goto ast_log_cleanup;
389 bufsize = required_size;
391 PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
395 LENGTHEN_BUF2(strlen(escapebuf) + 3);
411 if (PQstatus(conn) == CONNECTION_OK) {
414 ast_log(LOG_ERROR,
"Connection was lost... attempting to reconnect.\n");
416 if (PQstatus(conn) == CONNECTION_OK) {
417 ast_log(LOG_ERROR,
"Connection reestablished.\n");
419 connect_time = time(NULL);
422 pgerror = PQerrorMessage(conn);
423 ast_log(LOG_ERROR,
"Unable to reconnect to database server %s. Calls will not be logged!\n", pghostname);
424 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
428 goto ast_log_cleanup;
432 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
433 pgerror = PQresultErrorMessage(result);
434 ast_log(LOG_ERROR,
"Failed to insert call detail record into database!\n");
435 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
436 ast_log(LOG_ERROR,
"Connection may have been lost... attempting to reconnect.\n");
438 if (PQstatus(conn) == CONNECTION_OK) {
439 ast_log(LOG_ERROR,
"Connection reestablished.\n");
441 connect_time = time(NULL);
445 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
446 pgerror = PQresultErrorMessage(result);
447 ast_log(LOG_ERROR,
"HARD ERROR! Attempted reconnection failed. DROPPING CALL RECORD!\n");
448 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
477 ast_mutex_unlock(&pgsql_lock);
482 static void empty_columns(
void)
484 struct columns *current;
486 while ((current = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
493 static int unload_module(
void)
505 ast_free(pghostname);
508 ast_free(pgpassword);
520 static int config_module(
int reload)
529 if ((cfg =
ast_config_load(config, config_flags)) == NULL || cfg == CONFIG_STATUS_FILEINVALID) {
530 ast_log(LOG_WARNING,
"Unable to load config for PostgreSQL CDR's: %s\n", config);
532 }
else if (cfg == CONFIG_STATUS_FILEUNCHANGED) {
536 ast_mutex_lock(&pgsql_lock);
538 if (!ast_variable_browse(cfg,
"global")) {
540 ast_mutex_unlock(&pgsql_lock);
541 ast_log(LOG_NOTICE,
"cdr_pgsql configuration contains no global section, skipping module %s.\n",
542 reload ?
"reload" :
"load");
546 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"hostname"))) {
547 ast_log(LOG_WARNING,
"PostgreSQL server hostname not specified. Assuming unix socket connection\n");
551 ast_free(pghostname);
554 ast_mutex_unlock(&pgsql_lock);
558 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"dbname"))) {
559 ast_log(LOG_WARNING,
"PostgreSQL database not specified. Assuming asterisk\n");
560 tmp =
"asteriskcdrdb";
566 ast_mutex_unlock(&pgsql_lock);
570 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"user"))) {
571 ast_log(LOG_WARNING,
"PostgreSQL database user not specified. Assuming asterisk\n");
578 ast_mutex_unlock(&pgsql_lock);
582 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"appname"))) {
589 ast_mutex_unlock(&pgsql_lock);
594 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"password"))) {
595 ast_log(LOG_WARNING,
"PostgreSQL database password not specified. Assuming blank\n");
599 ast_free(pgpassword);
602 ast_mutex_unlock(&pgsql_lock);
606 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"port"))) {
607 ast_log(LOG_WARNING,
"PostgreSQL database port not specified. Using default 5432.\n");
614 ast_mutex_unlock(&pgsql_lock);
618 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"table"))) {
619 ast_log(LOG_WARNING,
"CDR table not specified. Assuming cdr\n");
626 ast_mutex_unlock(&pgsql_lock);
630 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"encoding"))) {
631 ast_log(LOG_WARNING,
"Encoding not specified. Assuming LATIN9\n");
638 ast_mutex_unlock(&pgsql_lock);
642 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"timezone"))) {
649 if (!ast_strlen_zero(tmp) && !(tz =
ast_strdup(tmp))) {
651 ast_mutex_unlock(&pgsql_lock);
655 if (DEBUG_ATLEAST(1)) {
656 if (ast_strlen_zero(pghostname)) {
657 ast_log(LOG_DEBUG,
"using default unix socket\n");
659 ast_log(LOG_DEBUG,
"got hostname of %s\n", pghostname);
661 ast_log(LOG_DEBUG,
"got port of %s\n", pgdbport);
662 ast_log(LOG_DEBUG,
"got user of %s\n", pgdbuser);
663 ast_log(LOG_DEBUG,
"got dbname of %s\n", pgdbname);
664 ast_log(LOG_DEBUG,
"got password of %s\n", pgpassword);
665 ast_log(LOG_DEBUG,
"got application name of %s\n", pgappname);
666 ast_log(LOG_DEBUG,
"got sql table name of %s\n", table);
667 ast_log(LOG_DEBUG,
"got encoding of %s\n", encoding);
668 ast_log(LOG_DEBUG,
"got timezone of %s\n", tz);
673 if (PQstatus(conn) != CONNECTION_BAD) {
675 char *fname, *ftype, *flen, *fnotnull, *fdef;
676 int i, rows, version;
677 ast_debug(1,
"Successfully connected to PostgreSQL database.\n");
679 connect_time = time(NULL);
681 if (PQsetClientEncoding(conn, encoding)) {
682 #ifdef HAVE_PGSQL_pg_encoding_to_char
683 ast_log(LOG_WARNING,
"Failed to set encoding to '%s'. Encoding set to default '%s'\n", encoding, pg_encoding_to_char(PQclientEncoding(conn)));
685 ast_log(LOG_WARNING,
"Failed to set encoding to '%s'. Encoding set to default.\n", encoding);
688 version = PQserverVersion(conn);
690 if (version >= PGSQL_MIN_VERSION_SCHEMA) {
691 char *schemaname, *tablename, *tmp_schemaname, *tmp_tablename;
692 if (strchr(table,
'.')) {
694 tmp_tablename = strchr(tmp_schemaname,
'.');
695 *tmp_tablename++ =
'\0';
698 tmp_tablename = table;
700 tablename =
ast_alloca(strlen(tmp_tablename) * 2 + 1);
701 PQescapeStringConn(conn, tablename, tmp_tablename, strlen(tmp_tablename), NULL);
703 schemaname =
ast_alloca(strlen(tmp_schemaname) * 2 + 1);
704 PQescapeStringConn(conn, schemaname, tmp_schemaname, strlen(tmp_schemaname), NULL);
706 snprintf(sqlcmd,
sizeof(sqlcmd),
"SELECT a.attname, t.typname, a.attlen, a.attnotnull, pg_catalog.pg_get_expr(d.adbin, d.adrelid) adsrc, a.atttypmod FROM (((pg_catalog.pg_class c INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace AND c.relname = '%s' AND n.nspname = %s%s%s) INNER JOIN pg_catalog.pg_attribute a ON (NOT a.attisdropped) AND a.attnum > 0 AND a.attrelid = c.oid) INNER JOIN pg_catalog.pg_type t ON t.oid = a.atttypid) LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid AND d.adnum = a.attnum ORDER BY n.nspname, c.relname, attnum",
708 ast_strlen_zero(schemaname) ?
"" :
"'", ast_strlen_zero(schemaname) ?
"current_schema()" : schemaname, ast_strlen_zero(schemaname) ?
"" :
"'");
710 snprintf(sqlcmd,
sizeof(sqlcmd),
"SELECT a.attname, t.typname, a.attlen, a.attnotnull, d.adsrc, a.atttypmod FROM pg_class c, pg_type t, pg_attribute a LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid AND d.adnum = a.attnum WHERE c.oid = a.attrelid AND a.atttypid = t.oid AND (a.attnum > 0) AND c.relname = '%s' ORDER BY c.relname, attnum", table);
713 result = PQexec(conn, sqlcmd);
714 if (PQresultStatus(result) != PGRES_TUPLES_OK) {
715 pgerror = PQresultErrorMessage(result);
716 ast_log(LOG_ERROR,
"Failed to query database columns: %s\n", pgerror);
719 ast_mutex_unlock(&pgsql_lock);
723 rows = PQntuples(result);
725 ast_log(LOG_ERROR,
"cdr_pgsql: Failed to query database columns. No columns found, does the table exist?\n");
728 ast_mutex_unlock(&pgsql_lock);
735 for (i = 0; i < rows; i++) {
736 fname = PQgetvalue(result, i, 0);
737 ftype = PQgetvalue(result, i, 1);
738 flen = PQgetvalue(result, i, 2);
739 fnotnull = PQgetvalue(result, i, 3);
740 fdef = PQgetvalue(result, i, 4);
741 if (atoi(flen) == -1) {
743 flen = PQgetvalue(result, i, 5);
746 cur =
ast_calloc(1,
sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
748 sscanf(flen,
"%30d", &cur->len);
749 cur->name = (
char *)cur +
sizeof(*cur);
750 cur->type = (
char *)cur +
sizeof(*cur) + strlen(fname) + 1;
751 strcpy(cur->name, fname);
752 strcpy(cur->type, ftype);
753 if (*fnotnull ==
't') {
758 if (!ast_strlen_zero(fdef)) {
764 AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
770 pgerror = PQerrorMessage(conn);
771 ast_log(LOG_ERROR,
"Unable to connect to database server %s. CALLS WILL NOT BE LOGGED!!\n", pghostname);
772 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
780 ast_mutex_unlock(&pgsql_lock);
784 static int load_module(
void)
788 if (config_module(0)) {
805 static int reload(
void)
807 return config_module(1);
810 AST_MODULE_INFO(
ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER,
"PostgreSQL CDR Backend",
811 .support_level = AST_MODULE_SUPPORT_EXTENDED,
813 .unload = unload_module,
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
int ast_cdr_unregister(const char *name)
Unregister a CDR handling engine.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
descriptor for a cli entry.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define AST_RWLIST_RDLOCK(head)
Read locks a list.
struct ast_tm * ast_localtime(const struct timeval *timep, struct ast_tm *p_tm, const char *zone)
Timezone-independent version of localtime_r(3).
int ast_tvzero(const struct timeval t)
Returns true if the argument is 0,0.
#define AST_RWLIST_WRLOCK(head)
Write locks a list.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
#define ast_cli_register_multiple(e, len)
Register multiple commands.
#define ast_strdup(str)
A wrapper for strdup()
int args
This gets set in ast_cli_register()
#define AST_RWLIST_HEAD_STATIC(name, type)
Defines a structure to be used to hold a read/write list of specified type, statically initialized...
void ast_cdr_format_var(struct ast_cdr *cdr, const char *name, char **ret, char *workspace, int workspacelen, int raw)
Format a CDR variable from an already posted CDR.
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Configuration File Parser.
#define ast_config_load(filename, flags)
Load a config file.
int ast_cdr_register(const char *name, const char *desc, ast_cdrbe be)
Register a CDR handling engine.
General Asterisk PBX channel definitions.
#define ast_strdupa(s)
Duplicate a string into memory allocated on the stack.
#define ast_malloc(len)
A wrapper for malloc()
#define ast_debug(level,...)
Log a DEBUG message.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Responsible for call detail data.
Support for dynamic strings.
#define ast_calloc(num, len)
A wrapper for calloc()
Module has failed to load, may be in an inconsistent state.
int ast_strftime(char *buf, size_t len, const char *format, const struct ast_tm *tm)
Special version of strftime(3) that handles fractions of a second. Takes the same arguments as strftime(3)...
Structure used to handle boolean flags.
void ast_cli_print_timestr_fromseconds(int fd, int seconds, const char *prefix)
Print on the CLI a duration in seconds in the format "%s year(s), %s week(s), %s day(s), %s hour(s), %s second(s)".
size_t ast_str_strlen(const struct ast_str *buf)
Returns the current length of the string stored within buf.
Standard Command Line Interface.
int64_t ast_tvdiff_us(struct timeval end, struct timeval start)
Computes the difference (in microseconds) between two struct timeval instances.
#define AST_RWLIST_UNLOCK(head)
Attempts to unlock a read/write based list.
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
static char * handle_cdr_pgsql_status(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Handle the CLI command cdr show pgsql status.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.