DIVX テックブログ

catch-img

マイクロサービス構築の本命 gRPC入門


目次[非表示]

  1. 1.はじめに
  2. 2.gRPCって何?
  3. 3.公式クイックスタートをやってみよう
  4. 4.gRPCサーバーが実装される仕組みを見てみよう
    1. 4.1.各種変数、構造体、メソッドの定義
    2. 4.2.tcp通信のためのリスナーを作成
    3. 4.3.gRPCサーバーを新規作成
    4. 4.4.gRPCサーバーにサービスを登録
  5. 5.おわりに
  6. 6.採用情報
  7. 7.参考文献

はじめに

こんにちは。株式会社divxエンジニアの阪村です。

この記事では、gRPCの概要を説明しつつ、Goを用いて公式クイックスタートの内容を元にサーバー側の処理について紹介します。

みなさんはgRPCを扱ったことはありますか?

「最近よく聞くようになったけど、よくわかっていない」「ちょっと触ったことあるけど結局何をしているのかまでは理解していない」

そういった方も多いのではないでしょうか?何を隠そう私自身がそうでした。

初めてgRPCを用いた実装をおこなった際は「なんか思ってたより簡単に実装できたけど、何が起こってるだ?」という状態でした。

お手軽に実装が可能なgRPCの魅力とどのような処理によって成りなっているのかを公開されているgithubのコードを元に一緒に見ていきましょう。

gRPCって何?

gRPCを理解するには、前提としてRPCというプロトコルについて知っておく必要があります。

RPC(Remote Procedure Call)は日本語にすると「遠隔手続き呼出し」と表現されます。

これは、あるプログラムがネットワーク上の別のコンピューターにあるプログラムに対してサービスを要求する際に使用できるプロトコルのことです。

PC(ローカル)上にあるアプリケーションから別の環境(別のサーバー上など)にあるアプリケーションのメソッドを呼び出すケースなどが当てはまります。

RPCのオープンソースフレームワークとしてGoogleが2015年に開発されたシステムがgRPCです。デフォルトではプロトコルバッファーという構造化データのシリアライズ技術とHTTP/2通信を用いて分散アプリケーションやサービスを容易に構築できます。このことからも、マイクロサービスにおけるサービス間の通信を実現するための技術として採用を検討されることが多いのではないでしょうか。

ちなみにgRPC以外のRPC技術にはJava RMI、XML-RPC、JSON-RPCなどがあります。
またgRPCはUnary RPCと呼ばれる1つのリクエストに対して、1つのレスポンスを返す通信方式だけでなく、各種Streaming方式の通信にも対応しています。

以下にgPRCが対応しているRPC方式をまとめています。

  • Unary RPC
    • 通常の関数呼び出しと同様に、クライアントはサーバに単一のリクエストを送信し、単一のレスポンスを取得します。
  • Server streaming RPC
    • クライアントはサーバにリクエストを送信し、複数のレスポンスを取得します。つまり1つのリクエストに対して複数のレスポンスが返される方式です。
  • Client streaming RPC
    • クライアントが一連のメッセージを送信し、提供されたストリームを使用してサーバーに送信します。つまり複数のリクエストに対してサーバーから1つのレスポンスが返される方式です。
  • Bidirectional streaming RPC
    • 双方向ストリーミングのことを指します。2つのストリームが独立して動作します。たとえば、サーバはクライアントのメッセージをすべて受信してから応答を書き込むこともできますし、メッセージを読んでから書き込むことを交互に行ったり、他の読み書きの組み合わせも可能です。

公式クイックスタートをやってみよう

ここからはGoのgRPC公式クイックスタートを元にgRPCを用いたUnaryRPCの実装方法を解説します。※クイックスタートは2022/9月時点のものです。

内容やコードは抜粋しておりますので、実際に手を動かして検証、再現をしたい方は上記リンク記載の手順やコマンドをご参照ください。gRPC公式クイックスタートは、Go以外の言語でも用意されていますので、気になる言語がある方はぜひそちらを確認してみてください。

さっそくクイックスタートに記載の指示通り、grpc-goリポジトリをgit cloneしてきてexamples/helloworldディレクトリに移動しましょう。

exampleshelloworldディレクトリ内には、さらに3つのディレクトリ(greeter_client、greeter_server、helloworld)が含まれています。

