在 Node.js 的微服务中,一般不同的服务模块我们会采用 TCP 进行通信,本文来简单谈一谈如何设计 TCP 服务的基础管理。
在具体设计上,本文参考了微服务框架 Seneca 所采用的通信方案 Seneca-transport,已经被实践所证明其可行性。
一提到 TCP 通信,我们肯定离不开 net 模块,事实上,借助 net 模块,我们也可以比较快速地完成一般的 TCP 通信的任务。
为了避免对基础的遗忘,我们还是先附上一个基本的 TCP 链接代码:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | const net = require('net');
 
 const server = net.createServer((socket) => {
 socket.write('goodbye\n');
 socket.on('data', (data) => {
 console.log('data:', data.toString());
 socket.write('goodbye\n');
 })
 }).on('error', (err) => {
 throw err;
 });
 
 
 server.listen(8024, () => {
 console.log('opened server on', server.address());
 });
 
 
 const net = require('net');
 
 const client = net.createConnection({ port: 8024 }, () => {
 
 console.log('connected to server!');
 client.write('world!\r\n');
 setInterval(() => {
 client.write('world!\r\n');
 }, 1000)
 });
 client.on('data', (data) => {
 console.log(data.toString());
 
 });
 client.on('end', () => {
 console.log('disconnected from server');
 });
 
 | 
其实,上述已经是一个几乎最简单的客户端和服务端通信 Demo,但是并不能在实际项目中使用,首先我们需要审视,其离生产环境还差哪些内容:
- 以上要求 Server 端要在 Client 端之前启动,并且一旦因为一些错误导致 Server 端重启了并且这个时候 Client 端正好和 Server 端进行通信,那么肯定会 crash,所以,我们需要一个更为平滑兼容的方案。
- 以上 TCP 链接的 Server 部分,并没有对 connection 进行管理的能力,并且在在以上的例子中,双方都没有主动释放链接,也就是说,建立的是一个 TCP 长连接。
- 以上链接的处理数据能力有限,只能处理纯文本的内容,并且还有一定的风险性(你也许会说可以用 JSON 的序列化反序列化的方法来处理 JSON 数据,但是你别忘了 socket.on('data'...很可能接收到的不是一个完整的 JSON,如果 JSON 较长,其可能只接收到一般的内容,这个时候如果直接JSON.parse())很可能就会报错)。
以上三个问题,便是我们要解决的主要问题,如果你看过之后立刻知道该如何解决了,那么这篇文章可能你不需要看了,否则,我们可以一起继续探索解决方案。
使用 reconnect-core
reconnect-core 是一个和协议无关的链接重试算法,其工作方式也比较简单,当你需要在 Client 端建立链接的时候,其流程是这样的:
- 调用事先传入的链接建立函数,如果这个时候返回成功了,即成功建立链接。
- 如果第一次建立链接失败了,那么再隔一段时间建立第二次,如果第二次还是失败,那么再隔一段更长的时间建立第三次,如果还是失败,那么再隔更长的一段时间……直到到达最大的尝试次数。
实际上关于尝试的时间间隔,也会有不同的策略,比较常用的是 Fibonacci 策略和 exponential 策略。
当然,关于策略的具体实现,reconnect-core 采用了一个 backoff 的库来管理,其可以支持  Fibonacci 策略和 exponential 策略以及更多的自定义策略。
对于上面提到的 DEMO 代码。我们给出 Client 端使用 reconnect-core 的一个实现:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | const Reconnect = require('reconnect-core');
 const net = require('net');
 const Ndjson = require('ndjson');
 
 const Connect = Reconnect(function() {
 var args = [].slice.call(arguments);
 return net.connect.apply(null, args)
 });
 
 let connection = Connect(function(socket) {
 socket.write('world!\r\n');
 socket.on('data', (msg) => {
 console.log('data', msg.toString());
 });
 socket.on('close', (msg) => {
 console.log('close', msg).toString();
 connection.disconnect();
 });
 socket.on('end', () => {
 console.log('end');
 });
 });
 
 connection.connect({
 port: 8024
 });
 connection.on('reconnect', function () {
 console.log('on reconnect...')
 });
 connection.on('error', function (err) {
 console.log('error:', err);
 });
 connection.on('disconnect', function (err) {
 console.log('disconnect:', err);
 });
 
 | 
