Netty series (1): Springboot integrates Netty, custom protocol implementation

crocodile 2022-11-24 20:58:02 阅读数:699

nettyseriesspringbootintegratesnetty

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目.Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序.

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程.

Springboot整合Netty

新建springboot项目,And import it in the projectnetty包,用fastjson包处理jsonStr.

 <!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<!-- Json处理 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.16</version>
</dependency>

创建netty相关配置信息文件

  1. yml配置文件——application.yml
# netty 配置
netty:
# boss线程数量
boss: 4
# worker线程数量
worker: 2
# 连接超时时间
timeout: 6000
# 服务器主端口
port: 18023
# 服务器备用端口
portSalve: 18026
# 服务器地址
host: 127.0.0.1
  1. netty配置实体类——NettyProperties与yml配置文件绑定
    通过@ConfigurationProperties(prefix = "netty")Annotations read from configuration filesnetty配置,通过反射注入值,The corresponding one needs to be provided in the entity classsetter和getter方法.

@ConfigurationProperties(prefix = "netty")The names of corresponding entity class attributes are not required to be the same,只需保证“set”字符串拼接配置文件的属性和setter方法名相同即可.

@Configuration
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {

/** * boss线程数量 */
private Integer boss;
/** * worker线程数量 */
private Integer worker;
/** * 连接超时时间 */
private Integer timeout = 30000;
/** * 服务器主端口 */
private Integer port = 18023;
/** * 服务器备用端口 */
private Integer portSalve = 18026;
/** * 服务器地址 默认为本地 */
private String host = "127.0.0.1";
// setter、getter ....
}
  1. 对netty进行配置,绑定netty相关配置设置
    Netty通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类.
@Configuration
@EnableConfigurationProperties
public class NettyConfig {

final NettyProperties nettyProperties;
public NettyConfig(NettyProperties nettyProperties) {

this.nettyProperties = nettyProperties;
}
/** * boss线程池-进行客户端连接 * * @return */
@Bean
public NioEventLoopGroup boosGroup() {

return new NioEventLoopGroup(nettyProperties.getBoss());
}
/** * worker线程池-进行业务处理 * * @return */
@Bean
public NioEventLoopGroup workerGroup() {

return new NioEventLoopGroup(nettyProperties.getWorker());
}
/** * 服务端启动器,监听客户端连接 * * @return */
@Bean
public ServerBootstrap serverBootstrap() {

ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的线程组
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// 指定worker处理器
.childHandler(new NettyServerHandler());
return serverBootstrap;
}
}
  1. worker处理器,初始化通道以及配置对应管道的处理器
    自定义了##@##分割符,通过DelimiterBasedFrameDecoderTo deal with the problem of unpacking and sticking;
    通过MessageDecodeHandlerDecode the received message into an object instance;
    通过MessageEncodeHandlerAdd the delimiter to the sent message and encode it;
    最后通过ServerListenerHandlerDifferent messages are processed correspondingly according to the message type.
public class NettyServerHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

// 数据分割符
String delimiterStr = "##@##";
ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
ChannelPipeline pipeline = socketChannel.pipeline();
// Use custom to handle unpacking/沾包,And the maximum length of each search is 1024字节
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// Convert the decoded data in the previous step to Message实例
pipeline.addLast(new MessageDecodeHandler());
// Encodes the data sent to the client,and add data delimiters
pipeline.addLast(new MessageEncodeHandler(delimiterStr));
// Perform final processing on the data
pipeline.addLast(new ServerListenerHandler());
}
}
  1. 数据解码
    Both data decoding and encoding are employedUTF8格式
public class MessageDecodeHandler extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {

ByteBuf frame = in.retainedDuplicate();
final String content = frame.toString(CharsetUtil.UTF_8);
Message message = new Message(content);
list.add(message);
in.skipBytes(in.readableBytes());
}
}
  1. An instance of a data decoding transformation
    MessageClasses are used to carry messages、转JsonString

