Using Deno and Materialize to send notifications in real-time

Introduction

This is a self-contained example of a real-time notification app with Deno, Materialize and Discord.

Deno is a simple and secure runtime for JavaScript and TypeScript that uses V8. Deno, just like Materialize, is also written in Rust.

In this demo, we will build a simple notifications app. The notifications are going to be triggered when a user creates a bad review for a product. The reviews are generated by a mock service and stored in a Kafka/Redpanda topic. The data is then ingested into a live Materialize view which is kept up to date in real-time.

We then use Deno to connect to Materialize and send notifications to a Discord channel when a user creates a bad review.

That way we can send real-time notifications to a Discord channel and take action immediately to help the user with the issue they are facing with the product.

Overview

Here is a quick overview of the project:

Here is a diagram of the project:

Deno and materialize demo project

Prerequisites

To run this demo, you need to have the following installed.

Optionally, if you want to use the Discord webhook integration, you need to create a Discord channel and get a webhook integration URL. You can skip that step if you don't want to use it, and you can just see the notifications in the console.

Running the demo

To get started, clone the repository:

git clone git clone https://github.com/bobbyiliev/materialize-tutorials.git

Then you can access the directory:

cd materialize-tutorials/mz-deno-discord-live-notifications

With that you can then build the images:

docker-compose build

And finally, you can run all the containers:

docker-compose up -d

It might take a couple of minutes to start the containers and generate the demo data.

After that, we are ready to complete the Materialize setup.

Materialize setup

Now that we have the containers running, we need to set up the Materialize instance.

First, launch the Materialize CLI:

```shell session docker-compose run mzcli

> This is just a shortcut to a docker container with postgres-client pre-installed, if you already have `psql` you could run `psql -U materialize -h localhost -p 6875 materialize`.

Once you are in the Materialize CLI, let's start by creating the Kafka sources!

### Create the Kafka sources

By using the `CREATE SOURCE` statement, we can connect Materialize to an external data source and lets us interact with its data as if the data were in a SQL table.

To create the sources, execute the following statements:

sql CREATE SOURCE users FROM KAFKA BROKER 'redpanda:9092' TOPIC 'users_topic' FORMAT BYTES;

CREATE SOURCE roles FROM KAFKA BROKER 'redpanda:9092' TOPIC 'roles_topic' FORMAT BYTES;

CREATE SOURCE reviews FROM KAFKA BROKER 'redpanda:9092' TOPIC 'reviews_topic' FORMAT BYTES;

Creating the sources in Materialize does not actually start the data ingestion. You can think of a non-materialized source as just the metadata needed for Materialize to connect to your source but not process any data.

### Create non-materialized views

Once we have our sources ready, let's [create non-materialized views](https://materialize.com/docs/sql/create-view/), that essentially only provide us with an alias for the `SELECT` statements they include:

- Create a users view:

sql CREATE VIEW usersview AS SELECT * FROM ( SELECT (data->>'id')::int AS id, (data->>'username')::text AS username, (data->>'email')::text AS email, (data->>'roleid')::int AS roleid FROM ( SELECT CAST(data AS jsonb) AS data FROM ( SELECT convertfrom(data, 'utf8') AS data FROM users ) ) );

- Create the roles view:

sql CREATE VIEW rolesview AS SELECT * FROM ( SELECT (data->>'id')::int AS id, (data->>'name')::text AS name FROM ( SELECT CAST(data AS jsonb) AS data FROM ( SELECT convertfrom(data, 'utf8') AS data FROM roles ) ) );

- Create the reviews view:

sql CREATE VIEW reviewsview AS SELECT * FROM ( SELECT (data->>'userid')::int AS userid, (data->>'rating')::int AS rating, (data->>'reviewtext')::text AS reviewtext, (data->>'createdat')::double AS createdat FROM ( SELECT CAST(data AS jsonb) AS data FROM ( SELECT convertfrom(data, 'utf8') AS data FROM reviews ) ) );

Essentially, as the data in our Redpanda topics is JSON, we can use the `CAST` function to convert the data to a JSONB object and then use the `->>` operator to access the different fields. That way we can access the data in a more convenient way. For more information, check out the [Materialize documentation](https://materialize.com/docs/sql/create-source/kafka/#creating-a-source).

### Create the materialized views

Now that we have our sources ready, let's create the materialized views!

The `CREATE MATERIALIZED VIEW` statement creates a materialized view that computes and maintains the results of a `SELECT` query in memory. The results of the query are incrementally updated in real-time as new data is added to the source.

To create the materialized views we need to execute the following statements:

- Join `users` and `roles` and only get the users with role `vip`:

sql CREATE MATERIALIZED VIEW vipusers AS SELECT u.id, u.username, u.email, u.roleid, r.name FROM usersview u JOIN rolesview r ON u.role_id = r.id WHERE r.name = 'vip';

- And for the reviews, we would want to only materialize the reviews that were generated in the last 10 minutes and only get the reviews for the users with role `vip`:

sql CREATE MATERIALIZED VIEW badvipreviews AS SELECT r.userid, r.rating, r.reviewtext, r.createdat, u.username, u.email FROM reviewsview r JOIN vipusers u ON r.userid = u.id WHERE r.rating < 3 AND mzlogicaltimestamp() < (created_at*1000 + 100000)::numeric;

For more information on how the `mz_logical_timestamp()` function works, see the [Materialize documentation](https://materialize.com/docs/sql/functions/now_and_mz_logical_timestamp).

You can now query the data in the materialized views using standard SQL!

sql SELECT * FROM badvipreviews LIMIT 10;

You can run the query multiple times to see the results change.

### Using `TAIL`

Next, to see the results in real-time we can use `TAIL`:

sql COPY ( TAIL badvipreviews ) TO STDOUT;

You will see a flow of the reviews that were generated in real-time. As we've used the `mz_logical_timestamp()` function, you will see that the records which are older than 10 minutes are being filtered out, which is indicated with a `-1`.

We can also start a `TAIL` without a snapshot, which means that you will only see the latest records after the query is run:

sql COPY ( TAIL badvipreviews WITH (SNAPSHOT = false) ) TO STDOUT;

This is what we will see in our Deno application to get the latest bad reviews and send them to Discord as notifications when they are generated.

For more information on how the `TAIL` function works, see the [Materialize documentation](https://materialize.com/docs/sql/tail/).

## Using Deno

Now that we have Materialize ready, we can use Deno to get the latest reviews and send them to Discord as notifications when they are generated.

Start by checking if Deno is installed:

bash deno --version

If you don't have Deno installed, follow the steps here:

> [Install Deno](https://deno.land/manual/getting_started/installation)

Next, create a new file called `index.ts`:

touch index.ts

And add the following code:

ts import { Client } from "https://deno.land/x/postgres/mod.ts";

// Specify your Discord webhook URL const discordwebhookurl = "";

// Specify your Materialize connection details const client = new Client({ user: "materialize", database: "materialize", hostname: "127.0.0.1", port: 6875, });

const tail = async () => { // Connect to Materialize await client.connect();

// Start a transaction
await client.queryObject('BEGIN');
// Declare a cursor without a snapshot
await client.queryObject('DECLARE c CURSOR FOR TAIL bad_vip_reviews WITH (SNAPSHOT = false)');

// Start a loop to get the latest records
while (true) {
    // Get the next record
    const result = await client.queryObject<{ mz_timestamp: string; mz_diff: number, user_id: number, 

rating: number, reviewtext: number, createdat: string, username: string, email: string }>('FETCH ALL c');

    // Loop through the records and send them to Discord
    for (const row of result.rows) {
        if (row.mz_diff > 0) {
            console.log(`${row.username} has left a bad review with rating ${row.rating}. Email: ${row.email}`);
            // Make an HTTP request to post to the Discord webhook
            if (discord_webhook_url) {
                await fetch(discord_webhook_url, {
                    method: "POST",
                    headers: {
                        "Content-Type": "application/json",
                    },
                    body: JSON.stringify({
                        content: `${row.username} has left a bad review!

