Spring

[Spring] Netty Server & Client 구현

lakelight 2022. 11. 23. 15:53
728x90
반응형
 

[Spring] Netty 개념

Netty 비동기 네트워크 프레임워크입니다. Channel에서 발생하는 이벤트들을 EventLoop가 처리하는 구조를 가집니다. Channel 기본 입출력 작업은 네트워크 전송에서 제공하는 기본형을 이용합니다. 자

lakelight.tistory.com

 

Java Netty Server TCP 통신

  • non-blocking model (NIO)
    • I/O 작업이 진행되는 동안 유저 프로세스의 작업을 중단시키기 않는 방식입니다.
    • non-blocking model은 작업 완료를 기다릴 필요가 없기 때문에 작업을 실행 시키고 다른 일을 할 수 있습니다.
    • 이벤트가 오면 그 이벤트를 받아서 처리하는 시스템입니다.
      • 많은 이벤트를 빠르고 경제적으로 처리할 수 있습니다. (Netty의 핵심)

 

Netty 핵심 컴포넌트

  • Channel
    • 하나 이상의 입출력 작업을 수행할 수 있는 하드웨어 장치, 파일, 네트워크 소켓, 프로그램 컴포넌트와 같은 엔티티에 대한 열린 연결입니다.
    • 간단하게 말하면 들어오고 나가는 데이터를 위한 운송 수단으로 생각하면 됩니다.
  • Callback
    • 다른 메서드로 자신에 대한 참조를 제공할 수 있는 메서드 입니다.
    • 관심 대상에게 작업 완료를 알리는 일반적인 방법 중 하나 입니다.
    • Netty는 이벤트를 처리할 때 Callback을 사용합니다. Callback이 트리거되면 ChannelHandler 인터페이스 구현을 통해 이벤트를 처리하게 됩니다.
  • ChannelFuture
    • Netty에서 작업이 완료되면 이를 애플리케이션에게 알리는 방법입니다. 이 객체는 비동기 작업의 결과를 담는 역할을 하고, 미래의 어떤 시점에 작업이 완료되면 그 결과에 접근할 수 있게 해줍니다.
    • Netty 에서는 ChannelFuture를 제공합니다.
  • Event & Handler
    • Netty는 작업의 상태 변화를 알리기 위해 고유한 이벤트를 이용합니다.
    • 발생한 이벤트를 기준으로 적절한 동작을 트리거할 수 있습니다.
      • 로깅
      • 데이터 변환
      • 흐름 제어
      • 애플리케이션 논리
    • Netty는 네트워크 프레임워크이므로, 인바운드, 아웃바운드 데이터 흐름에 대한 연관성을 기준으로 분류합니다.
    • 인바운드 데이터나 연관된 상태 변화로 트리거되는 이벤트
      • 연결 활성화, 비활성화
      • 데이터 읽기
      • 사용자 이벤트
      • 오류 이벤트
    • 아웃바운드 이벤트
      • 원격 피어로 연결 열기, 닫기
      • 소켓으로 데이터 쓰기, 플러시

 

Netty Server 구현

NettySocketServer.java

public class NettySocketServer {
    private int port;

    public NettySocketServer(int port) {
        this.port = port;
    }

