Store application logs in timescaleDB/postgres. (23 November 2018)
Intro

If the title alone has already turned you off, I understand. It's almost unheard of, these days, to store application logs in a structured datastore. Whenever people talk about logs and their logging pipeline, it usually involves ELK (Elasticsearch), MongoDB or other such NoSQL databases. And no such talk is complete without a tale of how people have to stay up at night feeding their Elasticsearch's monstrous JVM with the blood of unblemished year-old sheep (or whatever JVMs feed on).

And it does make a lot of sense to store logs in an unstructured database because, after all, logs are unstructured by nature. Are they?

In this blogpost, I'll try and make a case for storing logs in an SQL database. I'll then go ahead and implement a prototype logging pipeline using PostgreSQL/timescaleDB.

Opinion

I hold a differing opinion: I think if you look at logs hard enough, a structure starts to emerge. In this section, I'll lay out my opinion about logs and logging in general. These are my opinions and I hold no brief for anybody. I'm not going to try and convince you to adopt my worldview, but at least to consider it.

opinion 1: logs are actually structured.

If you look at an example HTTP log of the popular tcp/http load balancer, haproxy:
Mar 9 15:08:05 LB1 local0.info haproxy[21843]: 10.0.0.1:1028 [09/Mar/2012:15:08:05.179] FT BK/SRV 0/0/1/8/9 304 12 - - "GET / HTTP/1.1"
You can see a lot of structure in it: there's a timestamp, client IP address, backend name, response time, status code, etc.
And all these things will be available in every log event; that's a lot of structure.
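To make that concrete, here is a rough sketch (quotes normalised to plain ASCII) of how such a haproxy line could be parsed into named fields with a regular expression. The regex and field names are mine, not something haproxy ships with, and a real parser would need more care:

import re

# the log line from above, with the quotes normalised
HAPROXY_HTTP_LOG = (
    "Mar 9 15:08:05 LB1 local0.info haproxy[21843]: "
    '10.0.0.1:1028 [09/Mar/2012:15:08:05.179] FT BK/SRV 0/0/1/8/9 304 12 - - "GET / HTTP/1.1"'
)

# a simplified pattern for haproxy's HTTP log format
PATTERN = re.compile(
    r"haproxy\[(?P<pid>\d+)\]: "
    r"(?P<client_ip>[\d.]+):(?P<client_port>\d+) "
    r"\[(?P<accept_date>[^\]]+)\] "
    r"(?P<frontend>\S+) (?P<backend>\S+)/(?P<server>\S+) "
    r"(?P<timers>[\d/+-]+) (?P<status_code>\d+) (?P<bytes_read>\d+) "
    r'.* "(?P<http_request>[^"]+)"'
)

match = PATTERN.search(HAPROXY_HTTP_LOG)
if match:
    print(match.groupdict())
    # {'pid': '21843', 'client_ip': '10.0.0.1', ..., 'status_code': '304', 'http_request': 'GET / HTTP/1.1'}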
opinion 2: you should annotate your logs with more structured information to make them much more useful.

Take the haproxy log above; it would be even more useful if we added metadata like:
- the environment the application is running in (production/test/dev/staging, etc.)
- the IP address of the server that the app is running in
- the name of the service/app
- the service/app version number (this can be the last commit hash that was deployed)
- etc

With that metadata added, the stored log events might look something like this:
time | application_name |application_version | environment_name | log_event | trace_id | file_path | host_ip | data
------------------------------+------------------+--------------------+------------------+--------------------+----------+-----------------------------+------------+------------------------------------------------------------------------
2018-11-18 14:07:18.936522+00 | ETL_app | v0.5.0 | production | video_process_job3 | caf72697 | /usr/src/app/code/etl.py | 172.23.0.2 | {"job_id": "658d31dd85fd", "jobType": "batch"}
2018-11-18 14:14:58.223893+00 | ETL_app | v0.6.0 | canary | video_process_job3 | 17603a0 | /usr/src/app/code/etl.py | 172.23.0.3 | {"error": "Traceback (most recent call last):\n File \"/usr/src/app/code/etl.py\", line 129, in job3\n ourJobs[5]\nIndexError: list index out of range\n", "job_id": "c1a164c2-86c5-43e6-a0e5-3ca2377abe95", "jobType": "batch"}
2018-11-18 14:09:09.581655+00 | WebApp | v0.5.0 | production | login | 45a7dc73 | /usr/src/app/code/web.py | 172.23.0.2 | {"user": "Shawn Corey Carter", "email": "someemail@email.com", "status_code": 504, "response_time": 95}
2018-11-18 14:09:09.580918+00 | WebApp | v0.5.0 | production | login | 0f776af0 | /usr/src/app/code/web.py | 172.23.0.2 | {"user": "Shawn Corey Carter", "email": "someemail@email.com", "status_code": 504, "response_time": 6}
2018-11-18 14:09:10.552307+00 | WorkerApp | v1.6.8 | staging | process_work | 6d07fb95 | /usr/src/app/code/worker.py | 172.23.0.2 | {"worker_id": "97b44537", "datacenter": "us-west"}
2018-11-18 14:09:10.551532+00 | WorkerApp | v0.5.6 | production | process_work | 8b2daa49 | /usr/src/app/code/worker.py | 172.23.0.2 | {"worker_id": "a6035461", "datacenter": "us-west"}
opinion 3: you can (and probably should) fashion your logs as time-series events.

When we consider all that, we see that an SQL database is not that bad of an idea as a medium for storing logs.
And on top of that, you gain a lot of other advantages:
- you query using SQL, which is relatively easy to learn
- your fellow developers probably already know SQL
- you do not have to learn a new query language from splunk/sumologic/elastic/influxdb et al
- access to all the domain knowledge in the SQL database ecosystem (stackoverflow/google/docs/blogposts, etc.)
- access to all the tools/extensions in the SQL database ecosystem
- your average SQL database is relatively easy to operate

Let's now build the prototype logging pipeline. First, we set up the database with a small bash script:
#!/usr/bin/env bash

create_extension() {
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"
}

create_table() {
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE TABLE logs (
        time                TIMESTAMPTZ NOT NULL,
        application_name    TEXT NOT NULL,
        application_version TEXT NOT NULL,
        environment_name    TEXT NOT NULL,
        log_event           TEXT NOT NULL,
        trace_id            TEXT NOT NULL,
        file_path           TEXT NOT NULL,
        host_ip             TEXT NOT NULL,
        data                JSONB NULL,
        PRIMARY KEY(time, trace_id)
    );"
}

create_table_indices() {
    # read:
    # 1. https://www.postgresql.org/docs/11/indexes.html
    # 2. https://blog.timescale.com/use-composite-indexes-to-speed-up-time-series-queries-sql-8ca2df6b3aaa
    # 3. https://docs.timescale.com/v1.0/using-timescaledb/schema-management#indexing-best-practices
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE INDEX idxgin ON logs USING GIN (data);"
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE INDEX ON logs (log_event, trace_id, time DESC) WHERE log_event IS NOT NULL AND trace_id IS NOT NULL;"
}

create_hypertable() {
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "SELECT create_hypertable('logs', 'time');"
}

# 1. create timescaledb extension
# 2. create database table
# 3. create indices
# 4. create hypertable
create_extension
create_table
create_table_indices
create_hypertable
That creates the timescaleDB extension in postgres, creates the table and its indices, and turns the table into a timescaledb hypertable.
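Once the script has run, a quick sanity check is to insert one row and read it back. This is just a sketch; the connection settings below mirror the ones used later in this post and will differ in your setup:

import asyncio
import datetime
import json

import asyncpg


async def sanity_check():
    # connection settings are assumptions; adjust them to your own setup
    conn = await asyncpg.connect(
        host="localhost", port=5432, user="myuser", password="hey_NSA", database="mydb"
    )
    await conn.execute(
        """
        INSERT INTO logs(time, application_name, application_version, environment_name,
                         log_event, trace_id, file_path, host_ip, data)
        VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
        """,
        datetime.datetime.now(datetime.timezone.utc),
        "sanity_check_app",
        "v0.0.1",
        "dev",
        "sanity_check",
        "deadbeef",
        "/tmp/sanity.py",
        "127.0.0.1",
        json.dumps({"hello": "world"}),
    )
    row = await conn.fetchrow("SELECT * FROM logs WHERE trace_id = 'deadbeef' LIMIT 1;")
    print(dict(row))
    await conn.close()


asyncio.get_event_loop().run_until_complete(sanity_check())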
Our application

In our application, we want our log events to have a few mandatory items: time, application_name, application_version, environment_name, log_event, trace_id, host_ip, etc.
This will allow us to ask more meaningful questions when debugging our app/service. We can ask questions like:
- from which server are the majority of errors emanating?
- what was the path/trace of execution for the request with trace_id XYZ?
- from which module/file are the errors being raised?
- is this an issue that only affects our canary servers or does it also affect production?
- is the performance of the currently deployed application version similar to that of version 2.4.7?
- etc

Here is a prototype application that emits such structured log events to a named pipe:
import os
import json
import uuid
import errno
import random
import asyncio
import datetime

# `get_ip` is assumed to be a small helper module (shipped alongside this code)
# that exposes a get_ip() function returning this host's IP address.
from get_ip import get_ip
def makeFifo(fifo_directory="/tmp/namedPipes", fifo_file="komusNamedPipe"):
    fifo_file = os.path.join(fifo_directory, fifo_file)
    try:
        os.mkdir(fifo_directory, mode=0o777)
    except OSError as e:
        # the directory may already exist from a previous run
        if e.errno != errno.EEXIST:
            raise e
    try:
        os.mkfifo(fifo_file, mode=0o777)
    except OSError as e:
        # the named pipe may already exist from a previous run
        if e.errno != errno.EEXIST:
            raise e
    return fifo_file
def log_structure(log_event, trace_id, application_name, environment_name, file_path, data):
    now = datetime.datetime.now(datetime.timezone.utc)
    return {
        "log_event": log_event,
        "trace_id": trace_id,
        "application_name": application_name,
        "application_version": "v2.1.0",
        "environment_name": environment_name,
        "file_path": file_path,
        "data": data,
        "time": str(now),
        "host_ip": get_ip(),
    }
async def emmit_logs(
    log_event, trace_id, application_name, environment_name, file_path, log_event_data
):
    try:
        pipe = os.open(makeFifo(), os.O_WRONLY | os.O_NONBLOCK | os.O_ASYNC)
        log = log_structure(
            log_event=log_event,
            trace_id=trace_id,
            application_name=application_name,
            environment_name=environment_name,
            file_path=file_path,
            data=log_event_data,
        )
        # we use newline to demarcate where one log event ends.
        write_data = json.dumps(log) + "\n"
        write_data = write_data.encode()
        os.write(pipe, write_data)
        os.close(pipe)
    except OSError as e:
        if e.errno == errno.ENXIO:
            # errno 6 (ENXIO): no reader has the pipe open yet, so drop this log event
            pass
        else:
            # for this prototype we also ignore any other pipe errors
            pass
    finally:
        await asyncio.sleep(1)
async def web_app(app_name):
    while True:
        await emmit_logs(
            log_event="login",
            trace_id=str(uuid.uuid4()),
            application_name=app_name,
            environment_name=random.choice(["production", "canary", "staging"]),
            file_path=os.path.realpath(__file__),
            log_event_data={
                "user": "Shawn Corey Carter",
                "age": 48,
                "email": "someemail@email.com",
                "status_code": random.choice([200, 202, 307, 400, 404, 500, 504]),
                "response_time": random.randint(1, 110),
            },
        )


loop = asyncio.get_event_loop()
loop.run_until_complete(web_app(app_name="web_app"))
loop.close()
Now we need to write the agent that will read logs from the named pipe and send them to timescaleDB/postgres.
This will be in two parts: in part one we read from the named pipe and buffer the logs in memory; the second part takes whatever is buffered in memory and sends it to timescaleDB.
In the following code, we read from the named pipe and buffer log events in memory:
import os
import json
import random
import asyncio
import asyncpg
import datetime

loop = asyncio.get_event_loop()


class Buffer:
    def __init__(self, loop, interval=6):
        self.loop = loop
        self.interval = interval
        self.lock = asyncio.Semaphore(value=1, loop=self.loop)
        self.buf = []

    def send_logs_every(self):
        jitter = random.randint(1, 9) * 0.1
        return self.interval + jitter


bufferedLogs = Buffer(loop=loop)


async def send_log_to_remote_storage(logs):
    # todo: to be implemented later
    pass


async def schedule_log_sending():
    # todo: to be implemented later
    pass
class PIPE:
    fifo_file = None

    def __enter__(self):
        self.fifo_file = open("/tmp/namedPipes/komusNamedPipe", mode="r")
        os.set_blocking(self.fifo_file.fileno(), False)
        return self.fifo_file

    def __exit__(self, type, value, traceback):
        if self.fifo_file:
            self.fifo_file.close()

    async def __aenter__(self):
        self.fifo_file = open("/tmp/namedPipes/komusNamedPipe", mode="r")
        os.set_blocking(self.fifo_file.fileno(), False)
        return await asyncio.sleep(-1, result=self.fifo_file)

    async def __aexit__(self, exc_type, exc, tb):
        if self.fifo_file:
            await asyncio.sleep(-1, result=self.fifo_file.close())
async def collect_logs():
    async with PIPE() as pipe:
        while True:
            try:
                data = pipe.readline()
                if len(data) == 0:
                    # the pipe is empty at the moment; back off for a second
                    await asyncio.sleep(1)
                    continue
                log = json.loads(data)
                if log:
                    # buffer log events in memory
                    async with bufferedLogs.lock:
                        bufferedLogs.buf.append(log)
            except OSError as e:
                if e.errno == 6:
                    # errno 6 (ENXIO) is ignored in this prototype
                    pass
                else:
                    # as are any other transient pipe errors
                    pass


tasks = asyncio.gather(collect_logs(), schedule_log_sending(), loop=loop)
loop.run_until_complete(tasks)
loop.close()
In the following code, we take log events from the in-memory buffer every few seconds and send them to timescaleDB/postgres:
async def send_log_to_remote_storage(logs):
    IN_DOCKER = os.environ.get("IN_DOCKER")
    try:
        host = "localhost"  # replace with your postgres server IP address
        if IN_DOCKER:
            host = "timescale_db"
        conn = await asyncpg.connect(
            host=host,
            port=5432,
            user="myuser",
            password="hey_NSA",
            database="mydb",
            timeout=6.0,
            command_timeout=8.0,
        )

        all_logs = []
        for i in logs:
            time = datetime.datetime.strptime(i["time"], "%Y-%m-%d %H:%M:%S.%f%z")
            application_name = i["application_name"]
            application_version = i["application_version"]
            environment_name = i["environment_name"]
            log_event = i["log_event"]
            trace_id = i["trace_id"]
            file_path = i["file_path"]
            host_ip = i["host_ip"]
            data = i.get("data")
            if data:
                data = json.dumps(data)
            # the tuple order must match the column order of the INSERT statement below
            all_logs.append(
                (
                    time,
                    application_name,
                    application_version,
                    environment_name,
                    log_event,
                    trace_id,
                    file_path,
                    host_ip,
                    data,
                )
            )

        # batch insert
        await conn.executemany(
            """
            INSERT INTO logs(time, application_name, application_version, environment_name, log_event, trace_id, file_path, host_ip, data)
            VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
            """,
            all_logs,
            timeout=8.0,
        )
        print("sent")
        await conn.close()
    except Exception:
        # in this prototype we silently drop logs if the database is unreachable
        pass


async def schedule_log_sending():
    while True:
        async with bufferedLogs.lock:
            if len(bufferedLogs.buf) > 0:
                await send_log_to_remote_storage(logs=bufferedLogs.buf)
                bufferedLogs.buf = []
        await asyncio.sleep(bufferedLogs.send_logs_every())
Querying
To badly misquote The O'Jays, now that we have logs in our database, what are we going to do with it?
Let's run some queries:
1. what are the latest five events where an exception/error has been raised?
SELECT
log_event,
data -> 'error' AS error
FROM
logs
WHERE
data -> 'error' IS NOT NULL
ORDER BY
time DESC
LIMIT 5;
log_event | error
-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------
video_process_job3 | "Traceback (most recent call last):\n File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n ourJobs[5]\nIndexError: list index out of range\n"
2. what is the path/trace taken by the last request which resulted in an exception occurring?
SELECT
log_event,
trace_id,
file_path,
data -> 'error' AS error
FROM
logs
WHERE
logs.environment_name = 'production'
AND logs.trace_id = (
SELECT
trace_id
FROM
logs
WHERE
data -> 'error' IS NOT NULL
ORDER BY
time DESC
LIMIT 1);
log_event | trace_id | file_path | error
-------------------+----------+----------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------
video_process_job1 | f552320e | /usr/src/app/code/log_emmiter.py |
video_process_job2 | f552320e | /usr/src/app/code/log_emmiter.py |
video_process_job3 | f552320e | /usr/src/app/code/log_emmiter.py |
video_process_job3 | f552320e | /usr/src/app/code/log_emmiter.py | "Traceback (most recent call last):\n File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n ourJobs[5]\nIndexError: list index out of range\n"
3. what are the log events in which the web application has returned an HTTP 5XX status code in production?
SELECT
log_event,
trace_id,
host_ip,
data -> 'status_code' AS status_code
FROM
logs
WHERE
logs.environment_name = 'production'
AND logs.application_name = 'web_app'
AND data -> 'status_code' IN ('500', '504');
log_event | trace_id | host_ip | status_code
----------+----------+------------+-------------
login | aa9951dc | 172.24.0.4 | 504
login | 63898b0d | 172.24.0.4 | 500
login | 6154aa3e | 172.24.0.4 | 504
login | 053a9820 | 172.24.0.4 | 504
login | 29e6644c | 172.24.0.4 | 504
All those instances were login events; maybe we should go and have a look at our login code.
But why are all of them emanating from the same server with IP address 172.24.0.4? Maybe the bug is a combination of the login code and that particular server, and not necessarily a fault of the login code alone.
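Remember the "from which server are the majority of errors emanating?" question from earlier? A quick sketch of answering it from Python with asyncpg (reusing the same assumed connection settings as the agent) could look like this:

import asyncio

import asyncpg


async def errors_per_server():
    # connection settings are assumptions; adjust them to your own setup
    conn = await asyncpg.connect(
        host="localhost", port=5432, user="myuser", password="hey_NSA", database="mydb"
    )
    rows = await conn.fetch(
        """
        SELECT host_ip, COUNT(*) AS error_count
        FROM logs
        WHERE data -> 'error' IS NOT NULL
        GROUP BY host_ip
        ORDER BY error_count DESC;
        """
    )
    for row in rows:
        print(row["host_ip"], row["error_count"])
    await conn.close()


asyncio.get_event_loop().run_until_complete(errors_per_server())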
4. what are the median (50th percentile), 75th percentile and 99th percentile response times (in milliseconds) of the web application in our canary servers?
SELECT
percentile_disc(0.5)
WITHIN GROUP (ORDER BY data -> 'response_time') AS median,
percentile_disc(0.75)
WITHIN GROUP (ORDER BY data -> 'response_time') AS seven_five_percentile,
percentile_disc(0.99)
WITHIN GROUP (ORDER BY data -> 'response_time') AS nine_nine_percentile
FROM
logs
WHERE
logs.environment_name = 'canary'
AND logs.application_name = 'web_app'
AND data -> 'status_code' IS NOT NULL
AND data -> 'response_time' IS NOT NULL;
median | seven_five_percentile | nine_nine_percentile
--------+-----------------------+----------------------
43 | 86 | 89
5. which application version introduced the most errors/exceptions?
SELECT
application_version,
COUNT(data -> 'error') AS error_count
FROM
logs
WHERE
data -> 'error' IS NOT NULL
GROUP BY
application_version
ORDER BY
error_count DESC;
application_version | error_count
--------------------+-------------
v3.5.0 | 5
v3.4.0 | 1
v2.3.0 | 1
v1.7.0 | 1
Maybe we should roll back the deployment of version v3.5.0 while we figure out why it has introduced so many regressions.
Conclusion

All this is fine and dandy, but does the solution scale?
This is a hard question to answer conclusively. What I can encourage you to do is run your own benchmarks. You could also look at the literature from people who have run benchmarks of their own. But ultimately, it comes down to your own individual application and set of circumstances.
Run your benchmarks and adopt the solution that seems to fit your app. And maybe a NoSQL database is exactly what fits your needs. Who knows? Explore and find out.
If you are a small shop and are already using an SQL database for your other business processes, it might turn out that there's no need to introduce a new database just for logs.
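If you want a rough starting point for such a benchmark, here is a sketch that times batched inserts into the logs table with asyncpg. The batch size, row contents and connection settings are all assumptions; replace them with something resembling your real traffic:

import asyncio
import datetime
import json
import random
import time
import uuid

import asyncpg


async def benchmark_inserts(num_batches=100, batch_size=1000):
    # connection settings are assumptions; adjust them to your own setup
    conn = await asyncpg.connect(
        host="localhost", port=5432, user="myuser", password="hey_NSA", database="mydb"
    )
    insert_sql = """
        INSERT INTO logs(time, application_name, application_version, environment_name,
                         log_event, trace_id, file_path, host_ip, data)
        VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
    """
    start = time.monotonic()
    for _ in range(num_batches):
        # build one batch of synthetic log rows
        batch = [
            (
                datetime.datetime.now(datetime.timezone.utc),
                "bench_app",
                "v0.0.1",
                "dev",
                "benchmark_event",
                str(uuid.uuid4()),
                "/tmp/bench.py",
                "127.0.0.1",
                json.dumps({"response_time": random.randint(1, 110)}),
            )
            for _ in range(batch_size)
        ]
        await conn.executemany(insert_sql, batch)
    elapsed = time.monotonic() - start
    total = num_batches * batch_size
    print("inserted {} rows in {:.2f}s ({:.0f} rows/sec)".format(total, elapsed, total / elapsed))
    await conn.close()


asyncio.get_event_loop().run_until_complete(benchmark_inserts())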
All the code in this blogpost can be found at: https://github.com/komuw/komu.engineer/tree/master/blogs/06