采用 Reconnect 实际上相比之前是多了一层内容,我们在这里需要区分 connection 实例和 socket 句柄,并且附加正确的时间监听。
现在,我们就不用担心到底是先启动服务端还是先启动客户端了,另外,就算我们的服务端在启动之后由于某些错误关闭了一会,只要没超过最大时间(而这个也是可配置的),仍然不用担心客户端与其建立连接。
给 Server 端增加管理能力
给 Server 端增加管理能力是一个比较必要的并且可以做成不同程度的,一般来说,最重要的功能则是及时清理链接,常用的做法是收到某条指令之后进行清理,或者到达一定时间之后定时清理。
这里我们可以增加一个功能,达到一定时间之后,自动清理所有链接:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 
 | const net = require('net');
 
 var connections = [];
 
 const server = net.createServer((socket) => {
 connections.push(socket);
 socket.write('goodbye\n');
 socket.on('data', (data) => {
 console.log('data:', data.toString());
 socket.write('goodbye\n');
 })
 }).on('error', (err) => {
 throw err;
 });
 
 setTimeout(() => {
 console.log('clear connections');
 connections.forEach((connection) => {
 connection.end('end')
 
 })
 }, 10000);
 
 
 server.listen(8024, () => {
 console.log('opened server on', server.address());
 });
 
 | 
我们可以通过connection.end('end') 和 connection.destory() 来清理,一般来说,前者是正常情况下的关闭指令,需要 Client 端进行确认,而后者则是强制关闭,一般在出错的时候会这样调用。
使用 ndjson 来格式化数据
ndjson 是一个比较方便的 JSON 序列化/反序列化库,相比于我们直接用 JSON,其好处主要体现在:
- 可以同时解析多个 JSON 对象,如果是一个文件流,即其可以包含多个 {},但是要求则是每一个占据一行,其按行分割并且解析。
- 内部使用了 split2,好处就是其返回时可以保证该行的所有内容已经接受完毕,从而防止 ndjson 在序列化的时候出错。
关于 ndjson 的基本使用,可以根据上述链接查找文档,这里一般情况下,我们的使用方式如下(以下是一个 demo):
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 
 | const net = require('net');
 
 var connections = [];
 
 const server = net.createServer((socket) => {
 connections.push(socket);
 socket.on('data', (data) => {
 console.log('data:', data.toString());
 socket.write('{"good": 1234}\r\n');
 socket.write('{"good": 4567}\n\n');
 })
 }).on('error', (err) => {
 throw err;
 });
 
 
 server.listen(8024, () => {
 console.log('opened server on', server.address());
 });
 
 
 const Reconnect = require('reconnect-core');
 const net = require('net');
 const Ndjson = require('ndjson');
 var Stream = require('stream');
 
 const Connect = Reconnect(function() {
 var args = [].slice.call(arguments);
 return net.connect.apply(null, args)
 });
 
 let connection = Connect(function(socket) {
 socket.write('world!\r\n');
 var parser = Ndjson.parse();
 var stringifier = Ndjson.stringify();
 
 function yourhandler(){
 var messager = new Stream.Duplex({ objectMode: true });
 messager._read = function () {
 
 };
 messager._write = function (data, enc, callback) {
 console.log(typeof data, data);
 
 return callback()
 };
 return messager
 }
 socket
 .pipe(parser)
 .pipe(yourhandler())
 .pipe(stringifier)
 .pipe(socket);
 
 socket.on('close', (msg) => {
 console.log('close', msg).toString();
 connection.disconnect();
 });
 socket.on('end', (msg) => {
 console.log('end', msg);
 });
 });
 connection.connect({
 port: 8024
 });
 connection.on('reconnect', function () {
 console.log('on reconnect...')
 });
 connection.on('error', function (err) {
 console.log('error:', err);
 });
 connection.on('disconnect', function (err) {
 console.log('disconnect:', err);
 });
 
 | 
其中,用户具体的逻辑代码,可以是 yourhandler 函数 _write 里面的一部分,其接收的是一个一个处理好的对象。