public class Message {

/** * 数据长度 */
private Integer len;
/** * Received communication databody */
private String content;
/** * 消息类型 */
private Integer msgType;
public Message(Object object) {

String str = object.toString();
JSONObject jsonObject = JSONObject.parseObject(str);
msgType = Integer.valueOf(jsonObject.getString("msg_type"));
content = jsonObject.getString("body");
len = str.length();
}
public String toJsonString() {

return "{" +
"\"msg_type\": " + msgType + ",\n" +
"\"body\": " + content +
"}";
}
// setter、getter ....
}
  1. 数据编码
    nettyWhen the server replies to the message,forward to the messageJsonString增加分割符,并进行编码.
public class MessageEncodeHandler extends MessageToByteEncoder<Message> {

// 数据分割符
String delimiter;
public MessageEncodeHandler(String delimiter) {

this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {

out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
}
}
  1. 数据处理器,Classify and process different types of data
    Enumerated types are used when handling different received data,在使用switchYou can do the next processing,具体参考代码,Here is just how to do it,The data processing business class is not implemented.

public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {

private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
/** * Handled when a device joins a connection * * @param ctx */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {

log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
}
/** * 数据处理 * * @param ctx * @param msg */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) {

// Get the message body in the message instance
String content = msg.getContent();
// Handle different message types
MessageEnum type = MessageEnum.getStructureEnum(msg);
switch (type) {

case CONNECT:
// TODO 心跳消息处理
case STATE:
// TODO 设备状态
default:
System.out.println(type.content + "消息内容" + content);
}
}
/** * The device goes offline for processing * * @param ctx */
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {

log.info("The device is offline:{}", ctx.channel().id().asLongText());
}
/** * Device connection exception handling * * @param ctx * @param cause */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

// 打印异常
log.info("异常:{}", cause.getMessage());
// 关闭连接
ctx.close();
}
}
  1. Data type enumeration class

public enum MessageEnum {

CONNECT(1, "心跳消息"),
STATE(2, "设备状态");
public final Integer type;
public final String content;
MessageEnum(Integer type, String content) {

this.type = type;
this.content = content;
}
// case中判断使用
public static MessageEnum getStructureEnum(Message msg) {

Integer type = Optional.ofNullable(msg)
.map(Message::getMsgType)
.orElse(0);
if (type == 0) {

return null;
} else {

List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values())
.filter((item) -> item.getType() == type)
.distinct()
.collect(Collectors.toList());
if (objectEnums.size() > 0) {

return objectEnums.get(0);
}
return null;
}
}
// setter、getter....
}

到此NettyThe entire configuration has been completed,But if you want to followspringboot一起启动,Still need to do some configuration.

  1. netty启动类配置
@Component
public class NettyServerBoot {

private static final Logger log = LoggerFactory.getLogger(NettyServerBoot.class);
@Resource
NioEventLoopGroup boosGroup;
@Resource
NioEventLoopGroup workerGroup;
final ServerBootstrap serverBootstrap;
final NettyProperties nettyProperties;
public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) {

this.serverBootstrap = serverBootstrap;
this.nettyProperties = nettyProperties;
}
/** * 启动netty * * @throws InterruptedException */
@PostConstruct
public void start() throws InterruptedException {

// 绑定端口启动
serverBootstrap.bind(nettyProperties.getPort()).sync();
// 备用端口
serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
log.info("启动Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve());
}
/** * 关闭netty */
@PreDestroy
public void close() {

log.info("关闭Netty");
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

增加NettyServerBoot配置后,启动application时,nettyThe server will start along with it.
在这里插入图片描述
同时,在springboot关闭前,会先销毁netty服务.
在这里插入图片描述

完整源码

https://github.com/BerBai/JavaExample/tree/master/netty

copyright:author[crocodile],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/328/202211242053414270.html