この中のhelloworldディレクトリ(examples/helloworld/helloworld)にprotoという拡張子のファイルがあります。

前述の通り、gRPCはデフォルトではプロトコルバッファー (Protocol Buffer)というインターフェイス定義言語(IDL) でメソッド(エンドポイント)やサービスの構造を定義します。ここでのサービスは1つ以上のメソッドを持ったグループのことを指します。

プロトコルバッファーではこのprotoファイルにメソッドとサービスを定義してます。
git cloneしてきたリポジトリのファイルには、すでに1つのサービスとメソッドが定義されています。ファイルの中身を見てましょう。コメントで簡単に解説を加えています。

helloworld/helloworld.proto

// 省略
package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}


Greeter(挨拶する人)というサービスを定義しています。さらにGreeterサービスのメソッドとしてHelloRequest型の引数を受け取り、HelloReply型のレスポンスを返すSayHelloというメソッドを定義しています。

HelloRequestとHelloReplyについては、それぞれnameというstring型のフィールドとmessageというstring型のフィールドを含むように定義されています。

※protoファイルの詳しい書き方など知りたい方は以下を参照ください。
https://developers.google.com/protocol-buffers/docs/overview

クイックスタートではこのGreeterサービスに新たにSayHelloAgainというメソッドを追加します。

helloworld/helloworld.proto

// 省略
package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// 新しくSayHelloAgainメソッドを定義
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

以下のprotocコマンドを対象ディレクトリで入力するとprotoファイルに追記した内容がhelloworldディレクトリ内にあるhelloworld_grpc.pb.goとhelloworld.pb.goに自動でコンパイルされ更新されます。

$ protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative helloworld/helloworld.proto

ちなみにクイックスタートでは元々ある2つの該当のファイルが更新されますが、該当ファイルがない場合はファイルが新規で生成されます。

helloworld_grpc.pb.goには定義したサービスとメソッドの内容が反映され、helloworld.pb.goには定義したリクエストとレスポンスの内容が反映されます。

その後、server側とclient側のファイルに以下の記述を追加しSayHelloAgainメソッドとそれを呼び出す処理を実装します。

greeter_server/main.go

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

var (
	port = flag.Int("port", 50051, "The server port")
)

// server is used to implement helloworld.GreeterServer.
type server struct {
	pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetName())
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

// SayHelloAgainメソッドを追記
func (s *server) SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply,error) {
return &pb.HelloReply{Message: "Hello again " + in.GetName()},nil
}

