搭建基于consul,registrator,nsq的GO体系Docker开发环境

使用GO作为生产服务开发语言,避免不了与以下几大基础组件打交道,他们分别是:

consul,registrator,nsq

consul

Consul 是一个支持多数据中心分布式高可用的服务发现和配置共享的服务软件,通常用于docker 实例的注册与配置共享

registrator

Registrator 去除了需要手动管理Consul服务条目的复杂性,它监视容器的启动和停止,根据容器暴露的端口和环境变量自动注册服务。

nsq

NSQ是一个基于Go语言的分布式实时消息平台

基于Docker容器的组件搭建

以上组件都提供了官方docker镜像,我们只需要拉取镜像启动即可,以下提供组件使用命令,可供单机开发使用,复杂如consul组集群等,不在此讨论

consul启动命令
docker stop dev-consul
docker rm dev-consul
docker run -d --name=dev-consul -p 8400:8400 -p 8500:8500/tcp -p 8600:53/udp -e 'CONSUL_LOCAL_CONFIG={"bootstrap_expect":1,"datacenter":"dc1","data_dir":"/usr/local/bin/consul.d/data","server":true}' consul agent -server -bind=127.0.0.1 -client=0.0.0.0

bind为服务发布地址,默认为网卡发布,否则可能只发布到容器内部ip

registrator启动命令
docker stop registrator
docker rm registrator
docker run -d --restart=always --name=registrator \
-v /var/run/docker.sock:/tmp/docker.sock \
gliderlabs/registrator:latest -ip yourip  consul://yourip:8500

其中ip地址为该服务器ip地址,主动申明的目的在于防止registrator自动将127.0.0.1本地回环作为对外暴露的ip地址注册到consul上。

nsq启动命令

nsqlookupd: docker run -d --restart=always --name lookupd \ -p 4160:4160 \ -p 4161:4161 \ nsqio/nsq /nsqlookupd nsqd: docker run -d --restart=always --name nsqd \ -p 4150:4150 \ -p 4151:4151 \ nsqio/nsq /nsqd --broadcast-address=yourip --lookupd-tcp-address=yourip:4160 nsqadmin: docker run -d --restart=always --name nsqadmin \ -p 4171:4171 \ nsqio/nsq /nsqadmin --lookupd-http-address=yourip:4161

Docker配置使用harbor私服

Docker默认使用docker-hub拉取镜像,使用harbor私服,必须对docker进行配置,否则docker在拉取镜像时会报错

1. 添加harbor私服配置

创建或修改以下文件:

vim /etc/docker/daemon.json

添加或增加harbor配置:

{ "insecure-registries":["myhostname"] }
2. 登录harbor
docker login -u admin -p Harbor12345 yourhostname

登录完成后会在docker配置中增加一条授权记录

cat /root/.docker/config.json

Docker私服Harbor安装

Docker私服一般选用vmware的harbor

官方地址:

https://github.com/vmware/harbor/releases

1. 安装dcoker-compose

harbor依赖docker-compose组件,首先安装docker-compose

docker-compose官方地址:

https://github.com/docker/compose/releases

安装命令:

curl -L https://github.com/docker/compose/releases/download/1.13.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
2. 安装harbor安装包

harbor安装包分为离线安装和在线安装,考虑到国内网络环境,一般选择离线安装

下载最新版本,目前为1.1.1
wget https://github.com/vmware/harbor/releases/download/v1.1.1/harbor-offline-installer-v1.1.1.tgz
解压缩
tar xzvf harbor-offline-installer-v1.1.1.tgz
修改配置文件

进入harbor安装目录,修改配置文件

vim harbor.cfg

以下必须修改:

hostname ,harbor启动后的访问ip或者域名

以下酌情修改:

harbor_admin_password ,harbor的默认用户名密码,默认为:admin/Harbor12345

安装harbor
./install.sh
3. 验证harbor

浏览器打开以下url:
http://yourhostname/harbor

4. 启停harbor

