Streaming PostgreSQL Large Objects using Clojure

Published on 2023-03-16

Pixelated Noise is a software consultancy and we're always looking for interesting projects to help out with. If you have unmet software development needs, we would be happy to hear from you.

1. Intro

If you have come across a requirement to manage large data files (>1GB) and you want to do this in a memory-efficient way, then Large Objects in PostgreSQL are the way forward.

Large Objects are essentially a way to store very large files (up to 4TB) in the database while giving you stream-style access to them.

For example, when you want to stream video or audio, or serve large data files for your data analysis projects, and you are looking for a memory-efficient way to store and retrieve them, Large Objects can help, since a single object can grow to about 4TB.

Note that all Large Objects are stored in a single system table called pg_largeobject, which makes it very easy to take backups.

If your files don't grow beyond 1GB, you can also look into PostgreSQL's "TOAST" storage system and simply store them in a bytea column, as in the sketch below.
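
As a point of comparison, here is a minimal sketch of that bytea approach (the small_file table and store-small-file function are hypothetical); it reads the whole file into memory, so it is only reasonable for files that comfortably fit in RAM:

;; Hypothetical bytea-based alternative for small files. Reads the whole
;; file into memory, so it is not suitable for the multi-GB case above.
(require '[clojure.java.jdbc :as jdbc]
         '[clojure.java.io :as io])

(defn store-small-file [db filename path]
  (let [out (java.io.ByteArrayOutputStream.)]
    (io/copy (io/file path) out)
    ;; clojure.java.jdbc stores a byte array in a bytea column
    (jdbc/insert! db :small_file {:filename filename
                                  :data     (.toByteArray out)})))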

2. Large Objects and how to go about them

OK, so we are at a point where we have a massive video file and we want to upload it to our service. It would be great if we could stream this to our DB, and this is exactly what we are about to do using the Large Object facility mentioned above.

Note that if you are planning to build a streaming platform, storing all such files in the Large Object facility may not be the best way forward; for example, the filesystem is faster to access.

First things first, we are going to need a table in which we will hold some basic, meaningful information on that file. This will help us keep track of the file so we can retrieve it later; such information could be as simple as the file's oid and its filename:

CREATE TABLE uploaded_file (
  id          SERIAL PRIMARY KEY,
  file_oid    oid,
  filename    TEXT,
  uploaded_at TIMESTAMP DEFAULT NOW()
);

Notice that this table only holds the large object's oid pointer, which is basically all we need to access the stored data.

In this post we are going to assume that we already have a FileInputStream which we would like to stream into the DB (we might look into the whole process of streaming a file all the way from the front-end to the DB in another blog post):

(def f (FileInputStream. "test-resources/myfile"))

Now that we have a stream, we can write a function that takes it and streams it into the DB; let's call this function stream->db and write it up progressively. The signature could look something like:

(ns blob
  (:require [clojure.java.jdbc :as jdbc]
            [clojure.java.io :as io])
  (:import [org.postgresql.largeobject LargeObjectManager LargeObject]
           [org.postgresql PGConnection]
           [java.io FileInputStream ByteArrayOutputStream]))

(defn stream->db [db fis] ...)

This function has a signature of [db fis], so we expect it to receive the db connection and the file input stream fis which we want to persist (later on we will also pass in a filename, so we can record it alongside the oid).

In order to perform any operation with the Large Object system we need to get the Large Object manager, which we get from the DB connection:

(defn get-large-object-manager
  "Get the large object manager by unwrapping the JDBC connection to a PGConnection."
  [connection]
  (-> (.unwrap connection PGConnection)
      (.getLargeObjectAPI)))

Through the Large Object manager (LOM for short), we can read and write a large object from and to the DB; let's draft some helper functions for this purpose.

First we need a function with which we will be creating large objects:

(defn create-large-object
  "Create a new large object in WRITE mode; returns its oid as a long."
  [large-object-manager]
  (.createLO large-object-manager LargeObjectManager/WRITE))

We would also need a function to read large objects from DB given the oid pointer:

(defn read-large-object
  "Opens and returns the large object with the given oid in the given mode."
  [large-object-manager oid mode]
  (.open large-object-manager oid mode))

And, of course, we need a function that writes a stream to a large-object:

(defn write-large-object
  "Copies the contents of the file input stream into the large object's
  output stream."
  [^FileInputStream fis ^LargeObject large-object]
  (io/copy fis
           (.getOutputStream large-object)))

According to PostgreSQL's documentation, all Large Object API calls must be made within a transaction, i.e. with autoCommit set to false; a rough sketch of what that involves at the plain JDBC level is shown below.
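
;; Illustration only (not the code we will actually use): roughly what a
;; transaction wrapper has to do on a raw java.sql.Connection before any
;; Large Object calls are made.
(defn with-manual-tx [^java.sql.Connection conn f]
  (.setAutoCommit conn false)
  (try
    (let [result (f conn)]
      (.commit conn)
      result)
    (catch Exception e
      (.rollback conn)
      (throw e))))

Fortunately, jdbc/with-db-transaction handles this bookkeeping for us. Now we have all the parts to write up the function that will stream data into the DB: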

(defn stream->db
  "Streams the contents of fis into a new large object and records its oid
  and filename in the uploaded_file table."
  [db filename fis]
  (jdbc/with-db-transaction [db db {:auto-commit? false}]
    (let [lom (get-large-object-manager (:connection db))
          oid (create-large-object lom)]
      (with-open [lo (read-large-object lom oid LargeObjectManager/WRITE)]
        (write-large-object fis lo))
      (insert-file db {:oid oid :filename filename}))))

This function takes an InputStream and writes it into the DB using the Large Object system; as a second side effect it calls insert-file, which stores the oid and filename in our uploaded_file table so we can later find the object and stream it back to the user.
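
insert-file is not defined in this post; a minimal sketch of it, using clojure.java.jdbc and the uploaded_file table from earlier, could look like this:

(defn insert-file
  "Hypothetical helper: records the large object's oid and the original
  filename in the uploaded_file table."
  [db {:keys [oid filename]}]
  (jdbc/insert! db :uploaded_file {:file_oid oid :filename filename}))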

Streaming back from the DB is easier. First we need a little helper function:

(defn input-stream
  [^LargeObject lobj]
  (.getInputStream lobj))

And then for streaming the Large Object out of the database we are going to use the following macro:

(defmacro with-db-stream
  [identifier db oid & body]
  `(jdbc/with-db-transaction [db# ~db {:auto-commit? false}]
     (let [~identifier (-> (get-large-object-manager (:connection db#))
                           (read-large-object ~oid LargeObjectManager/READ)
                           input-stream)]
       ~@body)))

You can then simply call it from your handler like:

(with-db-stream my-stream db 1234
  (do-some-work-with-stream my-stream))
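
Note that whatever you do with the stream has to happen inside the macro body: large object descriptors are only valid for the duration of the transaction, so the stream cannot be consumed after with-db-stream returns. As a hypothetical example, dumping the object with oid 1234 to a local file could look like this:

(with-db-stream my-stream db 1234
  (with-open [out (java.io.FileOutputStream. "/tmp/myfile-copy")]
    (clojure.java.io/copy my-stream out)))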

Here's the whole listing:

(ns blob
  (:require [clojure.java.jdbc :as jdbc]
            [clojure.java.io :as io])
  (:import [org.postgresql.largeobject LargeObjectManager LargeObject]
           [org.postgresql PGConnection]
           [java.io FileInputStream ByteArrayOutputStream]))

(defn get-large-object-manager
  "Get the large object manager by unwrapping the JDBC connection to a PGConnection."
  [connection]
  (-> (.unwrap connection PGConnection)
      (.getLargeObjectAPI)))

(defn create-large-object
  "Create a new large object in WRITE mode; returns its oid as a long."
  [large-object-manager]
  (.createLO large-object-manager LargeObjectManager/WRITE))

(defn read-large-object
  "Opens and returns the large object with the given oid in the given mode."
  [large-object-manager oid mode]
  (.open large-object-manager oid mode))

(defn write-large-object
  "Copies the contents of the file input stream into the large object's
  output stream."
  [^FileInputStream fis ^LargeObject large-object]
  (io/copy fis
           (.getOutputStream large-object)))

(defn stream->db
  "Streams the contents of fis into a new large object and records its oid
  and filename in the uploaded_file table."
  [db filename fis]
  (jdbc/with-db-transaction [db db {:auto-commit? false}]
    (let [lom (get-large-object-manager (:connection db))
          oid (create-large-object lom)]
      (with-open [lo (read-large-object lom oid LargeObjectManager/WRITE)]
        (write-large-object fis lo))
      (insert-file db {:oid oid :filename filename}))))

(defn input-stream
  [^LargeObject lobj]
  (.getInputStream lobj))

(defmacro with-db-stream
  [identifier db oid & body]
  `(jdbc/with-db-transaction [db# ~db {:auto-commit? false}]
     (let [~identifier (-> (get-large-object-manager (:connection db#))
                           (read-large-object ~oid LargeObjectManager/READ)
                           input-stream)]
       ~@body)))

Everything should now be in place to stream data in and out of PostgreSQL!