func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	log.Printf("server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

greeter_client/main.go

package main

import (
	"context"
	"flag"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
	defaultName = "world"
)

var (
	addr = flag.String("addr", "localhost:50051", "the address to connect to")
	name = flag.String("name", defaultName, "Name to greet")
)

func main() {
	flag.Parse()
  // Set up a connection to the server.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
	// SayHelloAgainの呼び出しを追記
	r, err = c.SayHelloAgain(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

あとはサーバー側とクライアント側でそれぞれターミナルからmain.goをrunさせれば、Greeting(挨拶)を2回返してくれることが確認できます。

ターミナルからサーバー側のmain.goを起動させます。

$ go run greeter_server/main.go

以下は別ターミナルで実行します。引数にnameを渡します。

$ go run greeter_client/main.go --name=Momotaro

以下のレスポンスがログで表示されることを確認できます。

Greeting: Hello Momotaro Greeting: Hello again Momotaro

これだけでgRPCでUnary RPC通信を用いたメソッドの追加と呼び出しが実装できました。

クイックスタートでは、単一の言語を用いて同じリポジトリでエンドポイント(サーバー)とクライアントの実装を行ったので、gRPCの良さがイマイチ把握しづらいですが、gRPCの大きな利点の1つに多言語をサポートしている点が挙げられます。

今回取り上げたGoの他に、C#, C++, Dart, Java, Node.js, PHP, Python, Rubyなどをサポートしています。マイクロサービスにおいて、異なる言語で複数サービスを実装している場合でも、gRPCが一貫性のある形でサービス間の接続やデータ転送を処理してくれます。

gRPCサーバーが実装される仕組みを見てみよう

さてここからは、クイックスタートで実装したコードの内容をとくにサーバー側に絞って少し深堀りして、どういう仕組みでこの挨拶を返すアプリが動いているのかを見ていきます。

クイックスタートの最後に実装したサーバー側アプリであるgreeter_server/main.goの内容をコードを分けて見ていきます。

最初にざっくりと概要を説明すると今回greeter_server/main.goに記述されているコードは以下の要素で構成されています。

  • 各種変数、構造体、メソッドの定義
  • main関数
    •  tcp通信のためのリスナーを作成
    • gRPCサーバーを新規作成
    • gRPCサーバーにサービスを登録
    • サービスを登録したサーバーを起動

各種変数、構造体、メソッドの定義

それでは、まずは以下のコードからです。

greeter_server/main.go

// server is used to implement helloworld.GreeterServer.
type server struct {
	pb.UnimplementedGreeterServer
}

serverという名前の構造体を定義しています。

フィールドであるUnimplementedGreeterServerは、protoファイルで定義されたコードから自動コンパイルされたhelloworld_grpc.pb.go内に実装されています。英語で補足のコメントもされています。

helloworld/helloworld_grpc.pb.go

// UnimplementedGreeterServer must be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}

(コメント訳)UnimplementedGreeterServerは前方互換性のある実装を持つように埋め込む必要があります。


アプリ側でgRPCサーバー実装時にUnimplementedGreeterServerを埋め込んだ構造体を作る必要があると記されています。

main.goではこのコメントの通り、構造体serverにpb.UnimplementedGreeterServerを埋め込んでいる訳ですね。

このUnimplementedGreeterServerには以下の3つのメソッドが定義されています。

helloworld/helloworld_grpc.pb.go

func (UnimplementedGreeterServer) SayHello(context.Context, HelloRequest) (HelloReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func (UnimplementedGreeterServer) SayHelloAgain(context.Context, HelloRequest) (HelloReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHelloAgain not implemented")
}
func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}

これらは同ファイル内の下部にあるGreetingServiceServerインターフェイスを満たすのに必要なメソッドとなっています。

helloworld/helloworld_grpc.pb.go

// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
	SayHelloAgain(context.Context, *HelloRequest) (*HelloReply, error)
	mustEmbedUnimplementedGreeterServer()
}

(コメント訳)GreeterServer は Greeter サービスのためのサーバ API です。 すべての実装は、前方互換性のためにUnimplementedGreeterServer を埋め込む必要があります。


ここにもUnimplementedGreeterServerを埋め込む必要がある旨のコメントが記述されています。

pb.UnimplementedGreeterServerを埋めこむことでserverはGreeterServerインターフェイスを満たすことができます。

後ほどGreeterServer型を引数に渡すメソッドが出てきますので、そこでこのserverが必要になります。

さて、GreeterServerのメソッドであるSayHelloとSayHelloAgainはprotoファイルに定義したメソッドがコンパイルされたものだとわかりますが、最後のmustEmbedUninplementedGreeterServerはなんでしょうか?

このメソッドはgRPCサーバーを作成する際にUninplementedGreeterServerを埋め込むことを強制する役割を持ちます。

たとえば、パブリックなメソッドであるSayHelloとSayHelloAgainを満たす構造体は、以下のような形でも作ることができます。

// SayHelloメソッドとSayHelloAgainメソッドだけなら以下のような形で外部からでも満たすことができる
type sampleServer interface {
	SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error)
	SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error)
}

type server struct {
	sampleServer
}

ですが、プライベートメソッドであるmustEmbedUninplementedGreeterServerを含めることでUnimplementedGreeterServer自体を埋め込む以外にファイル外でインターフェイスGreeterServerを満たすことができなくなります。つまり、mustEmbedUninplementedGreeterServerがあることでインターフェイスGreeterServerを満たすためにUninplementedGreeterServerの埋め込みを強制していることになります。

type sampleServer interface {
	SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error)
	SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error)
	mustEmbedUnimplementedGreeterServer()
}

// 形だけmustEmbedUnimplementedGreeterServerを追記してもGreeterServerを満たしたことにはならない
type server struct {
	sampleServer
}

// GreeterServerを満たすにはUnimplementedGreeterServer埋め込むしかない
type server struct {
	pb.UnimplementedGreeterServer
}

またSayHelloメソッドとSayHelloAgainメソッドはUnimplementedGreeterServerでは以下のようにメソッドが定義されています。
helloworld/helloworld_grpc.pb.go