Rating ${row.rating}. Email: ${row.email}`, }), }); } } } }

await client.end();

}

tail();

Rundown of the code:

- As Materialize is Postgres wire compatible, first we import the `Client` class from the `https://deno.land/x/postgres/mod.ts` module. This is the class that we will use to connect to the Materialize instance.
- Then, you can specify the Discord webhook URL which you want to send the notifications to. If you don't want to send notifications, you can leave this empty and the code will just print the reviews to the console.
- Next, we create a new `Client` instance and pass it the credentials for Materialize.
- Then we call the `connect()` method on the client instance to connect to Materialize.
- Next, we call the `queryObject()` method on the client instance to start a transaction and also call the `queryObject()` method on the client instance to declare a cursor without a snapshot.
- Finally, we start a loop to get the latest records.
- In the loop, we call the `queryObject()` method on the client instance to get the next record.
- In the loop, we loop through the records and send them to Discord.

Then run the application:

bash deno run --allow-net --allow-read --allow-env --unstable index.ts

Sample output:

user7872 has left a bad review with rating 2. Email: [email protected] user7881 has left a bad review with rating 2. Email: [email protected] user7887 has left a bad review with rating 2. Email: [email protected] user7890 has left a bad review with rating 1. Email: [email protected] user7894 has left a bad review with rating 1. Email: [email protected] … ```

The same information will also be sent to Discord as a notification:

Discord notification

Conclusion

You can leave the Deno application running so that it would be subscribed to the Materialize instance and send the latest reviews to Discord as notifications.

As a next step, you can implement some error handling for the application so that it would gracefully handle errors and send the error to Discord as a notification.

As a next step you can check out the Materialize + dbt + Redpanda demo which is based on the same user reviews mock data:

Materialize + dbt + Redpanda demo

As an extra step for this tutorial, you can try using the TrustPilot API to get accurate ratings rather than the mock data.

Helpful resources:

Community

If you have any questions or comments, please join the Materialize Slack Community!

Bobby

© 2023 Bobby Iliev - Personal Blog

Facebook 𝕏 GitHub YouTube Instagram