centos8安装nsq

1.NSQ官网

http://nsq.io

2.相关概念


    nsqlookupd:管理nsqd节点拓扑信息并提供最终一致性的发现服务的守护进程
    nsqd:负责接收、排队、转发消息到客户端的守护进程,并且定时向nsqlookupd服务发送心跳
    nsqadmin:nsq的web统计界面,可实时查看集群的统计数据和执行一些管理任务
    utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

3.二进制安装(无需Go环境)

 

下载地址:

https://nsq.io/deployment/installing.html

https://github.com/nsqio/nsq

      nsq官网下载二进制linux安装包:nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
      解压到家目录tar -xvzf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
给bin目录下文件添加执行权限

cd ~/nsq-1.0.0-compat.linux-amd64.go1.8sudo chmod -R u+x bin/

3.1.简单启动

进入bin目录

打开一个终端,启动nsqlookupd

nohup ./nsqlookupd &

打开另一个终端,启动nsqd

nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 &

打开另一个终端,启动nsqadmin

nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 &

访问nsqadmin监控

http://127.0.0.1:4171

打开另外一个终端,启动nsq_to_file,将消息写入/tmp文件的日志文件,文件名默认由主题topic+主机+日期时间戳组成

nohup ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161 &

使用curl命令,发布一条消息

curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'

鼎云博客

鼎云博客

 

3.2本地多nsq启动
 

把~nsq/bin目录加入path

start.sh

#!/bin/sh
#服务启动
#lookupd:151 152
#更改 --data-path 所指定的数据存放路径,否则无法运行
echo '删除日志文件'
rm -rf ./log/*
 
echo '启动nsqlookupd服务'
nohup nsqlookupd >./log/nsqlookupd.log 2>&1 &
 
 
echo '启动nsqd服务'
nohup nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4153" --data-path=./data1  >./log/nsqd1.log 2>&1 &
nohup nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4154" --data-path=./data2 -http-address="0.0.0.0:4155" >./log/nsqd2.log 2>&1 &
 
 
echo '启动nsqdadmin服务'
nohup nsqadmin --lookupd-http-address=127.0.0.1:4161 >./log/nsqadmin.log 2>&1 &

stop.sh

#!/bin/sh#服务停止ps -ef | grep nsq| grep -v grep | awk '{print $2}' | xargs kill -2

 

3.2集群nsq启动

假设我们的服务器安装下面要求编排。

nsqlookup 集群列表

192.168.234.77192.168.234.36192.168.234.39

nsq 节点

192.168.234.117192.168.234.118

nsqadmin 节点

192.168.234.119


构建环境变量

设置nsq的环境变量,假设nsq的bin路径在/usr/local/software/nsq下。

运行命令如下:

vi /etc/profileexport NSQ_HOME=/usr/local/software/nsqexport PATH=$PATH:$NSQ_HOME/bin. /etc/profile

构建运行脚本

创建执行脚本,分别创建start-nsqlookup.sh, start-nsq.sh和 start-nsqadmin.

start-nsqlookup.sh

#nsqlookupeval nsqlookupd --verbose > nsqlookup.log 2>&1 "&"

start-nsq.sh 

#nsq eval nsqd –data-path=/usr/local/software/nsq/data –lookupd-tcp-address=192.168.234.77:4160,192.168.234.36:4160,192.168.234.39:4160 > nsqd.log 2>&1 “&”

start-nsqadmin.sh 

#nsqadmin eval nsqadmin –lookupd-http-address=192.168.234.77:4161,192.168.234.36:4161,192.168.234.39:4161 > nsqadmin.log 2>&1 “&”

运行脚本

首先需要安装次序运行上面的脚本,必选先运行nsqlookup集群,在运行start-nsqadmin.sh或者start-nsq.sh.

在对应服务器上执行对应的脚本。

nsqlookup节点

#192.168.234.77
#192.168.234.36
#192.168.234.39
sh start-nsqlookup.sh

nsq节点上 

#192.168.234.117 
#192.168.234.118 
sh start-nsq.sh

nsqadmin节点上 

#192.168.234.119 
sh start-nsqadmin.sh

查看运行效果,可以直接访问nsqadmin节点

http://192.168.234.119:4171/。
鼎云博客

 

创建Producer

package mainimport (    "log"    "github.com/bitly/go-nsq"    "io/ioutil"    "strconv")var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)func sendMsg(message string){    //init default config    config := nsq.NewConfig()    w, _ := nsq.NewProducer("192.168.2.117:4150", config)    err := w.Ping()    if err != nil {        //192.168.2.117:4150,192.168.2.68:4150        log.Fatalln("error ping 10.50.115.16:4150", err)        // switch the second nsq. You can use nginx or HAProxy for HA.        w, _ = nsq.NewProducer("192.168.2.68:4150", config)    }    w.SetLogger(nullLogger, nsq.LogLevelInfo)    err2 := w.Publish("a-test", []byte(message))    if err2 != nil {        log.Panic("Could not connect nsq")    }    w.Stop()}func main() {    for i := 0; i < 2; i ++ {        sendMsg("msg index "+ strconv.Itoa(i + 10000))    }}

恭喜,你已经成功发送两个消息到nsq节点。创建Consumer

package mainimport (    "log"    "github.com/bitly/go-nsq"    "fmt")func doSimpleConsumerTask(){    config := nsq.NewConfig()    q, _ := nsq.NewConsumer("a-test", "ch", config)    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {        log.Printf("message: %v", string(message.Body))        message.Finish()        return nil    }))    lookupAddr := []string {        "192.168.234.77:4161",        "192.168.234.36:4161",        "192.168.234.39:4161",    }    err := q.ConnectToNSQLookupds(lookupAddr)    if err != nil {        log.Panic("Could not connect")    }    <-q.StopChan    stats := q.Stats()    fmt.Sprintf("message received %d, finished %d", stats.MessagesReceived, stats.MessagesFinished)}func main(){    doSimpleConsumerTask()}


程序运行输出:

2017/02/23 09:07:50 message: msg index 10000
2017/02/23 09:07:50 message: msg index 10001

消费者持续接受发送来的信息,运行程序后端。


参考:

https://blog.csdn.net/xie1xiao1jun/article/details/78685036

https://blog.csdn.net/john_f_lau/article/details/56666196 
https://www.jianshu.com/p/4257dd2d3946


鼎云博客
支持免登录发表评论
  • 最新评论
  • 总共0条评论