<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
@SpringBootApplication
public class CloudApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(CloudApplication.class, args);
}
@Override
public void run(String... strings) {
}
// 读取yml的一个配置类
import com.edu.hart.modules.constant.NettyConfig;
// Netty连接信息配置类
import com.edu.hart.modules.constant.NettyConstant;
//
import com.edu.hart.rpc.util.ObjectCodec;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
/**
 * Spring-managed component that configures and runs the Netty server.
 *
 * <p>{@link #start()} binds the configured port and blocks the calling thread until the server
 * channel is closed. {@link #close()} is invoked when the bean is destroyed and shuts down both
 * event loop groups.
 *
 * @author 叶云轩
 */
@Component
public class NettyServerListener {
    /**
     * Logger for NettyServerListener.
     *
     * @author 叶云轩 create by 2017/10/31 18:05
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
    /**
     * Bootstrap used to configure and bind the server channel.
     */
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    /**
     * Boss (parent) event loop group passed to the bootstrap.
     */
    EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * Worker (child) event loop group passed to the bootstrap.
     */
    EventLoopGroup work = new NioEventLoopGroup();
    /**
     * Channel handler appended as the last handler of every accepted channel's pipeline.
     */
    @Resource
    private ServerChannelHandlerAdapter channelHandlerAdapter;
    /**
     * Netty server configuration (provides the listening port).
     */
    @Resource
    private NettyConfig nettyConfig;

    /**
     * Shuts the server down when the bean is destroyed.
     */
    @PreDestroy
    public void close() {
        LOGGER.info("关闭服务器....");
        // Graceful shutdown of both groups.
        shutdownEventLoops();
    }

    /**
     * Configures the bootstrap, binds the listening port and blocks until the server channel
     * closes.
     *
     * <p>The event loop groups are shut down whenever this method leaves, whether the channel
     * closed normally, the bind failed, or the waiting thread was interrupted. If the thread is
     * interrupted, its interrupt flag is restored before returning.
     */
    public void start() {
        // Listening port read from NettyConfig.
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO));
        try {
            // Pipeline for every accepted connection.
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Inbound: frames carry a 2-byte length prefix at offset 0 that is stripped
                    // before the payload is passed on. The limit comes from NettyConstant;
                    // the NettyConfig shown alongside this class exposes no frame-length getter.
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(
                            NettyConstant.getMaxFrameLength(), 0, 2, 0, 2));
                    // Outbound: prepend a 2-byte length prefix.
                    pipeline.addLast(new LengthFieldPrepender(2));
                    pipeline.addLast(new ObjectCodec());
                    pipeline.addLast(channelHandlerAdapter);
                }
            });
            LOGGER.info("netty服务器在[{}]端口启动监听", port);
            ChannelFuture f = serverBootstrap.bind(port).sync();
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.info("[出现异常] 释放资源");
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Also covers bind failures and a normal channel close, not only interruption.
            shutdownEventLoops();
        }
    }

    /**
     * Requests a graceful shutdown of the boss and worker groups.
     */
    private void shutdownEventLoops() {
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }
}
// 记录调用方法的元信息的类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Channel handler that hands incoming {@link MethodInvokeMeta} messages to the
 * {@link RequestDispatcher}.
 *
 * <p>Annotated {@link Sharable} so that the single Spring-managed instance can be added to
 * more than one channel pipeline.
 */
@Component
@Sharable
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Logger for this handler.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class);
    /**
     * Injected request dispatcher.
     */
    @Resource
    private RequestDispatcher dispatcher;

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Route through the logger instead of printing the stack trace to stderr.
        LOGGER.error("通道处理出现异常,关闭连接", cause);
        ctx.close();
    }

    /**
     * Casts the incoming message to {@link MethodInvokeMeta}, optionally logs it and forwards
     * it to the dispatcher.
     *
     * @param ctx context of the channel that received the message
     * @param msg the received message; cast to {@link MethodInvokeMeta} without a type check
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
        // NOTE(review): the original comment says toString() calls should be suppressed, but
        // this condition logs only when the method name ends with "toString()" and the return
        // type is not String — confirm the intended condition.
        if (invokeMeta.getMethodName().endsWith("toString()")
                && !"class java.lang.String".equals(invokeMeta.getReturnType().toString())) {
            LOGGER.info("客户端传入参数 :{},返回值:{}",
                    invokeMeta.getArgs(), invokeMeta.getReturnType());
        }
        dispatcher.dispatcher(ctx, invokeMeta);
    }
}
RequestDispatcher.java
// 封装的返回信息枚举类
import com.edu.hart.modules.communicate.ResponseCodeEnum;
// 封装的返回信息实体类
import com.edu.hart.modules.communicate.ResponseResult;
// 封装的连接常量类
import com.edu.hart.modules.constant.NettyConstant;
// 记录元方法信息的实体类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
// 对于返回值为空的一个处理
import com.edu.hart.rpc.entity.NullWritable;
// 封装的返回信息实体工具类
import com.edu.hart.rpc.util.ResponseResultUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 请求分排器
*/
@Component
public class RequestDispatcher implements ApplicationContextAware {
private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
private ApplicationContext app;
/**
* 发送
*
* @param ctx
* @param invokeMeta
*/
public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
executorService.submit(() -> {
ChannelFuture f = null;
try {
Class<?> interfaceClass = invokeMeta.getInterfaceClass();
String name = invokeMeta.getMethodName();
Object[] args = invokeMeta.getArgs();
Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
Object targetObject = app.getBean(interfaceClass);
Method method = targetObject.getClass().getMethod(name, parameterTypes);
Object obj = method.invoke(targetObject, args);
if (obj == null) {
f = ctx.writeAndFlush(NullWritable.nullWritable());
} else {
f = ctx.writeAndFlush(obj);
}
f.addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
f = ctx.writeAndFlush(error);
} finally {
f.addListener(ChannelFutureListener.CLOSE);
}
});
}
/**
* 加载当前application.xml
*
* @param ctx
* @throws BeansException
*/
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
this.app = ctx;
}
}
yml 配置文件
netty:
port: 11111
NettyConfig.java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Configuration properties bound from the {@code netty} prefix of the application
 * configuration.
 * Created by 叶云轩 on 2017/10/31 - 18:38
 * Contact [email protected]
 */
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {

    /** Port value bound from {@code netty.port}. */
    private int port;

    /**
     * Sets the port.
     *
     * @param port the port value to store
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Returns the port.
     *
     * @return the stored port value
     */
    public int getPort() {
        return this.port;
    }
}
NettyConstant.java
import org.springframework.stereotype.Component;
/**
 * Constants for the Netty server.
 * Created by 叶云轩 on 2017/10/31 - 17:47
 * Contact [email protected]
 */
@Component
public class NettyConstant {

    /** Maximum length of a data packet. */
    private static final int MAX_FRAME_LENGTH = 65535;

    /** Maximum number of threads. */
    private static final int MAX_THREADS = 1024;

    /**
     * Returns the maximum number of threads.
     *
     * @return the value of {@code MAX_THREADS}
     */
    public static int getMaxThreads() {
        return MAX_THREADS;
    }

    /**
     * Returns the maximum data packet length.
     *
     * @return the value of {@code MAX_FRAME_LENGTH}
     */
    public static int getMaxFrameLength() {
        return MAX_FRAME_LENGTH;
    }
}