提倡是什么意思语解释例句-海词汉语-佛山市中考分数线
2023年4月19日发(作者:第57届格莱美颁奖典礼)nodejs微服务解决⽅案
前⾔
seneca是⼀个nodejs微服务⼯具集,它赋予系统易于连续构建和更新的能⼒。下⾯会逐⼀和⼤家⼀起了解相关技术⼊门以及实践。
这⾥插⼊⼀段硬⼴。⼩⼦再进⾏简单整合之后撸了个,有兴趣的同学过⽬⼀下,欢迎顺⼿star⼀波,另外有疑问或者代码有⽑病欢迎在博⽂
下⽅留⾔。
环境
基础环境
\"node\": \"^10.0.0\"
\"npm\": \"^6.0.0\"
\"pm2\": \"^2.10.3\"
\"rabbitmq\": \"^3.7.5\"
\"consul\": \"^1.1.0\"
\"mongodb\": \"^3.6\"
微服务⼯程
\"bluebird\": \"^3.5.1\"
\"koa\": \"^2.5.1\"
\"koa-router\": \"^7.4.0\"
\"seneca\": \"^3.4.3\"
\"seneca-web\": \"^2.2.0\"
\"seneca-web-adapter-koa2\": \"^1.1.0\"
\"amqplib\": \"^0.5.2\"
\"winston\": \"^2.4.2\"
\"mongoose\": \"^5.1.2\"
FEATURES
模式匹配做服务间调⽤:略微不同于SpringCloud服务发现(http协议、IP + PORT模式),它使⽤更加灵活的模式匹配(Patrun模
块)原则去进⾏微服务间的调⽤
接⼊koa2对C端提供RESTFUl API
插件:更灵活编写⼩⽽微的可复⽤模块
seneca内置⽇志输出
第三⽅⽇志库⽐较winston(选⽤)、bunyan、log4js
RabbitMQ消息队列
PM2:node服务部署(服务集群)、管理与监控
PM2:⾃动化部署
PM2集成docker
请求追踪(重建⽤户请求流程)
梳理Consul 服务注册与发现基本逻辑
框架集成node-consul
mongodb持久化存储
结合seneca与consul的路由服务中间件(可⽀持多个相同名字服务集群路由,通过$$version区别)
⽀持流处理(⽂件上传/下载等)
jenkins⾃动化部署
nginx负载均衡
持续集成⽅案
redis缓存
prisma提供GraphQL接⼝
模式匹配(Patrun模块)
(accout-server/src/)
const seneca = require(\'seneca\')()
(\'cmd:login\', (msg, done) => {
const { username, pass } = msg
if (username === \'asd\' && pass === \'123\') {
return done(null, { code: 1000 })
}
return done(null, { code: 2100 })
})
const Promise = require(\'bluebird\')
const act = ify(, { context: \'seneca\' })
act({
cmd: \'login\',
username: \'asd\',
pass: \'123\'
}).then(res => {
(res)
}).catch(err => {
(err)
})
执⾏后
{ code: 1000 }
{\"kind\":\"notice\",\"notice\":\"hello seneca k5i8j1cvw96h/64/10992/3.4.3/-\",\"level\":\"info\",\"seneca\":\"k5i8j1cvw96h/64/10992/3.4.3/-\",\"when\":1
⽅法,添加⼀个action pattern到Seneca实例中,它有三个参数:
1. pattern: ⽤于Seneca中JSON的消息匹配模式,对象或格式化字符串
2. sub_pattern: ⼦模式,优先级低于主模式(可选)
3. action: 当匹配成功后的动作函数
⽅法,执⾏Seneca实例中匹配成功的动作,它也有两个参数:
1. msg: JSON消息
2. sub_pattern: ⼦消息,优先级低于主消息(可选)
3. response: ⽤于接收服务调⽤结果
⽅法,为Seneca实例添加⼀个插件,它有两个参数:(此处插件的原理和中间件有⼀些不同)
1. func: 插件执⾏⽅法
2. options: 插件所需options(可选)
核⼼是利⽤JSON对象进⾏模式匹配。这个JSON对象既包含某个微服务所需要调取另⼀个微服务的特征,同时也包含传参。和Java微服务
发现有些类似不过是⽤模式代替ip+port,⽬前为⽌模式是完全可以实现服务发现功能,但是否更加灵活还有待去挖掘。
所需注意的点
各微服务之间模式需通过设计来区分
启动第⼀个微服务
(config-server/src/)
const seneca = require(\'seneca\')()
const config = {
SUCCESS_NORMAL_RES: {
code: 1000,
desc: \'服务端正常响应\'
}}
(\'$target$:config-server\', (msg, done) => {
return done(null, config)
}).listen(10011)
OR
const seneca = require(\'seneca\')()
const Promise = require(\'bluebird\')
const act = ify(, { context: seneca })
(10011)
act(\'$$target:config-server, default$:{msg:404}\').then(res => {
(res)
}).catch(err => {
(err)
})
对内:多个微服务相互调⽤(关键)
noname-server
const seneca = require(\'seneca\')()
(\'$$target:account-server\', (msg, done) => {
done(null, { seneca: \'666\' })
})
(10015)
config-server(同上)
call
const seneca = require(\'seneca\')()
const Promise = require(\'blurebird\')
const act = ify(, { context: seneca })
({
port: \'10011\',
pin: \'$$target:account-server\'
})
({
port: \'10015\',
pin: \'$$target:noname-server\'
})
act(\'$$target:account-server\').then(res => {
(res)
}).catch(err => {
(err)
})
act(\'$$target:noname-server\').then(res => {
(res)
}).catch(err => {
(err)
})
对外:提供REST服务(关键)
集成koa
const seneca = require(\'seneca\')()
const Promise = require(\'bluebird\')
const SenecaWeb = require(\'seneca-web\')
const Koa = require(\'koa\')
const Router = require(\'koa-router\')
const app = new Koa()
const userModule = require(\'./modules/\')
// 初始化⽤户模块
()
// 初始化seneca-web插件,并适配koa
(SenecaWeb, {
context: Router(),
adapter: require(\'seneca-web-adapter-koa2\'),
routes: [...]
})
// 将routes导出给koa app
(() => {
((\'web/context\')().routes())
})
(3333)
user模块
const $module = \'module:user\'
let userCount = 3
const REST_Routes = [
{
{
prefix: \'/user\',
pin: `${$module},if:*`,
map: {
list: {
GET: true,
name: \'\'
},
load: {
GET: true,
name: \'\',
suffix: \'/:id\'
},
edit: {
PUT: true,
name: \'\',
suffix: \'/:id\'
},
create: {
POST: true,
name: \'\'
},
delete: {
DELETE: true,
name: \'\',
suffix: \'/:id\'
}
}
}
]
const db = {
users: [{
id: 1,
name: \'甲\'
}, {
id: 2,
name: \'⼄\'
}, {
id: 3,
name: \'丙\'
}]
}
function user(options) {
(`${$module},if:list`, (msg, done) => {
done(null, )
})
(`${$module},if:load`, (msg, done) => {
const { id } =
done(null, (v => Number(id) === ))
})
(`${$module},if:edit`, (msg, done) => {
let { id } =
id = +id
const { name } =
const index = dex(v => === id)
if (index !== -1) {
(index, 1, {
id,
name
})
done(null, )
} else {
done(null, { success: false })
}
}
})
(`${$module},if:create`, (msg, done) => {
const { name } =
({
id: ++userCount,
name
})
done(null, )
})
(`${$module},if:delete`, (msg, done) => {
let { id } =
id = +id
const index = dex(v => === id)
if (index !== -1) {
(index, 1)
done(null, )
} else {
done(null, { success: false })
}
})
}
s = {
init: user,
routes: REST_Routes
}
vscode-restclient(vscode的restclient插件,⽤于发起RESTFUL请求)
### 1
POST localhost:3333/user HTTP/1.1
Content-Type: application/json
{
\"name\": \"测试添加⽤户\"
}
### delete
DELETE localhost:3333/user/2 HTTP/1.1
### PUT
PUT localhost:3333/user/2 HTTP/1.1
Content-Type: application/json
{
\"name\": \"测试修改⽤户信息\"
}
### GET
GET localhost:3333/user HTTP/1.1
### GET
GET localhost:3333/user/3 HTTP/1.1
seneca内置⽇志输出
可在构造函数中传⼊配置,log属性可以控制⽇志级别
例1:传字符串
require(\'seneca\')({
// quiet silent any all print standard test
log: \'all\'
})
例2:传对象
require(\'seneca\')({
log: {
// none debug+ info+ warn+
level: \'debug+\'
},
// 设置为true时,seneca⽇志功能会encapsulate senecaId,senecaTag,actId等字段后输出(⼀般为两字符)
short: true
})
建议例2代码,因为seneca-web-adapter-koa2插件打印的⽇志level为debug,利于做web接⼝访问⽇志记录。
winston⽇志模块
const { createLogger, format, transports } = require(\'winston\')
const { combine, timestamp, label, printf } = format
const logger = createLogger({
level: \'info\',
format: combine(
label({label: \'microservices\'}),
timestamp(),
printf(info => {
return `${amp} [${}] ${}: ${e}`
})
),
transports: [ new e() ]
})
// highest to lowest
const levels = {
error: 0,
warn: 1,
info: 2,
verbose: 3,
debug: 4,
silly: 5
}
s = logger
⽇志输出格式
2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客户端的调⽤请求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message
RabbitMQ消息队列服务
1. 单任务单consumer,⽣产者消费者模式
// 创建⼀个amqp对等体
const amqp = require(\'amqplib/callback_api\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const q = \'taskQueue1\'
const msg = (2).join(\' \') || \'hello world\'
// 为⽅式RabbitMQ退出或者崩一剪梅歌词完整版 溃时重启后丢失队列信息,这⾥配置durable:true(同时在消费者脚本中也要配置durable:true)后,
Queue(q, { durable: true })
Queue(q, (msg), { persistent: true })
setTimeout(() => {
(); (0)
}, 100)
})
})
// 这⾥配置persistent:true,通过阅读官⽅⽂档,我理解为当程序重启后,会断点续传之前未send完成的数据消息。(但此功能并不可靠,因为不会为所有消息执⾏
// 创建⼀个amqp对等体
const amqp = require(\'amqplib/callback_api\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const q = \'taskQueue1\'
// 为⽅式RabbitMQ退出或者崩溃时重启后丢失队列信息,这⾥配置durable:true(同时在消费者脚本中也要定义durable:true)后,
Queue(q, { durable: true })
ch(1)
(\" [*] Waiting for messages in %s. To exit press CTRL+C\", q)
e(q, msg => {
const secs = ng().split(\'.\').length - 1
(\" [x] Received %s\", ng())
setTimeout(() => {
(\" [x] Done\")
(msg)
}, secs * 1000)
})
})
})
// noAck配置(默认为false)表明consumer是否需要在处理完后反馈ack给producer,如果设置为true,则RabbitMQ服务如果将任务send⾄此consumer后不关⼼任
检验过程
执⾏rabbitmqctl list_queues查看当前队列
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
node (rabbitMQ执⾏过程为会先创建⼀个匿名exchange,⼀个指定queue然后将queue与该匿名exchange绑定)
rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange taskQueue1 queue taskQueue1 []
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1 1
node
Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1 0
知识点
⽣产者消费者模式(⼀个⽣产者的消息在同⼀时间只交由⼀个消费者处理)
ACK机制(rabbitmq的确认机制)
创建队列{durable:true}以及向队列发送消息{persistent:true}(消息持久化存储,但不完全能保证,⽐如当某消息未从缓存中写到磁
盘中⽽程序崩溃时则会丢失)
Round-robin Dispatch(公平分发)
处理窗⼝控制(prefetch来控制分发窗⼝)
异步多任务处理机制(⽐如⼀个⼤任务分解,分⽽治之)
整个消息流流程(某个⽣产者进程 -> 匿名exchange -> 通过binding -> 指定queue -> 某⼀个消费者进程)
2. 单任务多consumer,发布/订阅模式(全消息模型)
const amqp = require(\'amqplib/callback_api\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const ex = \'logs\'
const msg = (2).join(\' \') || \'Hello World!\'
// ex为exchange名称(唯⼀)
// 模式为fanout
// 不对消息持久化存储
Exchange(ex, \'fanout\', { durable: false })
// 第⼆个参数为指定某⼀个binding,如为空则由RabbitMQ随机指定
h(ex, \'\', (msg))
(\' [x] Send %s\', msg)
})
setTimeout(() => {
()
(0)
}, 100)
})
const amqp = require(\'amqplib/callback_api\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const ex = \'logs\'
// ex -> exchange是发布/订阅消息的载体,
// fanout -> 分发消息的模式,fanout,direct,topic,headers
// durable设置为false降低⼀些可靠性,提⾼性能,因为不需要磁盘IO持久化存储消息,另外
Exchange(ex, \'fanout\', { durable: false })
// 使⽤匿名(也就是RabbitMQ⾃动⽣成随机名的queue)队列
// exclusive设置为true,即可以当其寄⽣的connection被close的时候⾃动deleted
Queue(\'\', { exclusive: true }, (err, q) => {
(\" [*] Waiting for messages in %s. To exit press CTRL+C\", )教的拼音
// 绑定队列到某个exchange载体(监听某个exchange的消息)
// 第三个⼊参为binding key
eue(, ex, \'\')
// 消费即订阅某个exchange的消息并设置处理句柄
e(, (msg) => {
(\' [x] %s\', ng())
}, { noAck: true })
})
})
})
// 因为发布/订阅消息的模式就是⾮可靠性,只有当订阅者订阅才能收到相关的消息⽽且发布者不关⼼该消息的订阅者是谁以及处理结果如何,所以这⾥noAck会置
检验过程
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前测试使⽤的queues、echanges、bindings)
node
[*] Waiting for messages in -lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C
rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
logs fanout
rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange -jDbfwJR8TbSNJT2a2a83Og queue -jDbfwJR8TbSNJT2a2a83Og []
logs exchange -jDbfwJR8TbSNJT2a2a83Og queue []
node .
[x] . //
[x] . //
知识点
发布/订阅模式(发布者将消息以⼀对多的形式发送给订阅者处理)
noAck(此模式下推荐⽤⾮Ack机制,因为发布者往往不需要订阅者如何处理消息以及其结果)
durable:false(此模式下推荐不需要做数据持久化存储,原因如上)
exchange的⼯作模式(即路由类型,fanout,direct,topic,headers等,下节会讲解到)
整个消息流流程(某个发布者进程 -> 指定exchange -> 通过binding以及⼯作模式 -> 某个或多个匿名queue即订阅者进程)
3. Direct Routing
s = {
name: \'ex1\',
type: \'direct\',
option: {
durable: false
},
ranks: [\'info\', \'error\', \'warning\', \'severity\']
}
const amqp = require(\'amqplib/callback_api\')
const ex = require(\'./exchange\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
Exchange(, , s)
set十大必背经典文言文 Timeout(() => {
()
(0)
}, 0)
})
})
const amqp = require(\'amqplib/callback_api\')
const ex = require(\'./exchange\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const ranks =
h(rank => {
// 声明⼀个⾮匿名queue
Queue(`${rank}-queue`, { exclusive: false }, (err, q) => {
eue(, , rank)
e(, msg => {
(\" [x] %s: \'%s\'\", gKey, ng());
}, { noAck: true })
})
})
})
})
const amqp = require(\'amqplib/callback_api\')
const ex = require(\'./exchange\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const ranks =
h(rank => {
h(, rank, (`${rank} `))
})
setTimeout(() => {
()
(0)
}, 0)
})
})
检验过程
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前测试使⽤的queues、echanges、bindings)
node
rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
s headers
ex1 direct
fanout
topic
topic
direct
direct
headers
node
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue 0
error-queue 0
info-queue 0
warning-queue 0
Listing bindings for vhost /...
exchange error-queue queue error-queue []
exchange info-queue queue info-queue []
exchange severity-queue queue severity-queue []
exchange warning-queue queue warning-queue []
ex1 exchange error-queue queue error []
ex1 exchange info-queue queue info []
ex1 exchange severity-queue queue severity []
ex1 exchange warning-queue queue warning []
node
[x] info: \'\'
[x] error: \'\'
[x] severity: \'\'
[x] warning: \'\'
知识点
路由key,⽤于exchange的direct⼯作模式下消息的路由
每当assertQueue时,该queue会在以queue名称当作路由key绑定到匿名exchange
可⽤于⽇志不同级别的log处理
4. Topic Routing
s = {
name: \'ex2\',
type: \'topic\',
option: {
durable: false
},
ranks: [\'info\', \'error\', \'warning\', \'severity\']
}
const amqp = require(\'amqplib/callback_api\')
const exchangeConfig = require(\'./exchange\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
Exchange(, , )
setTimeout(() => {
()
(0)
}, 0)
})
})
const amqp = require(\'amqplib/callback_api\')
const exchangeConfig = require(\'./exchange\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const args = (2)
const keys = ( > 0) ? args : [\'\']
(\' [*] Waiting for logs. To exit press CTRL+C\');
h(key => {
Queue(\'\', { exclusive: true }, (err, q) => {
(` [x] Listen by routingKey ${key}`)
eue(, , key)
e(, msg => {
(\" [x] %s:\'%s\'\", gKey, ng());
}, { noAck: true })
})
})
})
})
const amqp = require(\'amqplib/callback_api\')
const exchangeConfig = require(\'./exchange\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const args = (2)
const key = ( > 1) ? args[0] : \'\'
const msg = (1).join(\' \') || \'hello world\'
h(, key, (msg))
setTimeout(() => {
()
(0)
}, 0)
})
})
检验过程
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前测试使⽤的queues、echanges、bindings)
node
Listing exchanges for vhost / ...
fanout
topic
s headers
headers
ex2 topic
direct
topic
direct
node \"#.info\" \"*.error\"
[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
node \"\" \"⽤户服务测试\"
node \"\" \"配置服务测试\"
node \"\" \"配置服务出错\"
[x] :\'⽤户服务测试\'
[x] :\'配置服务测试\'
[x] :\'配置服务出错\'
知识点
key最长为255字节
#可匹配0或多个单词,*可精确匹配1个单词
5. RPC
rpc_
const amqp = require(\'amqplib/callback_api\')
const logger = require(\'./Logger\')
let connection = null
t(\'amqp://localhost\', (err, conn) => {
connection = conn
Channel((err, ch) => {
const q = \'account_rpc_queue\'
Queue(q, { durable: true })
ch(2)
e(q, msg => {
let data = {}
let primitiveContent = ng()
try {
data = (primitiveContent)
} catch (e) {
(new Error(e))
}
(\'接收到rpc客户端的调⽤请求\')
if (ationId === \'10abc\') {
(primitiveContent)
const uid = Number() || -1
let r = getUserById(uid)
Queue(o, (ify(r)), { persistent: true })
(msg)
} else {
(\'不匹配的调⽤请求\')
}
})
})
})
function getUserById (uid) {
let result = \'\'
if (uid === +uid && uid > 0) {
result = {
state: 1000,
msg: \'成功\',
data: {
uid: uid,
name: \'⼩强\',
sex: 1
}
}
} else {
result = {
state: 2000,
msg: \'传参格式错误\'
}
}
return result
}
(\'SIGINT\', () => {
(\'SIGINT\')
connection && ()
(0)
})
rpc_
const amqp = require(\'amqplib/callback_api\')
t(\'amqp://localhost\', (err, conn) => {
Channel((err, ch) => {
const q = \'account_rpc_queue\'
const callback = \'callback_queue\'
Queue(callback, { durable: true })
e(callback, msg => {
const result = ng()
(`接收到回调的消息啦!`)
(result)
(msg)
setTimeout(() => {
()
(0)
}, 0)
})
Queue(q, { durable: true })
const msg = {
uid: 2
}
Queue(q, (ify(msg)), {
persistent: true,
correlationId: \'10abc\',
replyTo: \'callback_queue\'
})
})
})
检验过程
node rpc_
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue 0
node rpc_
rpc_client的CLI打印
接收到回调的消息啦!
{\"state\":1000,\"msg\":\"成功\",\"data\":{\"uid\":2,\"name\":\"⼩强\",\"sex\":1}}
rpc_server的CLI打印
接收到rpc客户端的调⽤请求
{ uid: 2 }
PM2:node服务部署(服务集群)、管理与监控
启动
pm2 start
-w --watch:监听⽬录变化,如变化则⾃动重启应⽤
--ignore-file:监听⽬录变化时忽略的⽂件。如pm2 start rpc_ --watch --ignore-watch=\"rpc_\"
-n --name:设置应⽤名字,可⽤于区分应⽤
-i --instances:设置应⽤实例个数,0与max相同
-f --force: 强制启动某应⽤,常常⽤于有相同应⽤在运⾏的情况
-o --output
-e --error
--env
如pm2 start rpc_ -w -i max -n s1 --ignore-watch=\"rpc_\" -e ./server_ -o ./server_
在cluster-mode,也就是-i max下,⽇志⽂件会⾃动在后⾯追加-${index}保证不重复
其他简单且常⽤命令
pm2 stop app_name|app_id
pm2 restart app_name|app_id
pm2 delete app_name|app_id
pm2 show app_name|app_id OR pm2 describe app_name|app_id
pm2 list
pm2 monit
pm2 logs app_name|app_id --lines
Graceful Stop
pm2 stop app_name|app_id
(\'SIGINT\', () => {
(\'SIGINT\')
connection && ()
(0)
})
当进程结束前,程序会拦截SIGINT信号从⽽在进程即将被杀掉前去断开数据库连接等等占⽤内存的操作后再执⾏()从⽽优雅的
退出进程。(如在1.6s后进程还未结束则继续发送SIGKILL信号强制进程结束)
Process File
const appCfg = {
args: \'\',
max_memory_restart: \'150M\',
env: {
NODE_ENV: \'development\'
},
env_production: {
NODE_ENV: \'production\'
},
// source map
source_map_support: true,
// 不合并⽇志输出,⽤于集群服务
merge_logs: false,
// 常⽤于启动应⽤时异常,超时时间限制
listen_timeout: 5000,
// 进程SIGINT命令时间限制,即进程必须在监听到SIGINT信号后必须在以下设置时间结束进程
kill_timeout: 2000,
// 当启动异常后不尝试重启,运维⼈员尝试找原因后重试
autorestart: false,
// 不允许以相同脚本启动进程
force: false,
// 在Keymetrics dashboard中执⾏pull/upgrade操作后执⾏的命令队列
post_update: [\'npm install\'],
// 监听⽂件变化
watch: false,
// 忽略监听⽂件变化
ignore_watch: [\'node_modules\']
}
function GeneratePM2AppConfig({ name = \'\', script = \'\', error_file = \'\', out_file = \'\', exec_mode = \'fork\', instances = 1, args = \"\" }) {
if (name) {
return ({
name,
script: script || `${name}.js`,
error_file: error_file || `${name}-`,
out_file: out_file|| `${name}-`,
instances,
exec_mode: instances > 1 ? \'cluster\' : \'fork\',
args
}, appCfg)
} else {
写春的古诗句 return null
}
}
s = {
apps: [
GeneratePM2AppConfig({
name: \'client\',
script: \'./rpc_\'
}),
GeneratePM2AppConfig({
name: \'server\',
script: \'./rpc_\',
instances: 1
})
]
}
pm2 start
避坑指南:processFile⽂件命名建议为*.格式。否则后果⾃负。
监控
请移步
PM2:⾃动化部署
ssh准备
1. ssh-keygen -t rsa -C \'qingf deployment\' -b 4096
2. 如果有多密钥、多⽤户情况,建议配置~/.ssh/config⽂件,格式类似如下
// ⽤不同⽤户对不同远程主机发起ssh请求时指定私钥
Host
User deploy
IdentityFile ~/.ssh/qf_deployment_rsa
// 设置为no可去掉⾸次登陆(y/n)的选择
StrictHostKeyChecking no
// 别名⽤法
Host deployment
User deploy
Hostname
IdentityFile ~/.ssh/qingf_deployment_rsa
StrictHostKeyChecking no
1. 将公钥复制到远程(⼀般为部署服务器)对应⽤户⽬录,⽐如/home/deploy/.ssh/authorized_keys⽂件(authorized_keys⽂件
权限设置为600)
配置
与上述apps同级增加deploy属性,如下
deploy: {
production: {
\'user\': \'deploy\',
\'host\': \'\',
\'ref\': \'remotes/origin/master\',
\'repo\': \'/Cecil0o0/\',
\'path\': \'/home/deploy/apps/account-server\',
// ⽣命周期钩⼦,在ssh到远端之后setup操作之前执⾏
\'pre-setup秋思张籍 \': \'\',
// ⽣命周期钩⼦,在初始化设置即git pull之后执⾏
\'post-setup\': \'ls -la\',
// ⽣命周期钩⼦,在远端git fetch origin之前执⾏
\'pre-setup\': \'\',
// ⽣命周期钩⼦,在远端git修改HEAD指针到指定ref之后执⾏
\'post-deploy\': \'npm install && pm2 startOrRestart deploy/ --env production\',
// 以下这个环境变量将注⼊到所有app中
\"env\" : {
\"NODE_ENV\": \"test\"
}
}
}
tip:please make git working directory clean first!
然后先后执⾏以下两条命令(注意config⽂件路径)
1. pm2 deploy deploy/ production setup
2. pm2 deploy deploy/ production
其他命令
pm2 deploy
Commands:
setup run remote setup commands
update update deploy to the latest release
revert [n] revert to [n]th last deployment or 1
curr[ent] output current release commit
prev[ious] output previous release commit
exec|run
list list previous deploy commits
[ref] deploy to [ref], the \"ref\" setting, or latest tag
推荐shell toolkit
请求追踪
如何?
在以及中使⽤rgs[\'tx$\']值作为traceID标识处于某⼀条请求流程。另外seneca内置log系统会打
印此值。
疑问?
seneca内置log系统如何做⾃定义⽇志打印?
温馨提⽰:请以正常的http请求开始,因为经过测试如果微服务⾃主发起act,其rgs[\'tx$\']值不同。
Consul 服务注册与发现
是⼀个分布式集群服务注册发现⼯具,并具有健康检查、分级式KV存储、多数据中⼼等⾼级特性。
安装
可选择使⽤预编译的
也可选择克隆源码后编译安装
基础使⽤
以开发模式快速启动服务模式代理并开启web界⾯访问:8500
consul agent -dev -ui
编写服务定义⽂件
{
\"service\": {
// 服务名,稍后⽤于query服务
\"name\": \"account-server\",
// 服务标签
\"tags\": [\"account-server\"],
// 服务元信息
\"meta\": {
\"meta\": \"for my service\"
},
// 服务端⼝
\"port\": 3333,
// 不允许标签覆盖
\"enable_tag_override\": false,
// 脚本检测做health checks 与-enable-script-checks=true配合使⽤,有脚本模式、TCP模式、HTTP模式、TTL模式
\"checks\": [
{
\"http\": \"localhost:3333/user\",
\"interval\": \"10s\"
}
]
}
}
query定义的account-server服务
[
{
\"ID\": \"e66eb1ff-460c-e63f-b4ac-0cb42daed19c\",
\"Node\": \"\",
\"Address\": \"127.0.0.1\",
\"Datacenter\": \"dc1\",
\"TaggedAddresses\": {
\"lan\": \"127.0.0.1\",
\"wan\": \"127.0.0.1\"
},
\"NodeMeta\": {
\"consul-network-segment\": \"\"
},
\"ServiceID\": \"account-server\",
\"ServiceName\": \"account-server\",
\"ServiceTags\": [
\"account-server\"
],
\"ServiceAddress\": \"\",
\"ServiceMeta\": {
\"meta\": \"for my service\"
},
\"ServicePort\": 3333,
\"ServiceEnableTagOverride\": false,
\"CreateIndex\": 6,
\"ModifyIndex\": 6
}
]
⽣产级别使⽤(分布式集群)
某⼀个结点启动⼀个server模式代理,如下
consul agent -server -bootstrap-expect=1
-data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP
-enable-script-checks=true -config-dir=/usr/local/etc/consul.d
查看集群成员
consul members
Node Address Status Type Build Protocol DC Segment
agent-one valid extranet IP:8301 alive server 1.1.0 2 dc1
另⼀个结点启动⼀个client模式代理,如下
consul agent
-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228
-enable-script-checks=true -config-dir=/usr/local/etc/consul.d
查看集群成员
consul members
Node Address Status Type Build Protocol DC Segment
agent-two 139.129.5.228:8301 alive server 1.1.0 2 dc1
加⼊Cluster
consul join 139.129.5.228
consul members
Node Address Status Type Build Protocol DC Segment
agent-one valid extranet IP:8301 alive server 1.1.0 2 dc1
agent-two 139.129.5.228:8301 alive server 1.1.0 2 dc1
集成node-consul
// 服务注册与发现
// /silas/node-consul#catalog-node-services
\'serverR&D\': {
consulServer: {
type: \'consul\',
host: \'127.0.0.1\',
port: 8500,
secure: false,
ca: [],
defaults: {
token: \'\'
},
promisify: true
},
bizService: {
name: \'defaultName\',
id: \'defaultId\',
address: \'127.0.0.1\',
port: 1000,
tags: [],
meta: {
version: \'\',
description: \'注册集群\'
},
check: {
http: \'\',
// check间隔时间(ex: 15s)
interval: \'10s\',
// check超时时间(ex: 10s)
timeout: \'2s\',
// 处于临界状态后⾃动注销服务的超时时间
deregistercriticalserviceafter: \'30s\',
十六字令三首注音版 // 初始化状态值为成功
status: \'passing\',
// 备注
notes: \'{\"version\":\"111\",\"microservice-port\":1115}\'
}
}
}
/*
* @Author: Cecil
* @Last Modified by: Cecil
* @Last Modified time: 2018-06-02 11:26:49
* @Description 微服务注册⽅法
*/
const defaultConf = require(\'../config\')[\'serverR&D\']
const { ObjectDeepSet, isString } = require(\'../helper/utils\')
const Consul = require(\'consul\')
const { generateServiceName, generateCheckHttp } = require(\'../helper/consul\')
// 注册服务
function register({ consulServer = {}, bizService = {} } = {}) {
if (! && isString()) throw new Error(\'name is invalid!\')
if ( !== +) throw new Error(\'port is invalid!\')
if (! && isString()) throw new Error(\'host is invalid!\')
if (!.$$version) throw new Error(\'meta.$$version is invalid!\')
四年级上册语文词语表拼音 if (!.$$microservicePort) throw new Error(\'meta.$$microservicePort is invalid!\')
const consul = Consul(ObjectDeepSet(Server, consulServer))
const service = vice
= generateServiceName()
=
s =
=
= generateCheckHttp(, )
= ify()
return new Promise((resolve, reject) => {
().then(services => {
// 检查主机+端⼝是否已被占⽤
(services).some(key => {
if (services[key].Address === s && services[key].Port === ) {
throw new Error(`该服务集群endpoint[${s}, ${}]已被占⽤!`)
}
})
// 注册集群服务
er(service).then(() => {
(`${}服务已注册`)
resolve(services)
}).catch(err => {
(err)
})
}).catch(err => {
throw new Error(err)
})
})
}
s = class ServerRegister {
constructor() {
er = register
}
}
验证
保证runtime中存在consul和mongodb服务后,clone该仓库,cd到⼯程根⽬录下,运⾏node src即可。
框架集成node-consul
/*
* @Author: Cecil
* @Last Modified by: Cecil
* @Last Modified time: 2018-06-02 13:58:22
* @Description 微服务注册⽅法
*/
const defaultConf = require(\'../config\')[\'serverR&D\']
const { ObjectDeepSet, isString } = require(\'../helper/utils\')
const Consul = require(\'consul\')
const { generateServiceName, generateCheckHttp } = require(\'../helper/consul\')
const logger = new (require(\'./logger\'))().generateLogger()
// 注册服务⽅法定义
function register({ consulServer = {}, bizService = {} } = {}) {
if (! && isString()) throw new Error(\'name is invalid!\')
if ( !== +) throw new Error(\'port is invalid!\')
if (! && isString()) throw new Error(\'host is invalid!\')
if (!.$$version) throw new Error(\'meta.$$version is invalid!\')
if (!.$$microservicePort) throw new Error(\'meta.$$microservicePort is invalid!\')
const consul = Consul(ObjectDeepSet(Server, consulServer))
const service = vice
= generateServiceName()
=
s =
=
= generateCheckHttp(, )
= ify()
return new Promise((resolve, reject) => {
().then(services => {
// 检查主机+端⼝是否已被占⽤
(services).some(key => {
if (services[key].Address === s && services[key].Port === ) {
throw new Error(`该服务集群endpoint[${s}, ${}]已被占⽤!`)
}
})
// 注册集群服务
er(service).then(() => {
(`${}服务注册成功`)
resolve(services)
}).catch(err => {
(err)
})
}).catch(err => {
throw new Error(err)
})
})
}
s = class ServerRegister {
constructor() {
er = register
}
}
account-server/src/
const vastify = require(\'vastify\')
const version = require(\'../\').version
const microservicePort = 10015
const httpPort = 3333
// 注册服务
er({
bizService: {
name: \'account-server\',
host: \'127.0.0.1\',
port: httpPort,
meta: {
$$version: version,
$$microservicePort: microservicePort
}
}
})
Mongodb持久化存储
框架使⽤mongoose做mongoClient,当然你也可以选⽤原⽣nodejs mongoClient。
改造之前的user模块,偷个懒就不贴代码了,具体请查看
结合seneca以及consul的路由服务中间件
/*
* @Author: Cecil
* @Last Modified by: Cecil
* @Last Modified time: 2018-06-02 16:22:02
* @Description 微服务内部路由中间件,暂不⽀持⾃定义路由匹配策略
*/
\'use strict\'
const Consul = require(\'consul\')
const defaultConf = require(\'../config\')
const { ObjectDeepSet, isNumber } = require(\'../helper/utils\')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require(\'../helper/consul\')
const logger = new (require(\'../tools/logger\'))().generateLogger()
const { IPV4_REGEX } = require(\'../helper/regex\')
let services = {}
let consul = null
/**
* @author Cecil0o0
* @description 同步consul服务中⼼的所有可⽤服务以及对应check并组装成对象以⽅便取值
*/
function syncCheckList () {
return new Promise((resolve, reject) => {
().then(allServices => {
if ((allServices).length > 0) {
services = allServices
().then(checks => {
(checks).forEach(key => {
allServices[getServiceIdByServiceKey(key)][\'check\'] = checks[key]
})
resolve(services)
}).catch(err => {
throw new Error(err)
throw new Error(err)
})
} else {
const errmsg = \'未发现可⽤服务\'
(errmsg)
reject(errmsg)
}
}).catch(err => {
throw new Error(err)
})
})
}
function syncRoutingRule(senecaInstance = {}, services = {}) {
(services).forEach(key => {
let service = services[key]
let name = getServiceNameByServiceKey(key)
let $$addr = s
let $$microservicePort = \'\'
let $$version = \'\'
try {
let base = ()
$$microservicePort = base.$$microservicePort
$$version = base.$$version
} catch (e) {
}
if (IPV4_($$addr) && isNumber($$microservicePort)) {
if ( === \'passing\') {
({
host: $$addr,
port: $$microservicePort,
pin: {
$$version,
$$target: name
}
})
} else {
(`${$$target}@${$$version || \'⽆\'}服务处于critical,因此⽆法使⽤`)
}
} else {
警字组词 (`主机(${$$addr})或微服务端⼝号(${$$microservicePort})有误,请检查`)
}
})
}
(`服务名为${serviceName}。该服务为⾮标准JSON格式,程序已忽略。请检查服务注册⽅式(请确保调⽤ServerRegister的register来注册
function startTimeInterval() {
setInterval(syncCheckList, esRefresh)
}
function microRouting(consulServer) {
var self = this
consul = Consul(ObjectDeepSet(defaultConf[\'serverR&D\'].consulServer, consulServer))
syncCheckList().then(services => {
syncRoutingRule(self, services)
})
}
s = microRouting
在保证有consul与mongodb的runtime后,请结合这两个, Demo进⾏测试。
[未完待续....]
RFID是什么意思D在线翻译读音例句-求职英语
更多推荐
bluebird是什么意思ebird在线翻译读音例
发布评论