harbor使用docker作为容器,启停harbor前,必须首先启动docker服务

停止harbor
docker-compose stop
启动start
docker-compose stop

Centos安装Docker特定版本

某些情况下,不希望安装官方最新版本,这个时候就需要指定版本进行安装

1. 前置条件
必须是CENTOS7,64位
2. 安装源
sudo tee /etc/yum.repos.d/docker.repo
<<-'EOF'
[dockerrepo]
name=Docker Repository
baseurl=https://yum.dockerproject.org/repo/main/centos/7/
enabled=1
gpgcheck=1
gpgkey=https://yum.dockerproject.org/gpg
EOF
3. 选择版本号

进入

https://yum.dockerproject.org/repo/main/centos/7/Packages/

选择需要安装的版本号,如:

docker-engine-1.12.4-1.el7.centos.src.rpm
docker-engine-1.12.4-1.el7.centos.x86_64.rpm
docker-engine-1.12.5-1.el7.centos.src.rpm
docker-engine-1.12.5-1.el7.centos.x86_64.rpm

4. 安装依赖包

docker-engine依赖docker-engine-selinux包,先安装相同版本的docker-engine-selinux包:

yum install docker-engine-selinux-1.12.5-1.el7.centos.noarch.rpm

5. 安装Docker

yum install docker-engine-1.12.5-1.el7.centos.x86_64.rpm

6. 启动Docker
sudo systemctl start docker
7. 测试Docker
sudo docker run hello-world

Centos安装Docker最新版

官方文档:

https://store.docker.com/editions/community/docker-ce-server-centos?tab=description

1. 前置条件
必须是CENTOS7,64位
2. 安装源

安装必要工具集

sudo yum install -y yum-utils

安装docker官方源

sudo yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.repo

更新yum缓存

sudo yum makecache fast
3. 安装Dcoker
sudo yum -y install docker-ce
4. 启动Docker
sudo systemctl start docker
5. 测试Docker
sudo docker run hello-world

hibernate自动添加永真1=1,导致Druid sql防火墙报错的问题

最近工程突然报错:
java.sql.SQLException: sql injection violation, part alway true condition not allow : select count(*) where this_.id<>? and 1=1
at com.alibaba.druid.wall.WallFilter.check(WallFilter.java:671)
at com.alibaba.druid.wall.WallFilter.connection_prepareStatement(WallFilter.java:214)

错误内容是druid的sql防火墙报警,发现是hibernate自动拼接了1=1的永真条件,而druid只会放行排在第一的永真条件,查看hibernate源代码后发现,当引用Junction生成sql时,如果条件为空,则会自动拼接1=1的永真条件。

解决办法也很简单,修改自己的代码,将引用junction的条件拼接放在第一位即可。

druid升级到最新的1.0.16-SNAPSHOT引发的配置问题

盲升druid至最新的1.0.16-SNAPSHOT版本后,启动工程报如下错误:
[com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker]-[WARN] Unexpected error in ping
Caused by: java.lang.IllegalArgumentException: timeout can’t be negative

查源码后发现,原来validationQueryTimeout变量默认值为-1,而不是0,导致socket接口报错。

解决办法:
在datasource中增加以下配置:

<property name="validationQueryTimeout" value="10000" />

vert.x笔记:6.vert.x集群化部署

vert.x支持集群化部署,默认封装使用的是一个叫Hazelcast的框架,从官方github上看到的开发进度表示,3.1可能会引入比较大众点的zookeeper作为集群的协作框架。

demo工程还是使用第5章中的dubbo服务demo代码

修改启动类:

package com.heartlifes.vertx.demo.dubbo;

import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.hazelcast.config.Config;
import com.hazelcast.config.GroupConfig;

public class SpringMain {

    private static Vertx vertx = Vertx.vertx();// 集群初始化失败的情况下,可以使用默认vertx实例
    private static ApplicationContext ctx = null;

