gRPC streaming API

Hello, can we get a better example of using the gRPC streaming API without a sleep call in it?
I would like the VU iteration to complete as soon as the server finishes the stream. Using a Promise doesn't seem to work.

I'm referring to the example given in grpc-server-streaming.js.

Hi @rahulgouthamgs!

As far as I can see, you can omit the sleep: the iteration waits for the end of the gRPC stream and completes as soon as the server finishes the stream.


Hi @bandorko, I see that the iteration terminates and cancels the stream if we remove the sleep. Would you be able to confirm this?

I had some more time yesterday, and to make it work I had to return a Promise that resolves or rejects based on the stream event handler callbacks. I also had to switch to an async default function so that I could use async/await to wait for the stream results.
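For anyone reading along, here is a minimal sketch of that Promise-wrapping idea that runs in plain Node without k6. FakeStream, collectStream and demo are hypothetical names made up for illustration. FakeStream only imitates the on(), write() and end() calls of a k6 Stream, so treat this as a sketch of the pattern rather than the k6 API itself.

```javascript
// Hypothetical stand-in for a k6 gRPC Stream: it only mimics on(), write()
// and end(), and replays canned replies instead of talking to a server.
class FakeStream {
  constructor(replies, failWith = null) {
    this.replies = replies;
    this.failWith = failWith;
    this.handlers = {};
  }
  on(event, handler) {
    this.handlers[event] = handler;
  }
  emit(event, payload) {
    if (this.handlers[event]) this.handlers[event](payload);
  }
  write(_message) {
    // A real stream would send the request to the server here.
  }
  end() {
    // Deliver the events asynchronously, the way a server would.
    setTimeout(() => {
      if (this.failWith) {
        this.emit('error', this.failWith);
        return;
      }
      this.replies.forEach((reply) => this.emit('data', reply));
      this.emit('end');
    }, 0);
  }
}

// Wraps the event handlers in a Promise: resolves with every received
// message on 'end' and rejects on 'error'.
function collectStream(stream, request) {
  return new Promise((resolve, reject) => {
    const messages = [];
    stream.on('data', (msg) => messages.push(msg));
    stream.on('error', (err) => reject(err));
    stream.on('end', () => resolve(messages));
    stream.write(request);
    stream.end();
  });
}

// Plays the role of the async default function in a k6 script.
async function demo() {
  const replies = [{ greeting: 'Hello 0' }, { greeting: 'Hello 1' }];
  const messages = await collectStream(new FakeStream(replies), { name: 'test' });
  console.log('received', messages.length, 'messages');
  return messages;
}

demo().catch(console.error);
```

The point of the wrapper is that the caller only continues once the 'end' handler has fired, so no sleep is needed to keep the function alive.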

For me, if I remove the sleep, the iteration terminates right after the server has sent all the messages it wanted to send.

@rahulgouthamgs:

This is my proto file:

syntax = "proto3";

package proto;
option go_package = "./;proto";

message GreetRequest {
    string name = 1;
}

message GreetResponse {
    string greeting = 1;
}

service GreetStreamingService {
    rpc GetGreetingStream(GreetRequest) returns (stream GreetResponse) {}
}

This is my test script.js:

import { Client, Stream } from 'k6/experimental/grpc';

const GRPC_ADDR = '127.0.0.1:10000';
const GRPC_PROTO_PATH = 'greetingStream.proto';

const client = new Client();
client.load([], GRPC_PROTO_PATH);


export default () => {
  if (__ITER == 0) {
    client.connect(GRPC_ADDR, { plaintext: true });
  }
  const stream = new Stream(client, 'proto.GreetStreamingService/GetGreetingStream');

  stream.on('data', (resp) => {
    console.log('Greeting received, part:', resp.greeting);
  });

  stream.on('error', (err) => {
    console.log('Stream Error: ' + JSON.stringify(err));
  });

  stream.on('end', () => {
    client.close();
    console.log('All done');
  });

  stream.write({name:"Balazs"});
  stream.end();
};

This is my streaming server:

package main

import (
	"fmt"
	"log"
	"net"
	"time"

	pb "grpctest/pkg/proto"

	"google.golang.org/grpc"
)

type server struct{}

func (s server) GetGreetingStream(req *pb.GreetRequest, srv pb.GreetStreamingService_GetGreetingStreamServer) error {
	log.Println("Get greetings")

	for i := 0; i < 10; i++ {
		resp := pb.GreetResponse{
			Greeting: fmt.Sprintf("Hello %s! %d.", req.Name, i),
		}

		if err := srv.Send(&resp); err != nil {
			log.Println("error sending response")
			return err
		}
		time.Sleep(1 * time.Second)
		fmt.Println("greetings sent")
	}

	return nil
}

func main() {
	listener, err := net.Listen("tcp", "localhost:10000")

	if err != nil {
		panic("error building server: " + err.Error())
	}

	s := grpc.NewServer()
	pb.RegisterGreetStreamingServiceServer(s, server{})

	log.Println("start server")

	if err := s.Serve(listener); err != nil {
		panic("error building server: " + err.Error())
	}

}

And this is the output:
https://asciinema.org/a/cCzl9xs1tSw1NUnu0X7CI52CP

Hi @bandorko, in my case I abstract all the stream calls out into a separate function, so my default function looks pretty simple, and I have to wait for the stream's end event to fire before returning the final result.

It looks something like this:


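import { Client, Stream } from 'k6/experimental/grpc';

// url, param, uri and params are placeholders; define them for your own service.
// The proto definition also has to be loaded with client.load() before connecting.
const client = new Client();
let connected = false;

export default async function (data) {
  const input = 'some random input';
  const response = await grpc_streaming_call(input);
}

function grpc_streaming_call(input) {
  const output = [];
  // Connect only once per VU.
  if (!connected) {
    client.connect(url, param);
    connected = true;
  }
  return new Promise((resolve, reject) => {
    const stream = new Stream(client, uri, params);
    stream.on('data', (resp) => {
      output.push(resp);
    });
    stream.on('error', (err) => {
      console.log('Stream Error: ' + JSON.stringify(err));
      reject(err);
    });
    // Resolve with everything collected once the server closes the stream.
    stream.on('end', () => {
      resolve(output);
    });
    stream.write(input);
    stream.end();
  });
}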

This helps if I have to make additional calls based on the result of the gRPC response. @bandorko, do you think this is the way to go, or is there a better way to do this?
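To make the "additional calls based on the result" part concrete, here is a rough sketch that runs in plain Node. Everything in it is an assumption for illustration: openFakeStream stands in for creating a k6 Stream, and the method names 'ListIds' and 'GetDetail' and their canned replies are invented. It only shows that a second stream call can be started after the first one has ended, using the first result as input.

```javascript
// Hypothetical stand-in for opening a k6 gRPC stream. Replies are canned
// per method name so the example runs without a server.
function openFakeStream(method) {
  const handlers = {};
  let request = null;
  return {
    on: (event, fn) => { handlers[event] = fn; },
    write: (msg) => { request = msg; },
    end: () => setTimeout(() => {
      const replies = method === 'ListIds'
        ? [{ id: 1 }, { id: 2 }]
        : [{ detail: `detail for ${request.id}` }];
      replies.forEach((reply) => handlers.data && handlers.data(reply));
      if (handlers.end) handlers.end();
    }, 0),
  };
}

// Same Promise wrapper idea as in the post above: resolve on 'end',
// reject on 'error'.
function streamCall(method, request) {
  return new Promise((resolve, reject) => {
    const output = [];
    const stream = openFakeStream(method);
    stream.on('data', (resp) => output.push(resp));
    stream.on('error', reject);
    stream.on('end', () => resolve(output));
    stream.write(request);
    stream.end();
  });
}

// Each follow-up call is issued only after the first stream has ended,
// and takes one of its results as input.
async function iteration() {
  const ids = await streamCall('ListIds', {});
  const details = [];
  for (const { id } of ids) {
    const [first] = await streamCall('GetDetail', { id });
    details.push(first.detail);
  }
  return details;
}
```

With a real k6 Stream in place of the fake one, the async default function would await iteration() in the same way.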