본문 바로가기

GRPC

2) GRPC Client - Server 양방향 통신

반응형

1부에서 간단한 client - server의 1:1 방식에 대해 테스트 해봤다 .

 

이제 clien -server의 1 : N , N : N  방식에 대해 테스트 해보자

 

먼저 앞에서 작성한 .proto파일을 수정해서 server와 client 양쪽 소스를 재빌드한다.

 

1) .proto 파일을 아래와 같이 수정

syntax = "proto3";

option java_multiple_files = true;
option java_outer_classname = "SampleProto";
option java_package = "com.devracoon.grpc.proto";

package com.devracoon.grpc;

message SampleRequest {
	string userId = 1; string message = 2; 
}

message SampleResponse { 
	string message = 1; 
} 

service SampleService {
	rpc sampleCall (SampleRequest) returns (SampleResponse) {} 
	
	rpc clientServerStream (SampleRequest) returns (stream SampleResponse) {}
	
	rpc clientStreamServerStream (stream SampleRequest) returns (stream SampleResponse) {}
}

 

위 proto 파일을 보면 steram 키워드가 붙어 있는것이 보인다. stream 키워드를 붙이면 여러개의 응답과 요청을 보낼수 있게 된다. stream 키워드를 붙이지 않고 여러개의 요청과 응답을 보내면  하나의 요청이나 응답만 보내거나 exceptiondl 발생하게 된다.

 

위와같이 소스를 수정하고나면 1부에서 처럼 gradle build를해서 소스를 generate 해준다.

 

서버쪽 소스에 위에 추가한 clientServerStream 과 clientStreamServerStream 두개의 함수를 추가한다.

 

2) Server쪽 소스에 추가.

package com.devracoon.grpc.sevice;

import org.springframework.stereotype.Service;

import com.devracoon.grpc.proto.SampleRequest;
import com.devracoon.grpc.proto.SampleResponse;
import com.devracoon.grpc.proto.SampleServiceGrpc;

import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class SampleServiceImpl extends SampleServiceGrpc.SampleServiceImplBase {

	@Override
    public void sampleCall(SampleRequest request, StreamObserver<SampleResponse> responseObserver) {
 		log.info("SampleServiceImpl#sampleCall - {}, {}", request.getUserId(), request.getMessage());
        SampleResponse sampleResponse = SampleResponse.newBuilder()
                .setMessage("grpc service response")
                .build();
 
        responseObserver.onNext(sampleResponse);
        responseObserver.onCompleted();
    }

	@Override
	public void clientServerStream(SampleRequest request, StreamObserver<SampleResponse> responseObserver) {
		log.info("SampleServiceImpl#clientServerStream - {}, {}", request.getUserId(), request.getMessage());
		
		log.info("SampleServiceImpl#clientServerStream - {}, {}", request.getUserId(), request.getMessage());
        SampleResponse sampleResponse = SampleResponse.newBuilder()
                .setMessage("grpc service response #1")
                .build();
       
        SampleResponse sampleResponse2 = SampleResponse.newBuilder()
                .setMessage("grpc service response #2")
                .build();
       
        SampleResponse sampleResponse3 = SampleResponse.newBuilder()
                .setMessage("grpc service response #3")
                .build();
        responseObserver.onNext(sampleResponse);
        responseObserver.onNext(sampleResponse2);
        responseObserver.onNext(sampleResponse3);

        responseObserver.onCompleted();


				
	}

	@Override
	public StreamObserver<SampleRequest> clientStreamServerStream(StreamObserver<SampleResponse> responseObserver) {
		return new StreamObserver<SampleRequest>() {
			@Override 
			public void onNext(SampleRequest value) {
				//클라이언트로부터 데이터가 올 때마다 onNext가 호출된다. 
				//1개의 데이터가 올 때마다 3개의 응답을 던져줄 예정이다. 
				log.info("clientStreamServerStream : " + value.getUserId() + "/" + value.getMessage());
				responseObserver.onNext(SampleResponse.newBuilder().setMessage("Reseponse #1").build());
				responseObserver.onNext(SampleResponse.newBuilder().setMessage("Reseponse #2").build());
				responseObserver.onNext(SampleResponse.newBuilder().setMessage("Reseponse #3").build()); 
			} 
			@Override 
			public void onError(Throwable t) {
				log.info("clientStreamServerStream : onError"); 
			} 
			@Override 
			public void onCompleted() { 
				log.info("clientStreamServerStream : onCompleted");
				responseObserver.onCompleted(); 
			} 
		};

	}
 	
 	
}

 