    public static void main(String[] args) {
        // 配置文件方式
        ctx = new ClassPathXmlApplicationContext("dubbo-consumer.xml");
        // Hazelcast配置类
        Config cfg = new Config();
        // 加入组的配置,防止广播环境下,负载串到别的开发机中
        GroupConfig group = new GroupConfig();
        group.setName("p-dev");
        group.setPassword("p-dev");
        cfg.setGroupConfig(group);
        // 申明集群管理器
        ClusterManager mgr = new HazelcastClusterManager(cfg);
        VertxOptions options = new VertxOptions().setClusterManager(mgr);
        // 集群化vertx
        Vertx.clusteredVertx(options, SpringMain::resultHandler);

    }

    private static void resultHandler(AsyncResult<Vertx> res) {
        // 如果成功,使用集群化的vertx实例
        if (res.succeeded()) {
            vertx = res.result();
            // 这里要注意,一定要在异步回调中,获取了vertx实例后,再去部署模块
            // 由于vert.x所有内部逻辑都是异步调用的,所以,如果你在异步回调前就去部署模块,最终会导致集群失败
            deploy(vertx);
        } else {
            System.out.println("cluster failed, using default vertx");
            deploy(vertx);
        }
    }

    private static void deploy(Vertx vertx) {
        vertx.deployVerticle(new SpringVerticle(ctx));
        vertx.deployVerticle(new ServerVerticle());
    }

}

启动多个主程序,会发现后台输出类似如下的日志信息

八月 04, 2015 2:05:09 下午 com.hazelcast.instance.DefaultAddressPicker
信息: [LOCAL] [p-dev] [3.5] Prefer IPv4 stack is true.
八月 04, 2015 2:05:09 下午 com.hazelcast.instance.DefaultAddressPicker
信息: [LOCAL] [p-dev] [3.5] Picked Address[192.168.1.119]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
八月 04, 2015 2:05:09 下午 com.hazelcast.spi.OperationService
信息: [192.168.1.119]:5701 [p-dev] [3.5] Backpressure is disabled
八月 04, 2015 2:05:09 下午 com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
信息: [192.168.1.119]:5701 [p-dev] [3.5] Starting with 2 generic operation threads and 4 partition operation threads.
八月 04, 2015 2:05:10 下午 com.hazelcast.system
信息: [192.168.1.119]:5701 [p-dev] [3.5] Hazelcast 3.5 (20150617 - 4270dc6) starting at Address[192.168.1.119]:5701
八月 04, 2015 2:05:10 下午 com.hazelcast.system
信息: [192.168.1.119]:5701 [p-dev] [3.5] Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved.
八月 04, 2015 2:05:10 下午 com.hazelcast.instance.Node
信息: [192.168.1.119]:5701 [p-dev] [3.5] Creating MulticastJoiner
八月 04, 2015 2:05:10 下午 com.hazelcast.core.LifecycleService
信息: [192.168.1.119]:5701 [p-dev] [3.5] Address[192.168.1.119]:5701 is STARTING
八月 04, 2015 2:05:14 下午 com.hazelcast.cluster.impl.MulticastJoiner
信息: [192.168.1.119]:5701 [p-dev] [3.5] 


Members [1] {
    Member [192.168.1.119]:5701 this
}

八月 04, 2015 2:05:14 下午 com.hazelcast.core.LifecycleService
信息: [192.168.1.119]:5701 [p-dev] [3.5] Address[192.168.1.119]:5701 is STARTED
八月 04, 2015 2:05:15 下午 com.hazelcast.partition.InternalPartitionService
信息: [192.168.1.119]:5701 [p-dev] [3.5] Initializing cluster partition table first arrangement...
八月 04, 2015 2:05:22 下午 com.hazelcast.nio.tcp.SocketAcceptor
信息: [192.168.1.119]:5701 [p-dev] [3.5] Accepting socket connection from /192.168.1.119:59906
八月 04, 2015 2:05:22 下午 com.hazelcast.nio.tcp.TcpIpConnectionManager
信息: [192.168.1.119]:5701 [p-dev] [3.5] Established socket connection between /192.168.1.119:5701
八月 04, 2015 2:05:28 下午 com.hazelcast.cluster.ClusterService
信息: [192.168.1.119]:5701 [p-dev] [3.5] 

