visit
In this example, each message streamed by the client is an
InputValue
containing the name of the item, its price, and its quantity. The server responds with a message named Bill
containing the type of the bill (final or interim), the total quantity of items ordered, and the total price. This application has two modes of operation, or remote procedure calls, which can be described as follows:
quickBilling
— This is an example of the gRPC client streaming scenario. In this case, the server waits until the client has streamed all the InputValue
messages to be included in the bill. Once the client indicates that the stream is completed, one final Bill
is sent by the server.
oneByOneBilling
— This is an example of the gRPC bi-directional streaming scenario. For each InputValue
streamed by the client, the server replies with an interim Bill
(the total calculated so far). Similar to quickBilling
, the final Bill
is sent once the client indicates the stream’s completion.
// Create a new Ballerina project
$ ballerina new grpc-streaming-example
$ cd grpc-streaming-example
// Add server module to the project
$ ballerina add server
// Add client module to the project
$ ballerina add client
// streaming-stub.proto
syntax="proto3";
package service;
// Billing service. Both RPCs consume a client-side stream of InputValue messages.
service BillingServer {
// Client streaming: a stream of InputValue messages in, a single Bill out.
rpc quickBilling(stream InputValue) returns (Bill);
// Bi-directional streaming: a stream of InputValue messages in, a stream of Bill messages out.
rpc oneByOneBilling(stream InputValue) returns (stream Bill);
}
// One ordered item as streamed by the client: its name, quantity and price.
message InputValue {
string itemName = 1;
int64 quantity = 2;
float price = 3;
}
// Bill returned by the server: a bill type label plus the accumulated quantity and price.
message Bill {
string billType = 1;
int64 totalQuantity = 2;
float totalPrice = 3;
}
$ ballerina grpc --input streaming-stub.proto --output .
// Copy the generated file to the client module directory
$ cp service/stub_pb.bal <PROJECT_DIRECTORY>/src/client
import ballerina/grpc;
// Non-blocking client stub for the BillingServer gRPC service.
// Both remote functions open a streaming call and hand back a
// grpc:StreamingClient (or a grpc:Error); server replies are delivered
// to the listener service passed in as `msgListener`.
public type BillingServerClient client object {

    *grpc:AbstractClientEndpoint;

    private grpc:Client grpcClient;

    // Creates the underlying gRPC client for `url` and registers this stub
    // in "non-blocking" mode; panics if the stub cannot be initialized.
    public function init(string url, grpc:ClientConfiguration? config = ()) {
        self.grpcClient = new(url, config);
        checkpanic self.grpcClient.initStub(self, "non-blocking", ROOT_DESCRIPTOR, getDescriptorMap());
    }

    // Starts a call to the quickBilling RPC.
    public remote function quickBilling(service msgListener, grpc:Headers? headers = ()) returns (grpc:StreamingClient|grpc:Error) {
        string methodId = "service.BillingServer/quickBilling";
        return self.grpcClient->streamingExecute(methodId, msgListener, headers);
    }

    // Starts a call to the oneByOneBilling RPC.
    public remote function oneByOneBilling(service msgListener, grpc:Headers? headers = ()) returns (grpc:StreamingClient|grpc:Error) {
        string methodId = "service.BillingServer/oneByOneBilling";
        return self.grpcClient->streamingExecute(methodId, msgListener, headers);
    }
};
// Closed record mirroring the InputValue message of streaming-stub.proto:
// one item (name, quantity, price). Every field has a default value.
public type InputValue record {|
string itemName = "";
int quantity = 0;
float price = 0.0;
|};
// Closed record mirroring the Bill message of streaming-stub.proto:
// a bill type label plus the total quantity and total price.
public type Bill record {|
string billType = "";
int totalQuantity = 0;
float totalPrice = 0.0;
|};
const string ROOT_DESCRIPTOR = "0A16D696E672D737475622E70726F746F6696365225A0A0A496E70757456616C7565121A0A086974656D4E616D65952086974656D4E616D65121A0A087175616E74697479352087175616E7469747912140A05707269636525205707269636522680A0442696C6C121A0A0862696C6C547970659520862696C6C5479706512240A0D746F74616C5175616E746974793520D746F74616C5175616E74697479121E0A0A746F74616C50726963652520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280726F746F33";
function getDescriptorMap() returns map<string> {
return {
"streaming-stub.proto":"0A16D696E672D737475622E70726F746F6696365225A0A0A496E70757456616C7565121A0A086974656D4E616D65952086974656D4E616D65121A0A087175616E74697479352087175616E7469747912140A05707269636525205707269636522680A0442696C6C121A0A0862696C6C547970659520862696C6C5479706512240A0D746F74616C5175616E746974793520D746F74616C5175616E74697479121E0A0A746F74616C50726963652520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280726F746F33"
};
}
$ ballerina grpc --input streaming-stub.proto --mode service --output .
// Copy the generated file to the server module directory
$ cp service/BillingServer_sample_service.bal <PROJECT_DIRECTORY>/src/server
import ballerina/grpc;
listener grpc:Listener ep = new (9090);
// Generated skeleton of the BillingServer service, attached to the listener `ep`.
// Each resource receives the caller handle and the stream of InputValue
// messages sent by the client.
service BillingServer on ep {
// Skeleton for the quickBilling RPC (client streaming in the proto definition).
resource function quickBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
// Implementation goes here.
// You should return a Bill
}
// Skeleton for the oneByOneBilling RPC (bi-directional streaming in the proto definition).
resource function oneByOneBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
// Implementation goes here.
// You should return a Bill
}
}
// Closed record mirroring the InputValue message of streaming-stub.proto:
// one item (name, quantity, price). Every field has a default value.
public type InputValue record {|
string itemName = "";
int quantity = 0;
float price = 0.0;
|};
// Closed record mirroring the Bill message of streaming-stub.proto:
// a bill type label plus the total quantity and total price.
public type Bill record {|
string billType = "";
int totalQuantity = 0;
float totalPrice = 0.0;
|};
const string ROOT_DESCRIPTOR = "0A0A737475622E70726F746F6696365225A0A0A496E70757456616C7565121A0A086974656D4E616D65952086974656D4E616D65121A0A087175616E74697479352087175616E7469747912140A05707269636525205707269636522680A0442696C6C121A0A0862696C6C547970659520862696C6C5479706512240A0D746F74616C5175616E746974793520D746F74616C5175616E74697479121E0A0A746F74616C50726963652520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280726F746F33";
function getDescriptorMap() returns map<string> {
return {
"stub.proto":"0A0A737475622E70726F746F6696365225A0A0A496E70757456616C7565121A0A086974656D4E616D65952086974656D4E616D65121A0A087175616E74697479352087175616E7469747912140A05707269636525205707269636522680A0442696C6C121A0A0862696C6C547970659520862696C6C5479706512240A0D746F74616C5175616E746974793520D746F74616C5175616E74697479121E0A0A746F74616C50726963652520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280726F746F33"
};
}
$ ballerina grpc --input streaming-stub.proto --mode client --output .
// Copy the generated file to the client module directory
$ cp service/BillingServer_sample_client.bal <PROJECT_DIRECTORY>/src/client
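The resource function for
quickBilling
(client streaming) can be defined as follows.
// Make the edit in the server boilerplate code BillingServer_sample_service.bal
// (the snippet below uses the log module, so also add `import ballerina/log;` at the top of that file)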
// Client-streaming billing: accumulates every InputValue received on
// `clientStream` and, once the stream ends with grpc:EOS, sends a single
// "Final Bill" through `caller` and completes the call.
resource function quickBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
    int itemCount = 0;
    float runningTotal = 0;
    log:printInfo("Starting Quick Billing Service");

    // Fold each streamed item into the running totals.
    error? streamEnd = clientStream.forEach(function(InputValue item) {
        string quantityText = item.quantity.toString();
        string priceText = item.price.toString();
        log:printInfo("Item:" + item.itemName + " Quantity:" + quantityText + " Price:" + priceText);
        itemCount += item.quantity;
        runningTotal += (item.quantity * item.price);
    });

    if (streamEnd is grpc:EOS) {
        // grpc:EOS marks a normal end of the client stream: reply with the total.
        Bill finalBill = {
            billType: "Final Bill",
            totalQuantity: itemCount,
            totalPrice: runningTotal
        };

        grpc:Error? sendResult = caller->send(finalBill);
        if (sendResult is grpc:Error) {
            log:printError("Error occured when sending the bill: " +
                sendResult.message() + " - " + <string>sendResult.detail()["message"]);
        } else {
            string totalText = finalBill.totalPrice.toString();
            string countText = finalBill.totalQuantity.toString();
            log:printInfo("Sending Final Bill Total: " + totalText + " for " + countText + " items");
        }

        // The call is completed whether or not the send succeeded.
        grpc:Error? closeResult = caller->complete();
        if (closeResult is grpc:Error) {
            log:printError("Error occured when closing the connection: " + closeResult.message() +
                " - " + <string>closeResult.detail()["message"]);
        }
    } else if (streamEnd is grpc:Error) {
        // The stream ended with an error other than EOS: log it only.
        log:printError("An unexpected error occured: " + streamEnd.message() + " - " + <string>streamEnd.detail()["message"]);
    }
}
The client’s streamed messages are made available as a stream object argument, which can be iterated over to process each message sent by the client. Once the client stream has completed, a
grpc:EOS
error is returned, which signals that the final response message (the final bill) can be sent to the client using the caller object.
The resource function
oneByOneBilling
(bi-directional streaming) can be defined as follows.
// Make the edit to the server code BillingServer_sample_service.bal
// Bi-directional streaming billing: for every InputValue received on
// `clientStream` an "Interim Bill" with the totals so far is sent through
// `caller`; once the stream ends with grpc:EOS a "Final Bill" is sent and
// the call is completed.
resource function oneByOneBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
    int itemCount = 0;
    float runningTotal = 0;
    log:printInfo("Starting One by One Billing Service");

    // Update the totals for each streamed item and reply with an interim bill.
    error? streamEnd = clientStream.forEach(function(InputValue item) {
        string quantityText = item.quantity.toString();
        string priceText = item.price.toString();
        log:printInfo("Item:" + item.itemName + " Quantity:" + quantityText + " Price:" + priceText);
        itemCount += item.quantity;
        runningTotal += (item.quantity * item.price);

        Bill interimBill = {
            billType: "Interim Bill",
            totalQuantity: itemCount,
            totalPrice: runningTotal
        };
        grpc:Error? interimResult = caller->send(interimBill);
        if (interimResult is grpc:Error) {
            log:printError("Error occured when sending the bill: " + interimResult.message() +
                " - " + <string>interimResult.detail()["message"]);
        } else {
            string interimTotal = interimBill.totalPrice.toString();
            string interimCount = interimBill.totalQuantity.toString();
            log:printInfo("Sending Interim Bill Total: " + interimTotal + " for " + interimCount + " items");
        }
    });

    if (streamEnd is grpc:EOS) {
        // grpc:EOS marks a normal end of the client stream: reply with the total.
        Bill finalBill = {
            billType: "Final Bill",
            totalQuantity: itemCount,
            totalPrice: runningTotal
        };

        grpc:Error? sendResult = caller->send(finalBill);
        if (sendResult is grpc:Error) {
            log:printError("Error occured when sending the bill: " + sendResult.message() +
                " - " + <string>sendResult.detail()["message"]);
        } else {
            string totalText = finalBill.totalPrice.toString();
            string countText = finalBill.totalQuantity.toString();
            log:printInfo("Sending Final Bill Total: " + totalText + " for " + countText + " items");
        }

        // The call is completed whether or not the send succeeded.
        grpc:Error? closeResult = caller->complete();
        if (closeResult is grpc:Error) {
            log:printError("Error occured when closing the connection: " + closeResult.message() +
                " - " + <string>closeResult.detail()["message"]);
        }
    } else if (streamEnd is grpc:Error) {
        // The stream ended with an error other than EOS: log it only.
        log:printError("An unexpected error occured: " + streamEnd.message() + " - " + <string>streamEnd.detail()["message"]);
    }
}
The code above is similar to the
quickBilling
resource function code. The only difference is that, because this is a bi-directional streaming resource, the interim bill value is sent back to the client through the caller object for each message that is processed.
import ballerina/grpc;
import ballerina/io;
string opMode = "";
// Entry point of the billing client.
//
// `mode` selects the remote procedure to call: "quick" starts quickBilling
// and "oneByOne" starts oneByOneBilling; any other value prints a message
// and returns. Three items are then streamed to the server, followed by the
// complete signal. Server replies are handled by BillingServerMessageListener.
public function main (string mode) {
    // The endpoint URL needs an explicit scheme; a bare "//host:port" is not a complete URL.
    BillingServerClient ep = new("http://localhost:9090");
    grpc:StreamingClient | grpc:Error streamClient;
    if (mode == "quick") {
        opMode = "Quick";
        // Initialize call with quickBilling resource
        streamClient = ep->quickBilling(BillingServerMessageListener);
    } else if (mode == "oneByOne") {
        opMode = "One by one";
        // Initialize call with oneByOneBilling resource
        streamClient = ep->oneByOneBilling(BillingServerMessageListener);
    } else {
        io:println("Unsupported operation mode entered!");
        return;
    }
    if (streamClient is grpc:Error) {
        io:println("Error from Connector: " + streamClient.message() + " - "
            + <string>streamClient.detail()["message"]);
        return;
    } else {
        // Start sending messages to the server
        io:println("Starting " + opMode + " billing service");

        // Items to stream, in the order they are sent.
        InputValue[] items = [
            {itemName: "Apples", quantity: 4, price: 30.50},
            {itemName: "Oranges", quantity: 6, price: 43.4},
            {itemName: "Grapes", quantity: 20, price: 11.5}
        ];
        foreach InputValue item in items {
            grpc:Error? connErr = streamClient->send(item);
            if (connErr is grpc:Error) {
                // A failed send is reported; the remaining items are still attempted.
                io:println("Error from Connector: " + connErr.message() + " - "
                    + <string>connErr.detail()["message"]);
            } else {
                io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() +
                    " Price: " + item.price.toString());
            }
        }

        // Sending complete signal
        grpc:Error? completeErr = streamClient->complete();
        if (completeErr is grpc:Error) {
            io:println("Error from Connector: " + completeErr.message() + " - "
                + <string>completeErr.detail()["message"]);
        }
    }
}
// Listener service that receives everything the server sends back on a call:
// bills, errors, and the completion signal. Output goes to the console.
service BillingServerMessageListener = service {

    // Prints each Bill received from the server.
    resource function onMessage(Bill message) {
        string totalText = message.totalPrice.toString();
        string countText = message.totalQuantity.toString();
        io:println("Received " + message.billType + " Total: " + totalText + " for " + countText + " items");
    }

    // Prints an error reported by the server.
    resource function onError(error err) {
        io:println("Error reported from server: " + err.message() + " - " + <string>err.detail()["message"]);
    }

    // Prints a completion line using the module-level `opMode` label.
    resource function onComplete() {
        io:println(opMode + " billing completed");
    }
};
When running the above client, an argument is required, which indicates whether to use
quickBilling
or oneByOneBilling
. Based on this, the correct call is initialized, returning a streaming client object. Using this object, three messages are streamed to the server, and finally the complete signal is sent to the server. The BillingServerMessageListener
, which was registered during the initialization phase, captures the messages and the complete signal sent by the server.
$ ballerina run server
// Calling the quickBilling resource
$ ballerina run client quick
Starting Quick billing service
Sent item: Apples Quantity: 4 Price: 30.5
Sent item: Oranges Quantity: 6 Price: 43.4
Sent item: Grapes Quantity: 20 Price: 11.5
Received Final Bill Total: 612.4 for 30 items
Quick billing completed
// Calling the oneByOneBilling resource
$ ballerina run client oneByOne
Starting One by one billing service
Sent item: Apples Quantity: 4 Price: 30.5
Sent item: Oranges Quantity: 6 Price: 43.4
Sent item: Grapes Quantity: 20 Price: 11.5
Received Interim Bill Total: 122.0 for 4 items
Received Interim Bill Total: 382.4 for 10 items
Received Interim Bill Total: 612.4 for 30 items
Received Final Bill Total: 612.4 for 30 items
One by one billing completed