func (UnimplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
	return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func (UnimplementedGreeterServer) SayHelloAgain(context.Context, *HelloRequest) (*HelloReply, error) {
	return nil, status.Errorf(codes.Unimplemented, "method SayHelloAgain not implemented")
}
func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}

これによりアプリ側でUnimplementedGreeterServerを埋め込んだサーバーを自作する際にメソッドをオーバーライドしないと実装がされていない旨のエラーが発せられるようになっています。

そこでmain.goでは以下ようにメソッドのオーバーライドをしています。

greeter_server/main.go

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetName())
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func (s *server) SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
}

tcp通信のためのリスナーを作成

次はmain.goのmain関数を見ていきます。

greeter_server/main.go

flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

この部分ではGoの公式パッケージを利用した処理のみでgRPC特有の処理は実装されていません。

flag.Parse()でコマンドラインの引数のフラグが解析し、適切なフラグが渡されていればフラグ内容をファイルの上部で定義している変数portにバインドしています。

次にnet.Listenで変数portに代入されているアドレスの値を元にtcp接続のListenポートを開いてリスナー(net.Listener)を受け取っています。

gRPCサーバーを新規作成

次のコードにいってみましょう。

greeter_server/main.go

s := grpc.NewServer()

なんとなくgrpc用のサーバーを立てていることはわかりますね。

詳しく見るためにimport元であるgRPC-goパッケージのコードを覗いてみましょう。

ちなみにクイックスタートではgRPC-goリポジトリをまるごとgit cloneしてきているので、ローカルに存在するファイルからでも確認できます。

serer.go

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
// 省略

(コメント訳)NewServerは、サービスが登録されておらず、リクエストの受け付けを開始していないgRPCサーバを作成します。


NewServerは引数にServerOption型でオプションを指定でき、返り値としてはServerのポインタを返す関数になっています。

ServerOption自体はserverOptions型のポインタをapply(反映)させるメソッドを持つインターフェイスです。

実際のオプションの管理はserverOptionsの各フィールドで行っています。

server.go

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

NewServerが定義されているファイルと同じファイル(server.go)にServerOption型を返す関数が定義されており、それらを使って通信方式の変更などをオプションとして設定できます。

関数は20個以上あり、たとえば以下のようなものがあります。
// サーバーのkeepalive実施ポリシーを設定するServerOptionを返します。

// サーバーのkeepalive実施ポリシーを設定するServerOptionを返します。
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}

// メッセージのマーシャルとアンマーシャルにCodecを設定するServerOptionを返します。
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// Stream通信を実装するために必要なStreamServerInterceptor(Middalewareのようなもの)を設定するServerOptionを返します。 
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

次はNewServerの返り値に設定されているServerの方も見てみましょう。以下のように定義されています。

server.go

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID *channelz.Identifier
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

serverOptionsに加え、排他制御用のsync.Mutexや各種通信接続を管理するためのフィールドが設定されています。

serer.go

func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range extraServerOptions {
		o.apply(&opts)
	}
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	channelz.Info(logger, s.channelzID, "Server created")
	return s
}

NewServerの中身の処理を見てみるとdefaultOptionと引数で渡したオプションの内容を登録(場合によっては上書き)してServer内のoptsに代入しています。opts以外のフィールドでは初期化のための処理をしています。

chainUnaryServerInterceptors関数とchainStreamServerInterceptors関数にs(Server)を渡すことで登録したoptから、それぞれUnaryServerのInterceptorとStreamServerのInterceptorを取得してひとまとめにする処理をしています。

次に標準パッケージsyncのNewCond関数をServer内のmuフィールドの値(この時点では初期値)を渡して呼び出し、同じくServer内のフィールドであるcvに結果を代入して排他制御を行うための準備をしています。

※syncパッケージについて、詳しく知りたい方は以下をご参照ください。https://pkg.go.dev/sync

グローバル変数であるEnableTracingはbool型であり、golang.org/x/net/trace パッケージを使用してRPCをトレースするかどうかを制御をしています。

基本的にはtrueになっており、Serverのeventsフィールドにトレースを開始する旨のログを代入します。

ベンチマークを取る処理などにおいてはfalseにされることがあります。

benchmark/worker/main.go