Members [2] {
    Member [192.168.1.119]:5701 this
    Member [192.168.1.119]:5702
}

八月 04, 2015 2:05:29 下午 com.hazelcast.partition.InternalPartitionService
信息: [192.168.1.119]:5701 [p-dev] [3.5] Re-partitioning cluster data... Migration queue size: 135
八月 04, 2015 2:05:30 下午 com.hazelcast.partition.InternalPartitionService
信息: [192.168.1.119]:5701 [p-dev] [3.5] All migration tasks have been completed, queues are empty.

vert.x笔记:5.vert.x集成dubbo服务

vert.x

基础介绍:

dubbo是阿里巴巴内部的rpc远程调用框架,和spring无缝对接,自带loadbalance,是用来搭建soa服务架构的利器,可惜听说在阿里内部斗争中,已经被hsf干掉了。但是,对于我们这种小企业来说,dubbo还是搭建高可用服务的不二选择。dubbo官方地址:http://dubbo.io

vert.x+dubbo可以搭建一个逼格很高的微服务架构,即vert.x用于发布服务,通过事件总线,调用后端的dubbo业务处理服务。从而完成rest服务与业务代码的完美解耦。

本章基用到前序章节的所有知识,并且这里将不会介绍dubbo服务的开发,默认你会玩dubbo服务。

pom中加入以下依赖:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo</artifactId>
    <version>${dubbo.version}</version>
    <exclusions>
            <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.3.6</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.github.sgroschupf</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.41</version>
</dependency>
<dependency>
        <groupId>org.jboss.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.2.5.Final</version>
</dependency>

引用一个dubbo服务:

所谓集成dubbo,从本质来讲就是配置dubbo服务,并且以xml形式集成spring。
我们假设后台有这么一个dubbo服务,现在要在vert.x中调用,那么我们的做法和普通的dubbo consumer一样,申明一个spring配置文件,并引用该服务。配置文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
        http://code.alibabatech.com/schema/dubbo 
        http://code.alibabatech.com/schema/dubbo/dubbo.xsd
        http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd ">

    <!-- 使用Annotation自动注册Bean,解决事物失效问题:在主容器中不扫描@Controller注解,在SpringMvc中只扫描@Controller注解。 -->
    <context:component-scan base-package="com"/>
    <dubbo:application name="demo-consumer" />
    <dubbo:registry address="multicast://224.5.6.7:1234" />
    <dubbo:protocol name="dubbo" host="127.0.0.1" port="20802"
        serialization="hessian2" threadpool="cached" threads="1000" />
    <!-- dubbo引用的服务 -->
    <dubbo:reference id="dubboService"
        interface="com.heartlifes.dubbo.DubboService" />
</beans> 

创建SpringVerticle:

package com.heartlifes.vertx.demo.dubbo;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;

import org.springframework.context.ApplicationContext;

public class SpringVerticle extends AbstractVerticle {

    private DubboService service;

    public static final String PRINT_MSG_SERVICE_ADDRESS = "print_msg_service_address";

    public static final String GET_MSG_SERVICE_ADDRESS = "get_msg_service_address";

    public SpringVerticle(ApplicationContext ctx) {
        // 从spring上下文获取service
        this.service = (DubboService) ctx.getBean("dubboService");
    }

    @Override
    public void start() throws Exception {
        // 唤起事件总线,注册一个事件处理者,或者直译叫事件消费者
        vertx.eventBus().<String> consumer(PRINT_MSG_SERVICE_ADDRESS)
                .handler(msg -> {
                    // 获取事件内容后,调用service服务
                    // 这里是非阻塞式调用
                        service.printMsg("Asynchronous call dubbo service!!!");
                        msg.reply("success");
                    });

        vertx.eventBus().<String> consumer(GET_MSG_SERVICE_ADDRESS, printMsg());
    }