    public void run(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        ServerBootstrap group = bootstrap.group(bossGroup, workerGroup);
        ServerBootstrap channel = group.channel(NioServerSocketChannel.class);
        ServerBootstrap bootstrap_childHandler = channel.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new StringEncoder());
                pipeline.addLast(new StringDecoder());
                pipeline.addLast(new NettySocketServerHandler());
            }
        });

        bootstrap_childHandler
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try{
            ChannelFuture f = bootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

 

<Code Review>

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
* boss 그룹은 연결을 담당하는 스레드들로 '동시'에 처리가능한 접속요청과 관련이 있습니다.
* boss 그룹에 핸들러를 추가해서 병목을 만들지 않는 이상
* 1개의 boss 스레드만으로도 충분히 많은 접속 요청을 처리할 수 있습니다.
* boss 스레드는 클라이언트의 연결을 수락하는 부모 스레드 -> 매개변수로 지정한 스레드 개수에 맞춰서 일 처리
* workerGroup은 worker쓰레드가 10개라면 100명의 클라이언트가 동시 접속 했을 때
* worker스레드 하나당 10명의 클라이언트를 처리합니다.

 

ServerBootstrap bootstrap = new ServerBootstrap();
* 네티의 부트스트랩은 네티가 작동할 때 기본적으로 설정해야하는 클래스 입니다.
* 부트스트랩을 사용하므로써 네티 소켓의 모드나 스레드 등을 쉽게 설정할 수 있습니다.
* 또한 이벤트 핸들러도 부트스트랩에서 설정해야합니다.
* ServerBootstrap은 두가지로 나뉘는데 하나는 서버 애플리케이션을 위한 것이고,
* 다른 하나는 클라이언트를 위한 부트스트랩입니다. 둘은 유사하긴 하지만 분명
* 다른 부분이 있습니다.

 

ServerBootstrap group = bootstrap.group(bossGroup, workerGroup);
* group()메서드는 EventLoopGroup을 설정하는 역할입니다.
* EventLoopGroup은 스레드의 그룹이라고 생각해도 됩니다.
* group()은 인자로 parentGroup과 childGroup을 받습니다.
* group()은 인자로 받은 스레드들로 스레드 그룹을 초기화 해줍니다.
* parentGroup은 부모 스레드로써 클라이언트 요청을 수락하는 역할을 하고
* childGroup은 자식 스레드로써 IO와 이벤트 처리를 담당합니다.

 

ServerBootstrap channel = group.channel(NioServerSocketChannel.class);
* channel() 메서드는 소켓의 입출력 모드를 설정하는 역할을 합니다.
* NioServerSocketChannel클래스는 논블로킹 모드로 채널을 만든다는 의미입니다.
* 소켓 관련 시스템 콜에 대하여 네트워크 시스템이 즉시 처리할 수 없는 경우라도
* 시스템콜이 바로 리턴되어 응용 프로그램이 block되지 않게 하는 소켓 모드입니다.
* 통신 상대가 여럿이거나 여러가지 작업을 병행하려면 nonblocking 또는 비동기 모드를 사용해야 합니다.
* non-blcking 모드를 사용하는 경우에는 일반적으로 어떤 시스템 콜이 성공적으로 실행될때까지 계속 루프를
* 돌면서 확인하는 방법(폴링)을 사용합니다.

 

ServerBootstrap bootstrap_childHandler = channel.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new NettySocketServerHandler());
    }
});
* InitChannel() 에서 Channel 설정을 해줍니다.
* 들어오는 데이터와 나가는 데이터에 대한 Encoder, Decoder를 설정합니다.
* 데이터가 들어올 때 어떻게 처리할지에 대한 Handler를 설정합니다.

 

bootstrap_childHandler
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);
* childHandler() 메서드는 소켓 채널로 송수신 되는 데이터를 가공하는 역할을 합니다.
* ChannelInitializer 객체의 initChannel 메서드를 구현해서 파이프라인 객체를 만들고
* 핸들러를 파이프라인에 추가했습니다.* option() 메서드는 서버 소켓의 옵션을 설정할 수 있고,
* childOption() 메서드는 서버에 접속한 클라이언트 소켓에 대한 옵션을 설정합니다.
* TCP_NODELAY : 데이터 송수신에 네이글 알고리즘 비활 성화 여부 지정
* SO_KEEPALIVE : 운영체제에서 지정된 시간에 한번씩 keepalive 패킷을 상대 방에게 전송
* SO_BACKLOG : 동시에 수용 가능한 소켓 연결 요청수

 

ChannelFuture f = bootstrap.bind(port).sync();
f.channel().closeFuture().sync();
* 서버를 비동기 식으로 바인딩 한다. sync() 는 바인딩이 완료되기를 대기한다.
* ChannelFuture 는 작업이 완료되면 그 결과에 접근 할 수 있게 해주는
* 자리 표시자 역활을 하는 인터페이스입니다.
* 아래 코드는 부트스트랩 시동장치에 포트번호를 부여한다고 이해하였습니다.
* 아래 코드는 채널의 CloseFuture를 얻고 완료 될때 까지 현재 스레드를 블로킹한다.

 

NettySocketServerHandler.java

@ChannelHandler.Sharable //안전하게 데이터를 처리하도록 하는 어노테이션
public class NettySocketServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final ByteBuf in = (ByteBuf) msg;
        System.out.println(
            "Server received: " + in.toString(CharsetUtil.UTF_8)
        );
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
            .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 

<Code Review>

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
데이터가 들어왔을 때 처리하는 로직입니다.

 

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
모든 데이터가 들어오면 실행하는 로직입니다.

 

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
에러를 처리하는 로직입니다.

 

NettyController.java

@Controller
@Slf4j
public class NettyController {
    private NettySocketServer server;

    @PostConstruct
    private void start(){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    log.info("Start Socket TCP { Port: 5050 }");
                    server = new NettySocketServer(5050);
                    server.run();
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    @PreDestroy
    private void destroy(){
        log.info("Destroy Socket TCP { Port: 5050 }");
    }
}

 

 

Netty Client 구현

NettySocketClientHandler.java

@Sharable
public class NettySocketClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(
        Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)
        );
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println(
                "Client received: " + msg.toString(CharsetUtil.UTF_8)
        );
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 

NettySocketClient.java

public class NettySocketClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                        .addLast(new NettySocketClientHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length < 1) {
            System.err.println(
                    "Usage: " + EchoClient.class.getSimpleName() + "<host> <port>"
            );
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}

 

 

[참고]

1. Netty 개념과 아키텍처

2. Netty-In-Action - 1부, 네티 개념과 아키텍처

728x90
반응형