Streaming with gRPC on .NET

rpc GetWeatherForecastForDate (google.protobuf.Timestamp) returns (WeatherForecastReply);
rpc GetWeatherForecastStream (google.protobuf.Empty) returns (stream WeatherForecast);
rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (WeatherForecastReply);
rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (stream WeatherForecast);
syntax = "proto3";import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
option csharp_namespace = "GrpcServiceDemo";package weatherForcast;// The weather forecast service definition.
service WeatherForcast {
// Get weather forecast: Unary RPCs
rpc GetWeatherForecast (google.protobuf.Empty) returns (WeatherForecastReply);
// Get weather forecast: Unary RPCs
rpc GetWeatherForecastForDate (google.protobuf.Timestamp) returns (WeatherForecastReply);
// Get weather forecast: Server Streaming RPCs
rpc GetWeatherForecastStream (google.protobuf.Empty) returns (stream WeatherForecast);
// Get weather forecast: Client Streaming RPCs
rpc GetWeatherForecastClientStream (stream StreamMessage) returns (WeatherForecastReply);
// Get weather forecast: Bidirectional streaming RPCs
rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (stream WeatherForecast);
}// The response message containing the weather information.
message WeatherForecastReply {
repeated WeatherForecast Result = 1;
}
message WeatherForecast {
google.protobuf.Timestamp Date = 1;

int32 TemperatureC = 2;

int32 TemperatureF = 3;

string Summary = 4;
}
message StreamMessage{
int32 index = 1;
}
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace GrpcServiceDemo.Services
{
public interface IWeatherForecastService
{
Task<WeatherForecastReply> GetWeatherForecast(ServerCallContext context);
Task<WeatherForecastReply> GetWeatherForecastForDate(Timestamp date, ServerCallContext context);WeatherForecast GetWeatherForecast(int index);
}
}
using Grpc.Core;
using Google.Protobuf.WellKnownTypes;
namespace GrpcServiceDemo.Services
{
public class WeatherForecastService : IWeatherForecastService
{
private static readonly string[] Summaries = new[]
{
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};
private readonly ILogger<WeatherForecastService> _logger;public WeatherForecastService(ILogger<WeatherForecastService> logger)
{
_logger = logger;
}
public Task<WeatherForecastReply> GetWeatherForecast(ServerCallContext context)
{
return Task.FromResult<WeatherForecastReply>(GetWeather());
}
public Task<WeatherForecastReply> GetWeatherForecastForDate(Timestamp date, ServerCallContext context)
{
return Task.FromResult<WeatherForecastReply>(GetWeather(date));
}
public WeatherForecast GetWeatherForecast(int index)
{
return GetWeather(index);
}
private WeatherForecastReply GetWeather()
{
var result = new WeatherForecastReply();
for (var index = 1; index <= 5; index++)
{
result.Result.Add(
GetWeather(index)
);
}
return result;
}
private static WeatherForecast GetWeather(int index)
{
return new WeatherForecast
{
Date = Timestamp.FromDateTime(DateTime.UtcNow.AddDays(index)),
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
TemperatureF = (int)(32 + (Random.Shared.Next(-20, 55) / 0.5556))
};
}
private WeatherForecastReply GetWeather(Timestamp date)
{
var result = new WeatherForecastReply();
result.Result.Add(
new WeatherForecast
{
Date = date,
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
TemperatureF = (int)(32 + (Random.Shared.Next(-20, 55) / 0.5556))
}
);
return result;
}
}
}
public override async Task GetWeatherForecastStream(Empty request, IServerStreamWriter<WeatherForecast> responseStream, ServerCallContext context)
{
var i = 0;
while(!context.CancellationToken.IsCancellationRequested && i <50)
{
await Task.Delay(1000);
await responseStream.WriteAsync(_weatherForecastService.GetWeatherForecast(i));
i++;
}
}
public override async Task<WeatherForecastReply> GetWeatherForecastClientStream(IAsyncStreamReader<StreamMessage> requestStream, ServerCallContext context)
{
var response = new WeatherForecastReply();
while (await requestStream.MoveNext() && !context.CancellationToken.IsCancellationRequested)
{
var i = requestStream.Current.Index;
response.Result.Add(_weatherForecastService.GetWeatherForecast(i));
}
return await Task.FromResult<WeatherForecastReply>(response);
}
public override async Task GetWeatherForecastDuplexStream(IAsyncStreamReader<StreamMessage> requestStream, IServerStreamWriter<WeatherForecast> responseStream, ServerCallContext context)
{
while (await requestStream.MoveNext() && !context.CancellationToken.IsCancellationRequested)
{
var i = requestStream.Current.Index;
await Task.Delay(1000);
await responseStream.WriteAsync(_weatherForecastService.GetWeatherForecast(i));
}
}
try
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using var streamingCall = weatherClient.GetWeatherForecastStream(new Empty(), cancellationToken: cancellationToken.Token);
await foreach (var weatherData in streamingCall.ResponseStream.ReadAllAsync(cancellationToken: cancellationToken.Token))
{
Console.WriteLine(weatherData);
}