    // 模拟dubbo服务要从后台数据库获取数据,所以这里就是vert.x中的阻塞式调用
    // vert.x中规定,所有调用不可以阻塞其eventloop,所以当有数据库调用、thread.sleep等可能会阻塞线程的服务调动时
    // 需要使用vertx接口中的阻塞式调用接口
    private Handler<Message<String>> printMsg() {
        return msg -> {
            System.out.println("bus msg body is:" + msg.body());
            // 阻塞式接口调用
            vertx.<String> executeBlocking(future -> {
                // 通过future等待调用返回结果
                    String dubboMsg = "";
                    try {
                        dubboMsg = this.service.getMsg();
                    } catch (Exception e) {
                        e.printStackTrace();
                        future.fail(e);
                    }
                    // 把结果放到result中
                    future.complete(dubboMsg);
                }, result -> {
                    // 判断接口调用结果,成功的话讲结果放到事件总线的msg中传递给server端展示
                    if (result.succeeded()) {
                        System.out.println("msg from dubbo service is: "
                                + result.result());
                        msg.reply(result.result());
                    }
                    if (result.failed()) {
                        msg.fail(400, result.cause().getMessage());
                    }
                });
        };
    }

}

创建SpringVerticle:

package com.heartlifes.vertx.demo.dubbo;

import io.vertx.core.AbstractVerticle;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

/**
 * 基本代码注释,请参见vert.x笔记:3.使用vert.x发布restful接口
 * 
 * @author john
 *
 */
public class ServerVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.route("/dubbo/get").handler(
        // 唤起vert.x的事件总线,并发送一个简单消息
                ctx -> vertx.eventBus().<String> send(
                        SpringVerticle.GET_MSG_SERVICE_ADDRESS,// 消息地址
                        "event bus calls dubbo service",// 消息内容
                        result -> {// 异步结果处理
                            if (result.succeeded()) {
                                // 成功的话,返回处理结果给前台,这里的处理结果就是service返回的一段字符串
                                ctx.response()
                                        .putHeader("content-type",
                                                "application/json")
                                        .end(result.result().body());
                            } else {
                                ctx.response().setStatusCode(400)
                                        .end(result.cause().toString());
                            }
                        }));
        router.route("/dubbo/print").handler(
        // 唤起vert.x的事件总线,并发送一个简单消息
                ctx -> vertx.eventBus().<String> send(
                        SpringVerticle.PRINT_MSG_SERVICE_ADDRESS,// 消息地址
                        "event bus calls dubbo service",// 消息内容
                        result -> {// 异步结果处理
                            if (result.succeeded()) {
                                // 成功的话,返回处理结果给前台
                                ctx.response()
                                        .putHeader("content-type",
                                                "application/json")
                                        .end("success");
                            } else {
                                ctx.response().setStatusCode(400)
                                        .end(result.cause().toString());
                            }
                        }));
        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }
}

创建启动器:

package com.heartlifes.vertx.demo.dubbo;

import io.vertx.core.Vertx;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringMain {

    public static void main(String[] args) {
        // 配置文件方式
        ApplicationContext ctx = new ClassPathXmlApplicationContext(
                "dubbo-consumer.xml");
        Vertx vertx = Vertx.vertx();
        // 部署spring模块
        vertx.deployVerticle(new SpringVerticle(ctx));
        // 部署服务器模块
        vertx.deployVerticle(new ServerVerticle());
    }

}

vert.x笔记:4.vert.x中调用spring服务

evenbus事件总线介绍:

在介绍怎么在vert.x中集成spring服务前,我们要先简单介绍一下什么是vert.x的事件总线。
eventbus是vert.x的神经总线,每个vert.x实例维护了一个事件总线。简单来说,vert.x有以下几个概念

寻址:

vert.x将事件消息,通过地址发送到后端的处理程序上。一个地址就是一个全局唯一的字符串。

处理程序:

