/*
 * File-scope configuration and connection state for the PostgreSQL CEL
 * backend.
 */

/* Pattern handed to ast_strftime() when the event time is rendered as text.
 * NOTE(review): "%6q" presumably selects six fractional digits
 * (microseconds) -- confirm against ast_strftime(). */
53 #define DATE_FORMAT "%Y-%m-%d %T.%6q"
/* NOTE(review): presumably the name used to register this backend with the
 * CEL core; the registration call is not visible in this excerpt. */
55 #define PGSQL_BACKEND_NAME "CEL PGSQL backend"
/* PQserverVersion() results at or above this value select the schema-aware
 * catalog query when the table columns are loaded. */
57 #define PGSQL_MIN_VERSION_SCHEMA 70300
/* Name of the configuration file passed to ast_config_load(). */
59 static char *config =
"cel_pgsql.conf";
/* Connection settings. They are interpolated into the libpq connection
 * string and are tied to the "global" section of the config file. */
61 static char *pghostname;
62 static char *pgdbname;
63 static char *pgdbuser;
64 static char *pgpassword;
65 static char *pgappname;
66 static char *pgdbport;
/* Non-zero once a connection is considered established; tested before a
 * connection attempt is made while logging an event. */
70 static int connected = 0;
/* NOTE(review): no use of maxsize / maxsize2 is visible in this excerpt. */
72 static int maxsize = 512, maxsize2 = 512;
/* When non-zero, event times are broken down in the "GMT" zone instead of the
 * default zone (NULL). */
73 static int usegmtime = 0;
/* Default for the show_user_defined option: off. */
76 #define CEL_SHOW_USERDEF_DEFAULT 0
/* Mutex taken at the start of event logging and released at its end. */
81 AST_MUTEX_DEFINE_STATIC(pgsql_lock);
/* libpq connection handle and the most recent query result. */
83 static PGconn *conn = NULL;
84 static PGresult *result = NULL;
/* Single-bit flags of a column descriptor; the enclosing struct declaration
 * is outside this excerpt.
 * NOTE(review): presumably set from attnotnull and from the presence of a
 * default expression while the table columns are loaded -- the assignments
 * are not visible here. */
90 unsigned int notnull:1;
91 unsigned int hasdefault:1;
/*
 * LENGTHEN_BUF(size, var_sql): make sure the dynamic string var_sql has room
 * for size more bytes plus a terminator. When it does not, the requested
 * capacity is rounded up in 512-byte steps. If growing fails, an error naming
 * pghostname and table is logged and the psql_columns list lock is released.
 * NOTE(review): the remainder of the failure path is not visible in this
 * excerpt.
 *
 * The macro relies on names available at the expansion site (table,
 * psql_columns) and evaluates its arguments more than once.
 *
 * LENGTHEN_BUF1 and LENGTHEN_BUF2 apply it to the strings named sql and sql2
 * respectively.
 */
