在事件总线上交换生成的 Protobuf 类

本文档将向您展示如何在事件总线上交换由 Protocol Buffers 生成的消息类型。

您将构建什么

您将构建一个定期生成问候消息的应用程序。该应用程序包含:

  • 一个发送 GreetingRequest 的发送器 verticle

  • 一个使用 GreetingResponse 回复请求的接收器 verticle

您需要什么

  • 文本编辑器或 IDE,

  • Java 11 或更高版本

创建项目

浏览到 https://start.vertx.io。点击“高级选项”展开隐藏面板,然后更改以下字段的值:

  • Group Id: 设置为 io.vertx.howtos

  • Artifact Id: 设置为 protobuf-eventbus-howto

  • Dependencies: 添加 Hazelcast Cluster Manager

  • Package: 设置为 io.vertx.howtos.protobuf.eventbus

完成后,点击 Generate Project(生成项目),并将生成的归档文件内容解压到您文件系统的某个位置。

提示

您也可以通过命令行执行此操作

curl -G https://start.vertx.io/starter.zip -d "groupId=io.vertx.howtos" -d "artifactId=protobuf-eventbus-howto" -d "packageName=io.vertx.howtos.protobuf.eventbus" -d "vertxDependencies=vertx-hazelcast" -d "jdkVersion=11" -d "buildTool=maven" --output protobuf-eventbus-howto.zip
unzip -d protobuf-eventbus-howto protobuf-eventbus-howto.zip

在编码之前,我们需要对构建文件进行一些调整

  • 配置一个自定义的 Vert.x Launcher 类(在运行可执行 JAR 时将用作入口点)

  • 添加 Protocol Buffers 的依赖

  • 配置 Maven 插件以从 .proto 文件生成消息类。

以下是您应该使用的 pom.xml 文件的内容

应用程序实现

消息定义

src/main/proto/greetings.proto 中,我们定义了

  • 一个包含 nameGreetingRequest,以及

  • 一个包含 messageGreetingReply

greetings.proto
syntax = "proto3";

package greeting;

option java_multiple_files = true;
option java_package = "io.vertx.howtos.protobuf.eventbus";
option java_outer_classname = "GreetingProtos";

message GreetingRequest {
  string name = 1;
}

message GreetingReply {
  string message = 1;
}

接收器 verticle

接收器 verticle 在事件总线上注册一个消费者。当接收到请求时,

  1. 请求会连同其系统 hash code 一起打印到控制台

  2. 生成回复

  3. 回复会连同其系统 hash code 一起打印到控制台

  4. 发送回复

ReceiverVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;

public class ReceiverVerticle extends VerticleBase {

  @Override
  public Future<?> start() {
    return vertx.eventBus().<GreetingRequest>consumer("greetings", msg -> {
      var request = msg.body();
      System.out.printf("Received request = %s (%d)%n", request.getName(), System.identityHashCode(request));
      var greeting = String.format("Hello %s", request.getName());
      var reply = GreetingReply.newBuilder().setMessage(greeting).build();
      System.out.printf("Sending reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply));
      msg.reply(reply);
    }).completion();
  }
}
注意
打印系统 hash code 的目的是,当我们在单个虚拟机中运行应用程序时,可以查看事件总线是否复制了对象。

发送器 verticle

发送器 verticle 安排一个周期性任务。每隔五秒:

  1. 生成一个请求

  2. 请求会连同其系统 hash code 一起打印到控制台

  3. 发送请求

  4. 回复会连同其系统 hash code 一起打印到控制台

SenderVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;

public class SenderVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    vertx.setPeriodic(5000, l -> {
      var request = GreetingRequest.newBuilder().setName("Jane Doe").build();
      System.out.printf("Sending request = %s (%d)%n", request.getName(), System.identityHashCode(request));
      vertx.eventBus().<GreetingReply>request("greetings", request)
        .map(Message::body)
        .onFailure(Throwable::printStackTrace)
        .onSuccess(reply -> System.out.printf("Received reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply)));
    });
  }
}

EventBus 编解码器

在为 Protocol Buffer 消息类设计编解码器时,我们可以利用它们的特性:

  • 所有消息在 Java 平台的意义上都是 Serializable(可序列化的)

  • 消息对象是不可变的

因此,消息类在网络传输时可以透明地进行序列化/反序列化。此外,当消息对象在本地交换时,我们不需要复制它们。

ProtobufCodec.java
package io.vertx.howtos.protobuf.eventbus;

import com.google.protobuf.GeneratedMessageV3;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.impl.SerializableUtils;

public class ProtobufCodec implements MessageCodec<GeneratedMessageV3, GeneratedMessageV3> {

  static final String PROTOS_PACKAGE_NAME = "io.vertx.howtos.protobuf.eventbus.";