后端的处理程序,通过地址,将自己注册到事件总线上,并告诉事件总线,我是这个地址的处理程序。

发布/订阅模式:

消息被发布到一个地址,后台所有注册过这个地址的处理程序接收消息并进行处理。

修改pom,加入依赖

在pom.xml中加入以下配置和依赖包:

<properties>
    <spring.version>4.1.7.RELEASE</spring.version>
</properties>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-orm</artifactId>
    <version>${spring.version}</version>
</dependency>

vert.x集成spring:

创建一个spring service

很简单的服务,输出一个hello spring字符串。

package com.heartlifes.vertx.demo.hello;

import org.springframework.stereotype.Component;

@Component(value = "springService")
public class SpringService {

    public String getHello() {
        return "hello spring";
    }
}

创建SpringVerticle

springVerticle作为事件总线中的后台处理程序,接收事件总线消息,并调用springService完成服务处理。

package com.heartlifes.vertx.demo.hello;

import io.vertx.core.AbstractVerticle;

import org.springframework.context.ApplicationContext;

public class SpringVerticle extends AbstractVerticle {

    private SpringService service;

    public static final String GET_HELLO_MSG_SERVICE_ADDRESS = "get_hello_msg_service";

    public SpringVerticle(ApplicationContext ctx) {
        // 从spring上下文获取service
        this.service = (SpringService) ctx.getBean("springService");
    }

    @Override
    public void start() throws Exception {
        // 唤起事件总线,注册一个事件处理者,或者直译叫事件消费者
        vertx.eventBus()
                .<String> consumer(GET_HELLO_MSG_SERVICE_ADDRESS)
                .handler(msg -> {
                    // 获取事件内容后,调用service服务
                        System.out.println("bus msg body is:" + msg.body());
                        String helloMsg = service.getHello();
                        System.out.println("msg from hello service is: "
                                + helloMsg);
                        // 将service返回的字符串,回应给消息返回体
                        msg.reply(helloMsg);
                    });
    }

}

创建ServerVerticle

serverVerticle负责接收前端http请求,并将消息发布到事件总线上,等待后台处理程序处理完该事件后,返回事件处理结果。

package com.heartlifes.vertx.demo.hello;

import io.vertx.core.AbstractVerticle;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

/**
 * 基本代码注释,请参见vert.x笔记:3.使用vert.x发布restful接口
 * 
 * @author john
 *
 */
public class ServerVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.route("/spring/hello").handler(
        // 唤起vert.x的事件总线,并发送一个简单消息
                ctx -> vertx.eventBus().<String> send(
                        SpringVerticle.GET_HELLO_MSG_SERVICE_ADDRESS,// 消息地址
                        "event bus calls spring service",// 消息内容
                        result -> {// 异步结果处理
                            if (result.succeeded()) {
                                // 成功的话,返回处理结果给前台,这里的处理结果就是service返回的一段字符串
                                ctx.response()
                                        .putHeader("content-type",
                                                "application/json")
                                        .end(result.result().body());
                            } else {
                                ctx.response().setStatusCode(400)
                                        .end(result.cause().toString());
                            }
                        }));
        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }
}

模块部署

整个demo的启动类,负责启动spring容器,部署上面的两个模块,分别是spring模块和服务模块。

package com.heartlifes.vertx.demo.hello;

import io.vertx.core.Vertx;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SpringMain {

    public static void main(String[] args) {
        // 注解方式配置,不需要配置文件
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        // 扫描哪些包内的注解
        ctx.scan("com.heartlifes.vertx.demo.hello");
        ctx.refresh();
        Vertx vertx = Vertx.vertx();
        // 部署spring模块
        vertx.deployVerticle(new SpringVerticle(ctx));
        // 部署服务器模块
        vertx.deployVerticle(new ServerVerticle());
    }

}

http://localhost:8080/spring/hello,界面输出hello spring。
可以看到,使用事件总线后,可以将模块间的耦合度降到最低,仅仅通过事件的发布和订阅,就可以将原来揉成一块的显示服务调用,变成y