97 #define LENGTHEN_BUF(size, var_sql) \
100 if (ast_str_strlen(var_sql) + size + 1 > ast_str_size(var_sql)) { \
101 if (ast_str_make_space(&var_sql, ((ast_str_size(var_sql) + size + 3) / 512 + 1) * 512) != 0) { \
102 ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CEL '%s:%s' failed.\n", pghostname, table); \
105 AST_RWLIST_UNLOCK(&psql_columns); \
111 #define LENGTHEN_BUF1(size) \
112 LENGTHEN_BUF(size, sql);
113 #define LENGTHEN_BUF2(size) \
114 LENGTHEN_BUF(size, sql2);
/*
 * Builds a libpq connection string from the module settings.
 * NOTE(review): the allocation of conn_info and the actual connect call are
 * not visible in this excerpt.
 */
116 static void pgsql_reconnect(
void)
120 ast_log(LOG_ERROR,
"Failed to allocate memory for connection string.\n");
/* Mandatory part of the connection string.
 * NOTE(review): the values are inserted unquoted, so a setting containing
 * whitespace would presumably break conninfo parsing -- confirm against the
 * libpq connection string rules. */
129 ast_str_set(&conn_info, 0,
"host=%s port=%s dbname=%s user=%s",
130 pghostname, pgdbport, pgdbname, pgdbuser);
/* Application name and password are only considered when non-empty; the
 * statements inside these branches are not visible here. */
132 if (!ast_strlen_zero(pgappname)) {
136 if (!ast_strlen_zero(pgpassword)) {
/*
 * Body of the CEL event writer; the function header lies outside this
 * excerpt. While holding pgsql_lock it formats the event time, makes sure a
 * connection exists, walks the known table columns while appending to the
 * dynamic strings sql and sql2, and finally checks the result of the
 * statement, retrying after a reconnect.
 * NOTE(review): presumably sql collects the column list and sql2 the value
 * list of an INSERT; the statements that start and execute the query are not
 * visible here.
 */
158 ast_mutex_lock(&pgsql_lock);
/* Break the event time down in GMT or the default zone and render it with
 * DATE_FORMAT. */
160 ast_localtime(&record.event_time, &tm, usegmtime ?
"GMT" : NULL);
161 ast_strftime(timestr,
sizeof(timestr), DATE_FORMAT, &tm);
/* A connection is only attempted when none is established yet and host,
 * user, password and database name are all non-NULL. */
163 if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
165 if (PQstatus(conn) != CONNECTION_BAD) {
168 pgerror = PQerrorMessage(conn);
169 ast_log(LOG_ERROR,
"cel_pgsql: Unable to connect to database server %s. Calls will not be logged!\n", pghostname);
170 ast_log(LOG_ERROR,
"cel_pgsql: Reason: %s\n", pgerror);
/* Scratch buffer for escaped values; bufsize tracks its capacity. The
 * allocation itself is not visible in this excerpt. */
179 char *escapebuf = NULL;
182 size_t bufsize = 513;
/* Any failed allocation aborts via the common cleanup label. */
185 if (!escapebuf || !sql || !sql2) {
186 goto ast_log_cleanup;
/* Empty string for the first emitted item, a comma for every later one. */
192 #define SEP (first ? "" : ",")
/* One iteration per column of the target table. */
195 AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
196 LENGTHEN_BUF1(strlen(cur->name) + 2);
/* eventtime: whole seconds for integer columns, seconds plus microseconds
 * for float columns; other column types go through ast_localtime() again
 * (presumably to emit a formatted timestamp). */
199 if (strcmp(cur->name,
"eventtime") == 0) {
200 if (strncmp(cur->type,
"int", 3) == 0) {
202 ast_str_append(&sql2, 0,
"%s%ld", SEP, (
long) record.event_time.tv_sec);
203 }
else if (strncmp(cur->type,
"float", 5) == 0) {
207 (
double) record.event_time.tv_sec +
208 (
double) record.event_time.tv_usec / 1000000.0);
212 ast_localtime(&record.event_time, &tm, usegmtime ?
"GMT" : NULL);
216 }
else if (strcmp(cur->name,
"eventtype") == 0) {
/* eventtype: numeric for integer and float columns; otherwise an event name
 * string is chosen between the user-defined name and the standard name. */
217 if (cur->type[0] ==
'i') {
221 }
else if (strncmp(cur->type,
"float", 5) == 0) {
223 ast_str_append(&sql2, 0,
"%s%f", SEP, (
double) record.event_type);
226 const char *event_name;
230 ? record.user_defined_name : record.event_name;
231 LENGTHEN_BUF2(strlen(event_name) + 1);
234 }
else if (strcmp(cur->name,
"amaflags") == 0) {
235 if (strncmp(cur->type,
"int", 3) == 0) {
/* Remaining columns: the column name selects the matching string field of
 * the event record. */
246 if (strcmp(cur->name,
"userdeftype") == 0) {
247 value = record.user_defined_name;
248 }
else if (strcmp(cur->name,
"cid_name") == 0) {
249 value = record.caller_id_name;
250 }
else if (strcmp(cur->name,
"cid_num") == 0) {
251 value = record.caller_id_num;
252 }
else if (strcmp(cur->name,
"cid_ani") == 0) {
253 value = record.caller_id_ani;
254 }
else if (strcmp(cur->name,
"cid_rdnis") == 0) {
255 value = record.caller_id_rdnis;
256 }
else if (strcmp(cur->name,
"cid_dnid") == 0) {
257 value = record.caller_id_dnid;
258 }
else if (strcmp(cur->name,
"exten") == 0) {
259 value = record.extension;
260 }
else if (strcmp(cur->name,
"context") == 0) {
261 value = record.context;
262 }
else if (strcmp(cur->name,
"channame") == 0) {
263 value = record.channel_name;
264 }
else if (strcmp(cur->name,
"appname") == 0) {
265 value = record.application_name;
266 }
else if (strcmp(cur->name,
"appdata") == 0) {
267 value = record.application_data;
268 }
else if (strcmp(cur->name,
"accountcode") == 0) {
269 value = record.account_code;
270 }
else if (strcmp(cur->name,
"peeraccount") == 0) {
271 value = record.peer_account;
272 }
else if (strcmp(cur->name,
"uniqueid") == 0) {
273 value = record.unique_id;
274 }
else if (strcmp(cur->name,
"linkedid") == 0) {
275 value = record.linked_id;
276 }
else if (strcmp(cur->name,
"userfield") == 0) {
277 value = record.user_field;
278 }
else if (strcmp(cur->name,
"peer") == 0) {
280 }
else if (strcmp(cur->name,
"extra") == 0) {
281 value = record.extra;
288 }
else if (strncmp(cur->type,
"int", 3) == 0) {
/* Numeric columns: the text value is first test-parsed with sscanf(). */
290 if (value && sscanf(value,
"%30lld", &whatever) == 1) {
297 }
else if (strncmp(cur->type,
"float", 5) == 0) {
298 long double whatever;
299 if (value && sscanf(value,
"%30Lf", &whatever) == 1) {
/* Text columns: PQescapeStringConn() needs room for two bytes per input byte
 * plus the terminating NUL. */
309 size_t required_size = strlen(value) * 2 + 1;
/* Grow the scratch buffer through a temporary pointer; failure leads to the
 * cleanup label. */
315 if (required_size > bufsize) {
316 char *tmpbuf =
ast_realloc(escapebuf, required_size);
320 goto ast_log_cleanup;
324 bufsize = required_size;
/* NOTE(review): the error out-parameter is NULL, so an escaping failure
 * would go unnoticed here. */
326 PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
330 LENGTHEN_BUF2(strlen(escapebuf) + 3);
/* Check that the connection is still usable; a lost connection is reported,
 * a reconnect is attempted and its outcome is logged. */
344 if (PQstatus(conn) == CONNECTION_OK) {
347 ast_log(LOG_WARNING,
"Connection was lost... attempting to reconnect.\n");
349 if (PQstatus(conn) == CONNECTION_OK) {
350 ast_log(LOG_NOTICE,
"Connection reestablished.\n");
353 pgerror = PQerrorMessage(conn);
354 ast_log(LOG_ERROR,
"Unable to reconnect to database server %s. Calls will not be logged!\n", pghostname);
355 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
359 goto ast_log_cleanup;
/* The statement has to finish with PGRES_COMMAND_OK. On failure the reason
 * is logged and a reconnect is attempted; a second failure is reported as a
 * hard error and the record is dropped.
 * NOTE(review): the first warning below speaks of a "call detail record"
 * although this backend writes CEL events -- possibly leftover wording. */
363 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
364 pgerror = PQresultErrorMessage(result);
365 ast_log(LOG_WARNING,
"Failed to insert call detail record into database!\n");
366 ast_log(LOG_WARNING,
"Reason: %s\n", pgerror);
367 ast_log(LOG_WARNING,
"Connection may have been lost... attempting to reconnect.\n");
369 if (PQstatus(conn) == CONNECTION_OK) {
370 ast_log(LOG_NOTICE,
"Connection reestablished.\n");
374 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
375 pgerror = PQresultErrorMessage(result);
376 ast_log(LOG_ERROR,
"HARD ERROR! Attempted reconnection failed. DROPPING CALL RECORD!\n");
377 ast_log(LOG_ERROR,
"Reason: %s\n", pgerror);
/* Release the lock taken at the top of the body. */
397 ast_mutex_unlock(&pgsql_lock);
/*
 * Tears the module state down: frees the stored connection settings and
 * empties the psql_columns list.
 * NOTE(review): only part of the function is visible in this excerpt; the
 * handling of the other settings, of the connection and the return value are
 * not shown.
 */
400 static int my_unload_module(
void)
402 struct columns *current;
411 ast_free(pghostname);
423 ast_free(pgpassword);
/* Pop column descriptors from the head of the list until it is empty. */
442 while ((current = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
/* Module unload entry point; returns whatever my_unload_module() returns. */
449 static int unload_module(
void)
451 return my_unload_module();
/*
 * Applies the "global" section of the configuration: host, database, user,
 * password, application name, port, table, show_user_defined, usegmtime and
 * schema. When the connection is usable afterwards, the columns of the target
 * table are read from the PostgreSQL catalog and appended to psql_columns.
 *
 * cfg: the loaded configuration to read from.
 *
 * NOTE(review): large parts of the function, including the statements that
 * store the retrieved values, the connect call and all return statements,
 * are not visible in this excerpt.
 */
454 static int process_my_load_module(
struct ast_config *cfg)
/* A configuration without a "global" section is reported. */
462 if (!(var = ast_variable_browse(cfg,
"global"))) {
463 ast_log(LOG_WARNING,
"CEL pgsql config file missing global section.\n");
/* Each setting below follows the same pattern: look the key up, warn when it
 * is absent, and warn when copying the value runs out of memory. */
466 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"hostname"))) {
467 ast_log(LOG_WARNING,
"PostgreSQL server hostname not specified. Assuming unix socket connection\n");
471 ast_free(pghostname);
473 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying host info\n");
476 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"dbname"))) {
/* NOTE(review): the warning announces "asterisk" as the assumed database,
 * while the default assigned right below is "asteriskceldb". */
477 ast_log(LOG_WARNING,
"PostgreSQL database not specified. Assuming asterisk\n");
478 tmp =
"asteriskceldb";
483 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying dbname info\n");
486 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"user"))) {
487 ast_log(LOG_WARNING,
"PostgreSQL database user not specified. Assuming asterisk\n");
493 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying user info\n");
496 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"password"))) {
497 ast_log(LOG_WARNING,
"PostgreSQL database password not specified. Assuming blank\n");
501 ast_free(pgpassword);
503 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying password info\n");
506 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"appname"))) {
513 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying appname info\n");
517 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"port"))) {
518 ast_log(LOG_WARNING,
"PostgreSQL database port not specified. Using default 5432.\n");
524 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying port info\n");
527 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"table"))) {
528 ast_log(LOG_WARNING,
"CEL table not specified. Assuming cel\n");
/* Optional settings: only acted upon when present. */
537 if ((tmp = ast_variable_retrieve(cfg,
"global",
"show_user_defined"))) {
540 if ((tmp = ast_variable_retrieve(cfg,
"global",
"usegmtime"))) {
545 if (!(tmp = ast_variable_retrieve(cfg,
"global",
"schema"))) {
552 ast_log(LOG_WARNING,
"PostgreSQL Ran out of memory copying schema info\n");
/* Dump the effective settings when debug level 3 or higher is active. */
555 if (DEBUG_ATLEAST(3)) {
556 if (ast_strlen_zero(pghostname)) {
557 ast_log(LOG_DEBUG,
"cel_pgsql: using default unix socket\n");
559 ast_log(LOG_DEBUG,
"cel_pgsql: got hostname of %s\n", pghostname);
561 ast_log(LOG_DEBUG,
"cel_pgsql: got port of %s\n", pgdbport);
562 ast_log(LOG_DEBUG,
"cel_pgsql: got user of %s\n", pgdbuser);
563 ast_log(LOG_DEBUG,
"cel_pgsql: got dbname of %s\n", pgdbname);
/* NOTE(review): this writes the database password to the debug log in clear
 * text; consider masking it. */
564 ast_log(LOG_DEBUG,
"cel_pgsql: got password of %s\n", pgpassword);
565 ast_log(LOG_DEBUG,
"cel_pgsql: got sql table name of %s\n", table);
566 ast_log(LOG_DEBUG,
"cel_pgsql: got show_user_defined of %s\n",
/* With a usable connection, discover the columns of the target table. */
571 if (PQstatus(conn) != CONNECTION_BAD) {
573 char *fname, *ftype, *flen, *fnotnull, *fdef, *tablename, *tmp_tablename;
574 int i, rows, version;
576 ast_debug(1,
"Successfully connected to PostgreSQL database.\n");
579 version = PQserverVersion(conn);
/* Look for a '.' in the configured table name; without one the whole string
 * is the table name. NOTE(review): presumably the part after the last dot is
 * used otherwise -- that statement is not visible here. */
581 if ((tmp_tablename = strrchr(table,
'.'))) {
584 tmp_tablename = table;
/* Escaped copy of the table name on the stack, sized for two bytes per input
 * byte plus the terminating NUL. */
586 tablename =
ast_alloca(strlen(tmp_tablename) * 2 + 1);
587 PQescapeStringConn(conn, tablename, tmp_tablename, strlen(tmp_tablename), NULL);
/* Servers at or above PGSQL_MIN_VERSION_SCHEMA get a query that also matches
 * the schema: an empty schema setting selects current_schema(), anything else
 * is inserted as a quoted, escaped literal. */
588 if (version >= PGSQL_MIN_VERSION_SCHEMA) {
591 lenschema = strlen(schema);
593 PQescapeStringConn(conn, schemaname, schema, lenschema, NULL);
595 snprintf(sqlcmd,
sizeof(sqlcmd),
596 "SELECT a.attname, t.typname, a.attlen, a.attnotnull, pg_catalog.pg_get_expr(d.adbin, d.adrelid) adsrc, a.atttypmod "
597 "FROM (((pg_catalog.pg_class c INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
598 "AND c.relname = '%s' AND n.nspname = %s%s%s) "
599 "INNER JOIN pg_catalog.pg_attribute a ON ("
600 "NOT a.attisdropped) AND a.attnum > 0 AND a.attrelid = c.oid) "
601 "INNER JOIN pg_catalog.pg_type t ON t.oid = a.atttypid) "
602 "LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid "
603 "AND d.adnum = a.attnum "
604 "ORDER BY n.nspname, c.relname, attnum",
606 lenschema == 0 ?
"" :
"'", lenschema == 0 ?
"current_schema()" : schemaname, lenschema == 0 ?
"" :
"'");
/* Query variant without schema matching, selecting by table name only. */
608 snprintf(sqlcmd,
sizeof(sqlcmd),
609 "SELECT a.attname, t.typname, a.attlen, a.attnotnull, d.adsrc, a.atttypmod "
610 "FROM pg_class c, pg_type t, pg_attribute a "
611 "LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid "
612 "AND d.adnum = a.attnum WHERE c.oid = a.attrelid AND a.atttypid = t.oid "
613 "AND (a.attnum > 0) AND c.relname = '%s' ORDER BY c.relname, attnum", tablename);
/* Run the catalog query; anything other than a tuple result is an error. */
616 result = PQexec(conn, sqlcmd);
617 if (PQresultStatus(result) != PGRES_TUPLES_OK) {
618 pgerror = PQresultErrorMessage(result);
619 ast_log(LOG_ERROR,
"Failed to query database columns: %s\n", pgerror);
/* One result row per table column. Fields 0-4 follow the SELECT list: name,
 * type name, length, not-null flag, default expression. */
625 rows = PQntuples(result);
626 for (i = 0; i < rows; i++) {
627 fname = PQgetvalue(result, i, 0);
628 ftype = PQgetvalue(result, i, 1);
629 flen = PQgetvalue(result, i, 2);
630 fnotnull = PQgetvalue(result, i, 3);
631 fdef = PQgetvalue(result, i, 4);
632 ast_verb(4,
"Found column '%s' of type '%s'\n", fname, ftype);
/* Single allocation holding the descriptor followed by both strings: the
 * name starts right behind the struct, the type right behind the name's
 * terminator. NOTE(review): the check of the allocation result is not
 * visible in this excerpt. */
633 cur =
ast_calloc(1,
sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
/* NOTE(review): the sscanf() result is not checked; after ast_calloc() the
 * length stays 0 when parsing fails. */
635 sscanf(flen,
"%30d", &cur->len);
636 cur->name = (
char *)cur +
sizeof(*cur);
637 cur->type = (
char *)cur +
sizeof(*cur) + strlen(fname) + 1;
638 strcpy(cur->name, fname);
639 strcpy(cur->type, ftype);
/* The not-null flag arrives as text; a leading 't' is tested for. */
640 if (*fnotnull ==
't') {
645 if (!ast_strlen_zero(fdef)) {
650 AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
/* Connection could not be established: report the libpq error text. */
655 pgerror = PQerrorMessage(conn);
656 ast_log(LOG_ERROR,
"cel_pgsql: Unable to connect to database server %s. CALLS WILL NOT BE LOGGED!!\n", pghostname);
657 ast_log(LOG_ERROR,
"cel_pgsql: Reason: %s\n", pgerror);
/*
 * Shared implementation of load and reload: loads the configuration file,
 * hands it to process_my_load_module() and reports a failed subscription to
 * CEL events.
 *
 * reload: non-zero when called for a reload (see reload()), zero for the
 * initial load (see load_module()).
 *
 * NOTE(review): the return statements and the way config_flags is set up are
 * not visible in this excerpt.
 */
665 static int my_load_module(
int reload)
/* A NULL result and CONFIG_STATUS_FILEINVALID are both treated as "config
 * could not be loaded"; an unchanged file is handled separately. */
670 if ((cfg =
ast_config_load(config, config_flags)) == NULL || cfg == CONFIG_STATUS_FILEINVALID) {
671 ast_log(LOG_WARNING,
"Unable to load config for PostgreSQL CEL's: %s\n", config);
673 }
else if (cfg == CONFIG_STATUS_FILEUNCHANGED) {
/* NOTE(review): the int returned by process_my_load_module() is ignored
 * here -- confirm that this is intended. */
681 process_my_load_module(cfg);
685 ast_log(LOG_WARNING,
"Unable to subscribe to CEL events for pgsql\n");
/* Module load entry point: runs my_load_module() in initial-load mode (0). */
692 static int load_module(
void)
694 return my_load_module(0);
/* Module reload entry point: runs my_load_module() in reload mode (1). */
697 static int reload(
void)
699 return my_load_module(1);
/* Module descriptor: GPL key, load-order flag, display name, extended support
 * level and the unload callback.
 * NOTE(review): the remaining initializers are not visible in this excerpt. */
702 AST_MODULE_INFO(
ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER,
"PostgreSQL CEL Backend",
703 .support_level = AST_MODULE_SUPPORT_EXTENDED,
705 .unload = unload_module,
Helper struct for getting the fields out of a CEL event.
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
int ast_cel_backend_register(const char *name, ast_cel_backend_cb backend_callback)
Register a CEL backend.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define AST_RWLIST_RDLOCK(head)
Read locks a list.
struct ast_tm * ast_localtime(const struct timeval *timep, struct ast_tm *p_tm, const char *zone)
Timezone-independent version of localtime_r(3).
Structure for variables, used for configurations and for channel variables.
#define AST_RWLIST_WRLOCK(head)
Write locks a list.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
#define ast_strdup(str)
A wrapper for strdup()
#define AST_RWLIST_HEAD_STATIC(name, type)
Defines a structure to be used to hold a read/write list of specified type, statically initialized...
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Configuration File Parser.
#define ast_config_load(filename, flags)
Load a config file.
General Asterisk PBX channel definitions.
uint32_t version
struct ABI version
#define ast_malloc(len)
A wrapper for malloc()
#define ast_debug(level,...)
Log a DEBUG message.
int ast_cel_backend_unregister(const char *name)
Unregister a CEL backend.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
int attribute_pure ast_true(const char *val)
Make sure something is true. Determine if a string containing a boolean value is "true". This function checks to see whether a string passed to it is an indication of a "true" value. It checks to see if the string is "yes", "true", "y", "t", "on" or "1".
Support for dynamic strings.
#define ast_calloc(num, len)
A wrapper for calloc()
Support for logging to various files, console and syslog Configuration in file logger.conf.
Module has failed to load, may be in an inconsistent state.
int ast_strftime(char *buf, size_t len, const char *format, const struct ast_tm *tm)
Special version of strftime(3) that handles fractions of a second. Takes the same arguments as strftime(3)...
#define AST_CEL_EVENT_RECORD_VERSION
struct ABI version
Structure used to handle boolean flags.
size_t ast_str_strlen(const struct ast_str *buf)
Returns the current length of the string stored within buf.
static unsigned char cel_show_user_def
Options provided by main asterisk program.
a user-defined event, the event name field should be set
#define AST_RWLIST_UNLOCK(head)
Attempts to unlock a read/write based list.
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
#define CEL_SHOW_USERDEF_DEFAULT
show_user_def is off by default
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
int ast_cel_fill_record(const struct ast_event *event, struct ast_cel_event_record *r)
Fill in an ast_cel_event_record from a CEL event.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.