3) client 쪽 소스를 수정

package com.devracoon.grpc;

import org.springframework.stereotype.Service;

import com.devracoon.grpc.proto.SampleRequest;
import com.devracoon.grpc.proto.SampleResponse;
import com.devracoon.grpc.proto.SampleServiceGrpc;

import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class GrpcClientServiceImpl  {

	private static final int PORT = 3030;
    public static final String HOST = "localhost";
    private final SampleServiceGrpc.SampleServiceStub asyncStub = SampleServiceGrpc.newStub(
            ManagedChannelBuilder.forAddress(HOST, PORT)
            .usePlaintext()
            .build()
    );
    
    public String asyncCall() {
        SampleRequest sampleRequest = SampleRequest.newBuilder()
                .setUserId("devracoon")
                .setMessage("grpc async test")
                .build();
 
        asyncStub.sampleCall(sampleRequest, new StreamObserver<SampleResponse>() {
            @Override
            public void onNext(SampleResponse value) {
                log.info("asyncCall onNext - {}", value);
            }
 
            @Override
            public void onError(Throwable t) {
                log.error("asyncCall - onError");
            }
 
            @Override
            public void onCompleted() {
                log.info("asyncCall - onCompleted");
            }
        });
        return "string";
    }
    
    public void clientServerStream() {
        SampleRequest sampleRequest = SampleRequest.newBuilder()
                .setUserId("devracoon")
                .setMessage("grpc clientServerStream test")
                .build();
 
        asyncStub.sampleCall(sampleRequest, new StreamObserver<SampleResponse>() {
            @Override
            public void onNext(SampleResponse value) {
                log.info("clientServerStream onNext - {}", value);
            }
 
            @Override
            public void onError(Throwable t) {
                log.error("clientServerStream - onError");
            }
 
            @Override
            public void onCompleted() {
                log.info("clientServerStream - onCompleted");
            }
        });
    }
    
    public void clientStreamServerStream() {
        SampleRequest sampleRequest1 = SampleRequest.newBuilder()
                .setUserId("devracoon")
                .setMessage("grpc clientStreamServerStream test #1")
                .build();
        SampleRequest sampleRequest2 = SampleRequest.newBuilder()
                .setUserId("devracoon")
                .setMessage("grpc clientStreamServerStream test #2")
                .build();
        SampleRequest sampleRequest3 = SampleRequest.newBuilder()
                .setUserId("devracoon")
                .setMessage("grpc clientStreamServerStream test #3")
                .build();
        
        StreamObserver<SampleResponse> responseObserver = new StreamObserver<SampleResponse>() {
            @Override
            public void onNext(SampleResponse value) {
            	log.info("clientStreamServerStream onNext - {}", value);
            }

            @Override
            public void onError(Throwable t) {
            	log.error("clientStreamServerStream - onError");
            }

            @Override
            public void onCompleted() {
            	log.info("clientStreamServerStream - onCompleted");
            }
        };
        
        StreamObserver<SampleRequest> requestObserver = asyncStub.clientStreamServerStream(responseObserver);
        try {
            requestObserver.onNext(sampleRequest1);
            requestObserver.onNext(sampleRequest2);
            requestObserver.onNext(sampleRequest3);
        } catch (Exception e) {
            requestObserver.onError(e);
            throw e;
        }
        
        requestObserver.onCompleted();

    }
    

}

 

이제 새로 추가한 두개의 함수를 각각 호출해 보자.

 

1) clientServerStream 의 결과

 

하나의 요청에 3개의 응답을 받았다.

 

2) clientStreamServerStream 의 서버측 요청 내용

서버측에서 보면 3개의 요청을 받았다.

 

3) clientStreamServerStream 의 클라이언트측 응답 내용

 

각 Request에 대해 3개의 Response로 총 9개의 Response가 온것을 확인.

 

위와 같이 양방향으로 다양한 방법으로 서버의 클라이언트의 통신을 해봤다.

 

실무에서는 아직 사용을 해보진 않았지만 차후에 굉장히 다양하게 활용이 가능해 보인다. 

 

실무 적용후 내용과 그에 대한 성능 및 결과에 대해 공유하도록 하겠다.

 

 

'GRPC' 카테고리의 다른 글

1) GRPC 란 ?  (0) 2021.01.16