SAGA Microservices Architecture Patterns - NodeJs & Kafka

Pattern: Saga

The saga pattern is a failure management pattern for distributed applications: it coordinates transactions that span multiple microservices so that data remains consistent even when individual steps fail.

You have applied the Database per Service pattern, so each service has its own database. Some business transactions, however, span multiple services, so you need a mechanism to implement transactions that cross service boundaries.

For example, let’s imagine that you are building an e-commerce store where customers have a credit limit. The application must ensure that a new order will not exceed the customer’s credit limit. Since Orders and Customers are in different databases owned by different services the application cannot simply use a local ACID transaction.
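To make the mechanics concrete, here is a minimal, hypothetical sketch of a saga as an ordered list of local transactions, each paired with a compensating action that semantically undoes it if a later step fails. All of the names here are illustrative and are not part of the services built below.

// Hypothetical local transactions (illustrative stubs only)
const reserveCredit = async (ctx) => { /* debit the customer's credit */ };
const releaseCredit = async (ctx) => { /* compensation: restore credit */ };
const createOrder = async (ctx) => { /* insert the order row */ };
const cancelOrder = async (ctx) => { /* compensation: mark it cancelled */ };

// A saga: each local transaction is paired with its compensation
const createOrderSaga = [
    { run: reserveCredit, compensate: releaseCredit },
    { run: createOrder, compensate: cancelOrder },
];

// Run the steps in order; if one fails, undo the completed
// steps in reverse order and re-throw the error
async function executeSaga(steps, ctx) {
    const completed = [];
    for (const step of steps) {
        try {
            await step.run(ctx);
            completed.push(step);
        } catch (err) {
            for (const done of completed.reverse()) {
                await done.compensate(ctx);
            }
            throw err;
        }
    }
}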

Kafka

Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale. Originally created to handle real-time data feeds at LinkedIn in 2011, Kafka quickly evolved from a messaging queue into a full-fledged event streaming platform capable of handling over 1 million messages per second, or trillions of messages per day.

High-level diagram

Installation

Kafka

Apache Kafka can be downloaded from its official site kafka.apache.org

Kafka download page

Copy the path of the Kafka folder. Then open the zookeeper.properties file in the config directory inside the Kafka folder, paste the copied path into the dataDir field, and append /zookeeper-data to it. Do the same for log.dirs in server.properties, pointing it at a separate folder (e.g. /kafka-logs) for the broker's log data.

folder structure

kafka config folder
					    
//zookeeper.properties
dataDir=D:/Rimsan/kafka/zookeeper-data
clientPort=2181

//server.properties
log.dirs=D:/Rimsan/kafka/kafka-logs
					
				

To easily run your ZooKeeper and Kafka servers, you can use .bat files in which you specify the startup commands.

kafka .bat files

Save them as separate files with a .bat extension.

            
//start zookeeper.bat
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

//start server.bat
.\bin\windows\kafka-server-start.bat .\config\server.properties
            
        

Whenever Kafka throws errors or is unable to start, delete all the files in the zookeeper-data and kafka-logs folders and start the servers again.
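For example, on Windows you can clear both data folders from a command prompt (paths assume the folder layout used above):

rmdir /s /q D:\Rimsan\kafka\zookeeper-data
rmdir /s /q D:\Rimsan\kafka\kafka-logs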

Run start zookeeper.bat first and wait until ZooKeeper has started; then run start server.bat to bring up the Kafka server.
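With both servers up, you can pre-create the two topics the services below rely on (assuming the broker listens on localhost:9092; depending on your broker settings, Kafka may also auto-create topics on first use):

//create topics.bat
.\bin\windows\kafka-topics.bat --create --topic new-booking --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --create --topic new-booking-response --bootstrap-server localhost:9092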
Let’s play with NodeJs services

First, create the four service folders shown in the picture below: booking, payment, activation, and history.

Service folder structure
            
booking/package.json 
--------------------
"dependencies": {
    "dayjs": "^1.11.10",
    "express": "^4.18.2",
    "ioredis": "^5.3.2",
    "kafkajs": "^2.2.4",
    "pino": "^8.16.0",
    "uuid": "^9.0.1"
}

other folder packages
---------------------
"dependencies": {
    "dayjs": "^1.11.10",
    "express": "^4.18.2",
    "kafkajs": "^2.2.4",
    "pino": "^8.16.0"
}
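A quick way to pull these dependencies into each folder is npm (assuming Node.js and npm are already installed):

booking folder
--------------------
npm install dayjs express ioredis kafkajs pino uuid

other folders
--------------------
npm install dayjs express kafkajs pino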
            
        

booking folder:

    
index.js
--------------------

const express = require('express');
const consumer = require('./event-consumer');
const { newBooking } = require('./event-producer');
const CONFIG = require('./config');
const logger = require('./logger/logger');

const IORedis = require('ioredis');
const redis = new IORedis();
const { v4: uuidV4 } = require('uuid');

consumer().catch(error => logger.error(error, 'Error on subscribing to topic'));
const app = express();

app.post('/', express.json(), async (req, res) => {

    const passportNumber = req.body.passportNumber;

    logger.debug(`New booking initiate: ${passportNumber}`);

    const uniqueKey = uuidV4();
    const redisKey = `rimsan:new-booking:registration:${passportNumber}:${uniqueKey}`;
    // Saga state: each step starts as 'pending' and is updated by
    // the consumer when the corresponding service responds
    const workflow = {
        history: 'pending',
        payment: 'pending',
        activation: 'pending'
    }
    logger.debug(`Redis Key: ${redisKey}`);
    await redis.set(redisKey, JSON.stringify(workflow));

    const event = {
        from: CONFIG.SERVICE_NAME,
        type: 'NEW_BOOKING',
        key: passportNumber,
        result: 'pending'
    }

    await newBooking(CONFIG.PRODUCE_TOPIC, event);

    res.send(uniqueKey);
});

app.get('/:passportNumber', async (req, res) => {

    const passportNumber = req.params.passportNumber;
    const key = req.query.key;

    const keyPattern = `rimsan:new-booking:registration:${passportNumber}:${key}`;
    const workflow = await redis.get(keyPattern);
    logger.debug(`Cached output: ${workflow}`);
    res.status(200).send(workflow);

});

app.listen(CONFIG.SERVICE_PORT, () => 
    logger.info(`Running Booking server port: ${CONFIG.SERVICE_PORT}`)
);

kafka.config.js
--------------------
const { Kafka } = require('kafkajs');
const CONFIG = require('./config');

const kafka = new Kafka({
    clientId: CONFIG.SERVICE_NAME,
    brokers: [CONFIG.BROKER_URL]
});

module.exports = kafka;

config.js
--------------------
// Booking service configuration; the port and consumer group
// are examples, adjust them to your setup
module.exports = {
    NODE_ENV: 'DEV',
    SERVICE_NAME: 'booking',
    SERVICE_PORT: 3001,
    BROKER_URL: 'localhost:9092',
    CONSUMER_GROUP: 'booking-group',
    PRODUCE_TOPIC: 'new-booking',
    LISTEN_TOPIC: 'new-booking-response'
}

event-producer.js
--------------------
const { PRODUCE_TOPIC } = require('./config');
const kafka = require('./kafka.config');

const producer = kafka.producer();

// Simple helper that publishes a named message to the produce topic
const send = async (msg) => {
    // Producing
    await producer.connect()
    await producer.send({
        topic: PRODUCE_TOPIC,
        messages: [
            { value: JSON.stringify({ name: msg }) },
        ],
    });
}

const newBooking = async (topic, payload, key) => {
    // Producing
    await producer.connect();

    if (key) {
        await producer.send({
            topic: topic,
            messages: [
                {
                    value: JSON.stringify(payload),
                    key
                },
            ],
        });
    } else {
        await producer.send({
            topic: topic,
            messages: [
                { value: JSON.stringify(payload) },
            ],
        });
    }
}

module.exports = { send, newBooking };

event-consumer.js
--------------------
const { CONSUMER_GROUP, LISTEN_TOPIC } = require('./config');
const kafka = require('./kafka.config');
const logger = require('./logger/logger');

const IORedis = require('ioredis');
const redis = new IORedis();

const consumer = kafka.consumer({ groupId: CONSUMER_GROUP });

const run = async () => {
    // Consuming
    await consumer.connect();

    logger.info(`subscribing to ${LISTEN_TOPIC}`);
    await consumer.subscribe({ topic: LISTEN_TOPIC, fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {

            const newMessage = JSON.parse(message.value.toString());
            logger.debug(`New Message: ${JSON.stringify(newMessage)}`);

            // Update the saga state in Redis: look up the workflow
            // entries for this passport number and record the result
            // reported by the responding service. (KEYS is fine for
            // a demo; prefer SCAN in production.)
            const pattern = `rimsan:new-booking:registration:${newMessage?.key}:*`;
            const keys = await redis.keys(pattern);
            for (const redisKey of keys) {
                const workflow = JSON.parse(await redis.get(redisKey));
                workflow[newMessage.type.toLowerCase()] = newMessage.result;
                await redis.set(redisKey, JSON.stringify(workflow));
            }

            logger.debug('workflow state updated');

            await consumer.commitOffsets([
                {
                    topic,
                    partition,
                    offset: (Number(message.offset) + 1).toString(),
                },
            ]);
        },
    })
}

module.exports = run;
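Every service also requires a logger/logger.js module that is not listed in this post. A minimal sketch using pino (the exact implementation here is an assumption) could be:

logger/logger.js
--------------------
// Minimal pino logger shared by all services; the original
// implementation is not shown, so treat this as a placeholder
const pino = require('pino');

module.exports = pino({ level: 'debug' });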
    

Files for the other folders (payment, activation, history):

    
index.js
--------------------
const express = require('express');
const logger = require('./logger/logger');
const run = require('./event-consumer');
const { SERVICE_PORT, SERVICE_NAME } = require("./config");

const app = express();
const port = SERVICE_PORT;

run().catch((e) => logger.error(e, 'error on subscribing to topic'));

app.listen(port, () => {
    logger.info(`${SERVICE_NAME} service running on port ${port}`);
});

event-producer.js
--------------------
const { RESPOND_TOPIC } = require('./config');
const kafka = require('./kafka.config');

const producer = kafka.producer();

// Simple helper that publishes a named message to the response topic
const send = async (msg) => {
    // Producing
    await producer.connect()
    await producer.send({
        topic: RESPOND_TOPIC,
        messages: [
            { value: JSON.stringify({ name: msg }) },
        ],
    });
}

const newBooking = async (topic, payload, key) => {
    // Producing
    await producer.connect();

    if (key) {
        await producer.send({
            topic: topic,
            messages: [
                {
                    value: JSON.stringify(payload),
                    key
                },
            ],
        });
    } else {
        await producer.send({
            topic: topic,
            messages: [
                { value: JSON.stringify(payload) },
            ],
        });
    }
}

module.exports = { send, newBooking };

event-consumer.js
--------------------
const { CONSUMER_GROUP, LISTEN_TOPIC, RESPOND_TOPIC, SERVICE_NAME } = require('./config');
const { newBooking } = require('./event-producer');
const kafka = require('./kafka.config');
const logger = require('./logger/logger');

const consumer = kafka.consumer({ groupId: CONSUMER_GROUP });

const run = async () => {
    // Consuming
    await consumer.connect()

    logger.info(`subscribing to ${LISTEN_TOPIC}`);
    await consumer.subscribe({ topic: LISTEN_TOPIC, fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {

            const newMessage = JSON.parse(message.value.toString());
            logger.debug(`New Message: ${JSON.stringify(newMessage)}`);

            // Respond with this service's result; use type 'PAYMENT'
            // here and 'ACTIVATION' / 'HISTORY' in the other folders
            await newBooking(RESPOND_TOPIC, {
                from: SERVICE_NAME,
                type: 'PAYMENT',
                key: newMessage?.key,
                result: 'success'
            }).catch((e) => {
                throw new Error('error on publishing message');
            });

            logger.debug('responded to message');

            await consumer.commitOffsets([
                {
                    topic,
                    partition,
                    offset: (Number(message.offset) + 1).toString(),
                },
            ]);
        },
    })
}

module.exports = run;
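The config.js and kafka.config.js files for these folders are not listed either. Based on the values referenced above, the activation service's config would look like the sketch below; payment and history are analogous, each with its own SERVICE_NAME, SERVICE_PORT, and CONSUMER_GROUP (the exact ports and group names for payment and history are assumptions):

config.js (activation; payment and history are analogous)
--------------------
module.exports = {
    NODE_ENV: 'DEV',
    SERVICE_NAME: 'activation',
    SERVICE_PORT: 3004,
    BROKER_URL: 'localhost:9092',
    CONSUMER_GROUP: 'activation-group',
    LISTEN_TOPIC: 'new-booking',
    RESPOND_TOPIC: 'new-booking-response'
}

kafka.config.js (identical to the booking service's)
--------------------
const { Kafka } = require('kafkajs');
const CONFIG = require('./config');

const kafka = new Kafka({
    clientId: CONFIG.SERVICE_NAME,
    brokers: [CONFIG.BROKER_URL]
});

module.exports = kafka;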

    

All the code has been uploaded to GitHub (download).
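To try the saga end to end (assuming the booking service runs on port 3001 as configured above, and using an example passport number), create a booking and then poll its workflow state with the key the POST returns:

//create a booking; the response body is the unique key
curl -X POST http://localhost:3001/ -H "Content-Type: application/json" -d "{\"passportNumber\":\"N1234567\"}"

//check the saga state with the returned key
curl "http://localhost:3001/N1234567?key=<returned-key>"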