Console.WriteLine("Stream completed.");
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}
try
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using AsyncClientStreamingCall<StreamMessage, WeatherForecastReply> clientStreamingCall = weatherClient.GetWeatherForecastClientStream(cancellationToken: cancellationToken.Token);
var i = 0;
while (true)
{
if (i >= 10)
{
await clientStreamingCall.RequestStream.CompleteAsync();
Console.WriteLine("Client Streaming completed.");
break;
}
else
{
//write to stream
await clientStreamingCall.RequestStream.WriteAsync(new StreamMessage { Index = i });
i++;
}
}
var response = await clientStreamingCall;
Console.WriteLine(response.Result);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}
try
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using AsyncDuplexStreamingCall<StreamMessage, WeatherForecast> duplexStreamingCall = weatherClient.GetWeatherForecastDuplexStream(cancellationToken: cancellationToken.Token);
var i = 0;
Task task = Task.WhenAll(new[]
{
Task.Run(async () =>{
while (true)
{
if (i >= 10)
{
await duplexStreamingCall.RequestStream.CompleteAsync();
Console.WriteLine("Client Streaming completed.");
break;
}
else
{
//write to stream
await duplexStreamingCall.RequestStream.WriteAsync(new StreamMessage { Index = i });
i++;
}
}
}),
Task.Run(async () =>{
//read from stream
while (!cancellationToken.IsCancellationRequested && await duplexStreamingCall.ResponseStream.MoveNext())
{
Console.WriteLine(duplexStreamingCall.ResponseStream.Current);
}
})
});
try
{
task.Wait(cancellationToken.Token);
}
catch (OperationCanceledException e)
{
await duplexStreamingCall.RequestStream.CompleteAsync();
Thread.Sleep(6000);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}

--

--

--

Passionate about Software designing & development and Learning Technologies and Love to share what I learn.

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Tips to Make Mobile App Development Outsourcing Easier

QNAP Using FFmpeg for AC/3 in TV Recordings Conversion

CS 373 Sprint 2021 Week 6: Joseph Graham

WINLAMBO Jackpots Explainer

Trello + Google Sheets: Dynamic Values on Your Cards

Crud operation with REST API using JAX-RS

Hydraverse Announcing Videos Creator Contest on Facebook, Twitter and YouTube

Write Your Own Filters and Functions in Twig

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Binod Mahto

Binod Mahto

Passionate about Software designing & development and Learning Technologies and Love to share what I learn.

More from Medium

gRPC on .NET

Easy Modular Monolith — Part 6 — Synchronous communication between modules

Multiple Request/Response examples for Swagger UI in ASP.NET core

Option patterns with custom configuration provider in .NET