  @Override
  public void encodeToWire(Buffer buffer, GeneratedMessageV3 o) {
    var bytes = SerializableUtils.toBytes(o);
    buffer.appendInt(bytes.length);
    buffer.appendBytes(bytes);
  }

  @Override
  public GeneratedMessageV3 decodeFromWire(int pos, Buffer buffer) {
    var length = buffer.getInt(pos);
    pos += 4;
    var bytes = buffer.getBytes(pos, pos + length);
    return (GeneratedMessageV3) SerializableUtils.fromBytes(bytes, CheckedClassNameObjectInputStream::new);
  }

  @Override
  public GeneratedMessageV3 transform(GeneratedMessageV3 o) {
    return o;
  }

  @Override
  public String name() {
    return "ProtobufCodec";
  }

  @Override
  public byte systemCodecID() {
    return -1; // -1 for a user codec
  }

  public boolean appliesTo(String className) {
    return className.startsWith(PROTOS_PACKAGE_NAME);
  }
}

出于安全原因,我们不希望在接收端反序列化任何对象。这就是为什么我们使用 CheckedClassNameObjectInputStream 而不是普通的 ObjectInputStream

该实现保证只允许某些类:

  • 当然是我们的消息类

  • Protocol Buffer 的 Java 实现类

  • Vert.x 事件总线默认允许的类(例如字节数组)

CheckedClassNameObjectInputStream.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.eventbus.EventBus;

import java.io.*;

class CheckedClassNameObjectInputStream extends ObjectInputStream {

  CheckedClassNameObjectInputStream(InputStream in) throws IOException {
    super(in);
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
    var name = desc.getName();
    if (name.startsWith("com.google.protobuf.")
      || name.startsWith(ProtobufCodec.PROTOS_PACKAGE_NAME)
      || EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(name)) {
      return super.resolveClass(desc);
    }
    throw new InvalidClassException("Class not allowed: " + name);
  }
}

最后,在一个自定义的 Launcher 类中,我们必须:

  • 注册这个编解码器

  • 配置事件总线,使其在消息体类型属于我们的包时使用此编解码器

CustomLauncher.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.launcher.application.HookContext;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.launcher.application.VertxApplicationHooks;

public class CustomLauncher extends VertxApplication implements VertxApplicationHooks {

  public CustomLauncher(String[] args) {
    super(args);
  }

  public static void main(String[] args) {
    new CustomLauncher(args).launch();
  }

  @Override
  public void afterVertxStarted(HookContext context) {
    var vertx = context.vertx();
    var protobufCodec = new ProtobufCodec();
    vertx.eventBus().registerCodec(protobufCodec);
    vertx.eventBus().codecSelector(body -> {
      return protobufCodec.appliesTo(body.getClass().getName()) ? protobufCodec.name() : null;
    });
  }
}

运行应用程序

首先您必须构建应用程序

./mvnw clean package

然后启动接收器

java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.ReceiverVerticle -cluster

当它准备就绪时,您将看到:INFO: Succeeded in deploying verticle

现在在另一个终端中启动发送器

java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.SenderVerticle -cluster

当它准备就绪时,您将看到:INFO: Succeeded in deploying verticle

一段时间后,您将在发送器控制台中看到

Sending request = Jane Doe (1445840961)
Received reply = Hello Jane Doe (654163465)

并在接收器控制台中看到

Received request = Jane Doe (449456520)
Sending reply = Hello Jane Doe (522259462)

在集群模式下,打印的系统 hash code 不重要:显然,存在于不同虚拟机中的对象是不同的。

那么本地模式呢?要在同一个虚拟机中运行发送器和接收器,我们可以使用第三个 verticle,其唯一目的是部署它们。

MainVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;

public class MainVerticle extends VerticleBase {

  @Override
  public Future<?> start() {
    return Future.join(
      vertx.deployVerticle(new ReceiverVerticle()),
      vertx.deployVerticle(new SenderVerticle())
    );
  }
}

打开一个终端,再次构建项目并运行可执行 JAR。

./mvnw clean package
java -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar

当它准备就绪时,您将看到:INFO: Succeeded in deploying verticle

一段时间后,您将在控制台中看到

Sending request = Jane Doe (346056258)
Received request = Jane Doe (346056258)
Sending reply = Hello Jane Doe (1483137857)
Received reply = Hello Jane Doe (1483137857)

请注意系统 hash code。您会注意到请求对象在发送器和接收器中是相同的。回复对象也是如此。

总结

本文档涵盖了

  1. Protocol Buffers 生成的消息类型创建编解码器

  2. 注册此编解码器并配置事件总线默认使用它

  3. 在本地和跨网络发送和接收消息对象


最新发布时间:2025-02-01 01:20:54 +0000。