Decentralized App Development

Peer-to-Peer MongoDB: Part 2

Synchronizing servers using TCP sockets

Jeremy Orme
Published in Coinmonks · 6 min read · Dec 23, 2023


Photo by Rubaitul Azad on Unsplash

Last time, we built a simple web server to read and write entries in database collections, protected so that only the owner of the cryptographic key-pair associated with a collection could modify it.

In this article, we’ll run a pair of servers each with their own copy of the database and exchange updates between them to keep them synchronized.

We’ll also investigate how to create a collection that is writable by anyone, to handle cases where we want to merge entries from multiple users — for example, a collection of comments for a blog post.

NOTE: At the end of the previous article, I alluded to using libp2p for the synchronization between servers. Unfortunately, I had message reliability issues with libp2p that were consuming a lot of my time with debugging and delaying writing this article. On reflection, I determined that the use case was sufficiently simple to implement the synchronization using sockets directly.

Simple peer-to-peer with sockets

We have a network containing multiple peer servers and we want to connect them together. For now, we’ll assume that we have a complete list of all the peers in the network. For example, if we run two peers on our local machine, they might have addresses:

        const peerAddresses = ['localhost:5000', 'localhost:5001'];

To connect to a peer we just need its host name and port. We can use the portfinder package to dish out ports starting from a given number, e.g.

        this._serverPort = await portfinder.getPortPromise({port: 5000});

When we initialize each peer, we create a TCP server and listen for incoming connections on its assigned port:

        this._server = net.createServer(socket => {
            this._onPeerConnected(socket);

            socket.on('end', () => {
                this._onPeerDisconnected(socket);
            });

            socket.on('error', console.log);
        });

        this._server.on('error', console.log);
        this._server.listen(this._serverPort, () => {
            this._onListening();
        });

Our peers are listening. Next we want each peer to connect to the others. Of course, this is not a scalable solution but let’s start simple. After creating the server we call a method to connect to the known peers:


        this._tryConnectToPeers(peerAddresses);

In this method, we check to see if all the peers are connected and if not, we try to connect each peer then schedule the method to be called again after some suitable timeout (5 seconds):

    private _tryConnectToPeers(addresses: string[]) {
        if (this._clients.size < addresses.length) {
            for (const address of addresses) {
                this._tryConnectToPeer(address);
            }
        }

        setTimeout(() => this._tryConnectToPeers(addresses), 5000);
    }

The real work is done in _tryConnectToPeer, where we create a new connection provided it’s not to our own address or to a peer that’s already connected.

    private _tryConnectToPeer(address: string) {
        if (this._clients.has(address)) {
            console.log(`Already connected to: ${address}`);
            return;
        }

        const [host, portStr] = address.split(':');
        const port = parseInt(portStr);
        if (host == 'localhost' && port == this._serverPort) {
            console.log('Skipping attempt to connect to self');
            return;
        }

        const socket = net.createConnection(port, host, () => {
            this._onPeerConnected(socket);
            this._clients.set(address, socket);
        });

        socket.on('close', () => {
            this._onPeerDisconnected(socket);
            this._clients.delete(address);
        });

        socket.on('error', console.log);
    }

When a peer connects, we store its socket in a map of address → socket; when it disconnects, we remove it from the map. This is the same map whose size we checked in _tryConnectToPeers to see whether all the peers were already connected.

Now that we have sockets to our connected peers, we can add a helper to broadcast a message:

    private _sendToPeers(obj: any) {
        const json = JSON.stringify(obj);
        const buffer = new TextEncoder().encode(json);
        for (const [_, socket] of this._clients) {
            socket.write(buffer);
        }
    }
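The receiver will reverse this encoding, so as a quick standalone sanity check (not part of the server class) we can confirm a message round-trips through TextEncoder and TextDecoder:

```typescript
// Standalone round-trip check of the wire encoding: JSON text encoded to
// bytes with TextEncoder, then decoded back with TextDecoder.
const original = { action: 'insertOne', name: 'albums', entry: { artist: 'Air', title: 'Moon Safari' } };
const bytes = new TextEncoder().encode(JSON.stringify(original));
const decoded = JSON.parse(new TextDecoder().decode(bytes));
console.log(decoded.action); // 'insertOne'
```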

And update our server to handle the received message:

        this._server = net.createServer(socket => {
            this._onPeerConnected(socket);

            socket.on('data', msg => {
                try {
                    const json = new TextDecoder().decode(msg);
                    const obj = JSON.parse(json);
                    this._onPeerReceived(obj);
                }
                catch (error: any) {
                    console.log(error);
                }
            });

            // ...
        });
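One caveat: TCP is a byte stream, so a single 'data' event isn’t guaranteed to hold exactly one complete message — writes can be split or coalesced. The handler above works for small, infrequent messages, but a more robust receiver would frame messages explicitly. Here’s a minimal sketch using newline-delimited JSON (an assumption for illustration, not what the code above does):

```typescript
// TCP is a byte stream: one 'data' event may carry a partial message or
// several messages stuck together. This hypothetical helper buffers chunks
// and emits one parsed object per newline-terminated JSON line.
function makeFrameReader(onMessage: (obj: any) => void): (chunk: Buffer) => void {
    let pending = '';
    return chunk => {
        pending += chunk.toString('utf8');
        let idx: number;
        while ((idx = pending.indexOf('\n')) >= 0) {
            const line = pending.slice(0, idx);
            pending = pending.slice(idx + 1);
            if (line.length > 0) {
                onMessage(JSON.parse(line));
            }
        }
    };
}

// Feed arbitrary chunk boundaries; whole messages still come out
const received: any[] = [];
const onData = makeFrameReader(obj => received.push(obj));
onData(Buffer.from('{"action":"ins'));
onData(Buffer.from('ertOne"}\n{"action":"find"}\n'));
console.log(received.length); // 2
```

With this scheme, the sending side would append a '\n' after each encoded message.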

This calls _onPeerReceived with the decoded object:

    private _onPeerReceived(obj: any) {
        console.log('Received data');
        console.log(obj);
    }

We just need to add some handlers to report the connection events:

    private _onListening() {
        console.log(`Listening for peers on port ${this._serverPort}`);
    }

    private _onPeerConnected(socket: net.Socket) {
        console.log(`Peer connected on port ${socket.localPort}/${socket.remotePort}`);
    }

    private _onPeerDisconnected(socket: net.Socket) {
        console.log('Peer disconnected');
    }

And implement a method to disconnect from the network:

    async disconnect() {
        for (const [_, socket] of this._clients) {
            socket.end();
        }
        this._server?.close();
    }

And now we have a rudimentary p2p network based on TCP sockets.

Synchronizing insertions

Now that the hard work is out of the way, we can use our newfound broadcasting ability to send updates between peers when an insertion happens.

First we add an extra parameter (notifyPeers) to DbManager.insertOne:

    async insertOne(name: string, publicKeyOwner: string | null, entry: OptionalId<Document>, notifyPeers: boolean = true) { ... }

We’ll set this to false when we invoke insertOne at the remote end, to avoid sending a message storm.

Next, we broadcast the insertion to the other peers after successfully inserting locally:

            if (notifyPeers) {
                this._sendToPeers({action: 'insertOne', name, publicKeyOwner, entry});
            }

Finally, we handle the received message and call insertOne:

    private _onPeerReceived(obj: any) {
        console.log('Received data');
        console.log(obj);

        switch (obj.action) {
            case 'insertOne': {
                // The broadcast message carries publicKeyOwner, so that is
                // what we destructure here
                const {name, publicKeyOwner, entry} = obj;
                this.insertOne(name, publicKeyOwner, entry, false);
                break;
            }
        }
    }

To test this, we need to set up our peers to write to different databases even though they connect to the same local database server. We can do that by adding the HTTP port number to the database name:

        dbManager.connect(`mongodb://localhost:27017/music-${port}`, peerAddresses);

The first instance will create an HTTP server on port 3000 and open a database named music-3000; the second will create an HTTP server on port 3001 and open a database named music-3001. Any insertion requested through either HTTP server will be broadcast to the other peer and applied to its database.

This assumes that all the servers started running at the same point in time and so didn’t miss any updates. We’ll come back to the issue of servers joining the party late in the next article.

Public collections

So far we’ve required that a writer must sign their entry and their public key must match the one in the collection address. This enforces that only the owner of the collection can write to it.

However, there are many cases where it’s necessary to allow multiple writers. A simple example is the collection of comments for a blog post. If people were only allowed to write to collections they own, the blog editor would need to keep a list of registered commenters and use it to locate each commenter’s own collection containing their comments. Worse still, there wouldn’t even be a way to request registration using the database.

We can solve this with the concept of a public collection. Instead of appending an owner public key to the collection address, we add it to the entry _id field. We verify that the entry signature corresponds to this public key. That ensures that although users can write to the same collection, they cannot write to the same keys within it.

An example entry to add to a public collection looks like this:

{
    "_id": "bc1574acbd07cd903918b9dbed20936dedde9a8a34551cf3e932de527c881a17/0",
    "_signature": "12f30b61e6701546cc31d13f4b3e40d816383937f0e7e2bf5f8001f819518287d5b5b0cb18ae0852905ce2d40612cc38f25799481bbb9d5de9dc18cb0eba5d00",
    "artist": "Air",
    "title": "Moon Safari"
}
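To make the check concrete, here’s a hedged sketch of the idea using Node’s built-in Ed25519 support. The names and encodings here are assumptions for illustration — the actual signing scheme (which bytes are signed, how keys are encoded) was established in part 1 and may differ:

```typescript
import { generateKeyPairSync, createPublicKey, sign, verify } from 'node:crypto';

// Hypothetical sketch of the public-collection check: the hex before the
// '/' in _id must be the writer's public key, and _signature must verify
// over the entry content without its _signature field.

// A writer generates a key pair; the raw 32-byte Ed25519 public key is the
// last 32 bytes of the SPKI DER encoding.
const { publicKey, privateKey } = generateKeyPairSync('ed25519');
const pubHex = publicKey.export({ type: 'spki', format: 'der' }).subarray(-32).toString('hex');

// The entry's _id is prefixed with the writer's public key; the signature
// covers the JSON of the entry before _signature is attached.
const entry: any = { _id: `${pubHex}/0`, artist: 'Air', title: 'Moon Safari' };
entry._signature = sign(null, Buffer.from(JSON.stringify(entry)), privateKey).toString('hex');

// The server recovers the key from the _id prefix alone (fixed Ed25519
// SPKI DER header || raw key) and checks the signature against it.
const { _signature, ...content } = entry;
const idPrefix = (entry._id as string).split('/')[0];
const keyFromId = createPublicKey({
    key: Buffer.concat([Buffer.from('302a300506032b6570032100', 'hex'), Buffer.from(idPrefix, 'hex')]),
    format: 'der',
    type: 'spki'
});
const ok = verify(null, Buffer.from(JSON.stringify(content)), keyFromId, Buffer.from(_signature, 'hex'));
console.log(ok); // true
```

Because anyone who knows the _id can recompute the prefix check, no registration list is needed: the key in the _id is the only identity the server has to trust.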

We need to handle insert and find for public collections (we just pass null for the public key parameter):

// Post a signed entry into a public collection and return its id
app.post('/collection/:name', async (req, res) => {
    const result = await dbManager.insertOne(req.params.name, null, req.body);
    res.send(result);
});

// Get a range of entries from a public collection
app.get('/collection/:name', async (req, res) => {
    const result = await dbManager.find(req.params.name, null, req.body);
    res.send(result);
});

In insertOne, we take the public key used for verification from the collection address (private collection) or from the entry’s _id (public collection):

        const id = entry._id.toString();
        const publicKey = publicKeyOwner || (id.match(/^[0-9a-f]+\//) ? id.split('/')[0] : null);
        if (!publicKey) {
            console.log(`Attempt to insert entry into public collection '${name}' without public key prefix on _id`);
            return null;
        }

And in both insertOne and find, we handle the null owner public key when constructing the collection address:

        const address = publicKeyOwner ? `${name}/${publicKeyOwner}` : name;

That’s it, we can now create shared collections!

If you fancy playing around with this, the source up to this point is available on GitHub: https://github.com/jeremyorme/bonono-server/tree/release/part-2

Next time

We’ll look at how to handle servers joining in late and therefore missing collections that were already constructed.


Jeremy Orme
Coinmonks

Software engineer. Experimenting with database decentralization