// 省略
func main() {
	grpc.EnableTracing = false
// 省略

次にオプションにより設定されたワーカー数が0より多ければ、ServerのinitServerWorkersメソッドを呼び出し、受信接続を処理するワーカーゴルーチンとチャンネルを作成します。

※デフォルト(defaulOption)では設定されていません。

そして、ServerをgRPC内部のchannelzデータベースに登録して、サーバに割り当てられた一意のchannelzトラッキングIDをServerのchannelzIDフィールドに代入します。

最後にここまでの設定をしたsを返り値として返します。

gRPCサーバーにサービスを登録

再度main.goのmain関数に戻ります。

greeter_server/main.go

pb.RegisterGreeterServer(s, &server{})

NewServerにより返されたServer構造体であるsを初期化したserverのアドレスと共にhelloworld_grpc.pb.goに定義されているRegisterGreeterServer関数に渡しています。

helloworld/helloworld_grpc.pb.go

func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
	s.RegisterService(&Greeter_ServiceDesc, srv)
}

このRegisterGreeterServer関数でさきほど、pb.UnimplementedGreeterServerを埋め込むことでGreeterServerインターフェイスを満たしたserverが必要となります。

RegisterGreeterServer関数ではServerのRegisterServiceメソッドが呼ばれています。第一引数でアドレスが渡されているGreeter_ServiceDescは今回作成したGreeterサービス用のServiceDescです。

ServiceDescはgo-grpcにおいて以下のように定義されている構造体でサービスの仕様情報をフィールドとして持ちます。

server.go

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}



RegisterServiceメソッドはその名前の通り、渡されたサービスをregisterメソッドによりServer構造体のservicesフィールドにその内容が代入することでサービスをgRPCサーバーに登録します。

server.go

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

server.go

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

サービスを登録したサーバーを起動

greeter_server/main.go

log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}

取得しておいたリスナーのアドレスをログで出力した後に、そのリスナーを引数にServerのServeメソッドを呼び出しています。

server.go

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var err error
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	if err != nil {
		s.mu.Unlock()
		return err
	}
	s.mu.Unlock()
	channelz.Info(logger, ls.channelzID, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}

Serveメソッドは上記のように少し長めのコードが書かれていますが、処理のメインとなっているのはfor文の部分の処理です。

lis.Accept()でリスナーからの接続を受け付け、それぞれに新しいゴルーチンを生成します。ゴルーチンはhandleRawConnメソッド内の処理でgRPCリクエストを読み込み、今回であればRegisterGreeterServerによって登録されたサービスを呼び出します。

これでgreeter_server/main.goにオーバーライドしたSayHelloとSayHelloAgainのエンドポイントに対してクライアント側からリクエストに対して応答ができるサーバーを立ち上げることができました。

おわりに

さてgRPCの概要を説明しつつ、公式クイックスタートの内容を元にGoでgRPCサーバーを立ち上げるまでに裏でどのような処理が行われているか順を追って確認してきました。

今回はクイックスタートに沿ってUnary RPC方式の通信を実装しましたが、前述の通りgRPCの通信方式はこれだけではありません。gRPCには今回の記事では紹介しきれないほど魅力はまだまだあります。

また、この記事で深堀りをした内容はすべてgrpc-goリポジトリ内のコードに書かれている内容です。パッケージのコードリーディングは、ググってもなかなか出てこない情報を取ってくることができ、自分としてもとても勉強になりました。

今回はサーバー側の内容だけを深堀って見ていきましたが、興味がある方はぜひクライアント側のコードも覗いてみてください。

採用情報

divxでは一緒に働ける仲間を募集しています。
興味があるかたはぜひ採用ページを御覧ください。

  採用情報 | 株式会社divx(ディブエックス) 可能性を広げる転職を。DIVXの採用情報ページです。 株式会社divx(ディブエックス)

参考文献

https://grpc.io/docs/languages/go/quickstart/
https://github.com/grpc/grpc-go
https://cloud.google.com/api-gateway/docs/grpc-overview?hl=ja
https://developers.google.com/protocol-buffers/docs/overview

お気軽にご相談ください


ご不明な点はお気軽に
お問い合わせください

サービス資料や
お役立ち資料はこちら

DIVXメディアカテゴリー

テックブログ タグ一覧

採用ブログ タグ一覧

人気記事ランキング

GoTopイメージ