|
15 | 15 | */ |
16 | 16 | package com.alibaba.csp.sentinel.cluster.server; |
17 | 17 |
|
18 | | -import java.util.ArrayList; |
19 | | -import java.util.List; |
20 | | -import java.util.concurrent.atomic.AtomicInteger; |
21 | | - |
22 | 18 | import com.alibaba.csp.sentinel.cluster.server.codec.netty.NettyRequestDecoder; |
23 | 19 | import com.alibaba.csp.sentinel.cluster.server.codec.netty.NettyResponseEncoder; |
24 | 20 | import com.alibaba.csp.sentinel.cluster.server.connection.Connection; |
25 | 21 | import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool; |
26 | 22 | import com.alibaba.csp.sentinel.cluster.server.handler.TokenServerHandler; |
27 | 23 | import com.alibaba.csp.sentinel.log.RecordLog; |
28 | | - |
29 | 24 | import io.netty.bootstrap.ServerBootstrap; |
30 | 25 | import io.netty.buffer.PooledByteBufAllocator; |
31 | 26 | import io.netty.channel.ChannelFuture; |
|
42 | 37 | import io.netty.util.concurrent.GenericFutureListener; |
43 | 38 | import io.netty.util.internal.SystemPropertyUtil; |
44 | 39 |
|
45 | | -import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.*; |
| 40 | +import java.util.ArrayList; |
| 41 | +import java.util.List; |
| 42 | +import java.util.concurrent.atomic.AtomicInteger; |
| 43 | + |
| 44 | +import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.NETTY_MAX_FRAME_LENGTH; |
| 45 | +import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.SERVER_STATUS_OFF; |
| 46 | +import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.SERVER_STATUS_STARTED; |
| 47 | +import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.SERVER_STATUS_STARTING; |
46 | 48 |
|
47 | 49 | /** |
48 | 50 | * @author Eric Zhao |
|
51 | 53 | public class NettyTransportServer implements ClusterTokenServer { |
52 | 54 |
|
53 | 55 | private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, |
54 | | - SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); |
| 56 | + SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); |
55 | 57 | private static final int MAX_RETRY_TIMES = 3; |
56 | 58 | private static final int RETRY_SLEEP_MS = 2000; |
57 | 59 |
|
@@ -79,32 +81,32 @@ public void start() { |
79 | 81 | this.bossGroup = new NioEventLoopGroup(1); |
80 | 82 | this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS); |
81 | 83 | b.group(bossGroup, workerGroup) |
82 | | - .channel(NioServerSocketChannel.class) |
83 | | - .option(ChannelOption.SO_BACKLOG, 128) |
84 | | - .handler(new LoggingHandler(LogLevel.INFO)) |
85 | | - .childHandler(new ChannelInitializer<SocketChannel>() { |
86 | | - @Override |
87 | | - public void initChannel(SocketChannel ch) throws Exception { |
88 | | - ChannelPipeline p = ch.pipeline(); |
89 | | - p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); |
90 | | - p.addLast(new NettyRequestDecoder()); |
91 | | - p.addLast(new LengthFieldPrepender(2)); |
92 | | - p.addLast(new NettyResponseEncoder()); |
93 | | - p.addLast(new TokenServerHandler(connectionPool)); |
94 | | - } |
95 | | - }) |
96 | | - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) |
97 | | - .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) |
98 | | - .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) |
99 | | - .childOption(ChannelOption.SO_TIMEOUT, 10) |
100 | | - .childOption(ChannelOption.TCP_NODELAY, true) |
101 | | - .childOption(ChannelOption.SO_RCVBUF, 32 * 1024); |
| 84 | + .channel(NioServerSocketChannel.class) |
| 85 | + .option(ChannelOption.SO_BACKLOG, 128) |
| 86 | + .handler(new LoggingHandler(LogLevel.INFO)) |
| 87 | + .childHandler(new ChannelInitializer<SocketChannel>() { |
| 88 | + @Override |
| 89 | + public void initChannel(SocketChannel ch) throws Exception { |
| 90 | + ChannelPipeline p = ch.pipeline(); |
| 91 | + p.addLast(new LengthFieldBasedFrameDecoder(NETTY_MAX_FRAME_LENGTH, 0, 2, 0, 2)); |
| 92 | + p.addLast(new NettyRequestDecoder()); |
| 93 | + p.addLast(new LengthFieldPrepender(2)); |
| 94 | + p.addLast(new NettyResponseEncoder()); |
| 95 | + p.addLast(new TokenServerHandler(connectionPool)); |
| 96 | + } |
| 97 | + }) |
| 98 | + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) |
| 99 | + .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) |
| 100 | + .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) |
| 101 | + .childOption(ChannelOption.SO_TIMEOUT, 10) |
| 102 | + .childOption(ChannelOption.TCP_NODELAY, true) |
| 103 | + .childOption(ChannelOption.SO_RCVBUF, 32 * 1024); |
102 | 104 | b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() { |
103 | 105 | @Override |
104 | 106 | public void operationComplete(ChannelFuture future) { |
105 | 107 | if (future.cause() != null) { |
106 | 108 | RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + "), failedTimes: " + failedTimes.get(), |
107 | | - future.cause()); |
| 109 | + future.cause()); |
108 | 110 | currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF); |
109 | 111 | int failCount = failedTimes.incrementAndGet(); |
110 | 112 | if (failCount > MAX_RETRY_TIMES) { |
|
0 commit comments