protobuf.js version: 6.7.1
I'm trying to implement a streaming RPC function to create services. But looking at the streaming example, I don't understand how I can emit these events:
greeter.on("data", function(response, method) { // data function
console.log("data in " + method.name + ":", response.message);
});
greeter.on("end", function() { // end function
console.log("end");
});
greeter.on("error", function(err, method) { // error function
console.log("error in " + method.name + ":", err);
});
greeter.on("status", function(code, text) { // status function
console.log("custom status:", code, text);
});
from the rpcImpl function:
// I only have a callback function. It's OK for unary method, but how to use for streaming method (data, end, error, status events)
var greeter = Greeter.create(function myRPCImpl(method, requestData, callback) {
responseData = fetch();
callback(null, responseData);
};
});
My guess is:
- callback(null, data)
- callback(null, null)
- callback(error)
If that's correct, does it also mean that rpcImpl will automatically fire events when receiving a streaming method, and fire a promise when receiving a unary method? Or fire both whatever the method?
Thank you for the clarification!
rpcImpl is just the low level method that implements a service request on the network level. It doesn't emit any events but solely uses a callback.
greeter is an instance of rpc.Service here (it uses rpcImpl and extends an event emitter). That's where the events come from. MyService.create instantiates an rpc.Service.
The status event is a custom event present in this specific example only, to, well, demonstrate that one can also emit own events.
I just came here to create a similar issue (hi @dcodeIO 👋 😄)
I'm also confused by the streaming API. Shouldn't the rpc.Service do something different for streaming methods versus regular ones? Looks to me that a service can only have one method that is streaming and you can call the method only once before having to create a new rpc.Service instance.
Also I don't think the current implementation will work at all with promises, since once resolved the resolve callback is a no-op.
Something like this would make more sense to me:
let stream = myService.streamingMethod()
stream.on('data', (obj) => {
console.log(obj) // decoded message
})
stream.on('end', () => console.log('ended'))
stream.on('error', (error) => throw error)
Could be done with a simple pseudo-stream interface (like an event emitter with write/end methods) or by using the readable-stream package which is fairly lightweight and very well tested.
Nice to meet you again ;)
I'm also confused by the streaming API. Shouldn't the rpc.Service do something different for streaming methods versus regular ones? Looks to me that a service can only have one method that is streaming and you can call the method only once before having to create a new rpc.Service instance.
Individual methods do not return a stream, but the service instance's data event also references the method a message belongs to:
greeter.on("data", function(response, method) {
console.log("data in " + method.name + ":", response.message);
});
Likewise, rpcImpl also knows which method a request is coming from: function myRPCImpl(method, requestData, callback).
This is what happens:
1. rpcImpl is called with method and the request message and sends it to the server (can be anything, but most likely over a websocket or something like that).
2. rpcImpl receives the response from the server and calls its callback.
3. The service instance emits the data event with method and response message.
Additionally, when rpcImpl's callback is called with a null message, the service instance assumes that the stream has ended and emits the end event.
This should also work for multiple methods. I must admit, though, that the current implementation is built around the desire of not adding another dependency besides a minimal event emitter and it might not be the greatest API of all time to work with.
Edit: I believe that ideally, protobuf.js itself wouldn't offer a service wrapper on its own. An additional package doing solely services might be better suited, and could contain more dependencies because it wouldn't affect the size of the core library.
Hmm, I still don't see how the API allows for a proper implementation of streams. Take this example:
service ImageService {
rpc GetImage (ImageRequest) returns (stream ImageChunk) {}
}
message ImageRequest {
string name = 1;
}
message ImageChunk {
bytes data = 1;
}
let service = ImageService(function myRPCImpl(method, request, callback) {
let image = sometransport.createReadStream(request.name)
image.on('data', (chunk) => { callback(null, {chunk}) })
image.on('end', () => { callback(null) })
image.on('error', (error) => { callback(error) })
})
let data = await myService.getImage({name: 'bar.jpg'}) // here I have only the first chunk
myService.on('data', (chunk, method) => {
// here I don't know which image I asked so if I sent multiple getImage requests the chunks would be mixed up
})
// this would throw since myService.rpcImpl is set to null after a stream has finished
// but imagining I create a new service instance for every call I want to make this works:
myService.getImage({name: 'bar.jpg'}, (error, chunk) => {
// even though calling a callback multiple times is considered bad practice
})
IMHO the rpc.Service class needs a bit of a refactor before working with streams become possible. Unless I'm completely misunderstanding how streaming services should work :)
so if I sent multiple getImage requests the chunks would be mixed up
Ah, you are right, parallel requests using the same method don't work with the current design.
If you're interested I'd be happy to send a PR with a proposal for a better streaming API
this makes sense to me as well:
let stream = myService.streamingMethod(request)
stream.on('data', (obj) => {
console.log(obj) // decoded message
})
stream.on('end', () => console.log('ended'))
stream.on('error', (error) => throw error)
I'd be happy to test/review any proposal :+1:
@jnordberg Did you come up with a solution ?
@sulliwane nope, just using non-streaming services for now...
Would be nice to have streaming working for rpc.Service :)
Especially since protobuf.js API is so much nicer to work with than google-protobuf
any chance to get this going again?
Hi, is anyone working on this issue? If not, I could work on it. Let me know, since I need server streaming support. The current RPCImpl is suitable only for simple unary calls. This is even clearer when using it with TypeScript.
Example:
proto
service Debug {
rpc GetServerStream(StreamReques) returns (stream StreamReply) {}
}
typescript generated method
public getServerStream(request: debug.IStreamRequest): Promise<debug.IStreamReply>;
Instead, it should return an Observable object or an EventEmitter, as @sulliwane pointed out.
interface Stream {
on(eventName: 'data' | 'end' | 'error', cb: (data) => void): void;
}
public getServerStream(request: debug.IStreamRequest): Stream<debug.IStreamReply>;
Also, we need a separate RPCStreamImpl:
const rcp: RPCStreamImpl = (method, requestData, stream) => {
underlingStream.start(method.name, requestData);
underlingStream.on('data', (data) => stream.emit('data', data));
underlingStream.on('end', () => stream.emit('end'));
underlingStream.on('error', (err) => stream.emit('error', err));
};
PS.
RPCImpl doesn't allow getting the service and package names. I'm also thinking of changing method to return the method name, service name and package name.
Hi, here is PR for streaming support, let me know what do you think https://github.com/dcodeIO/protobuf.js/pull/1115
Since I don't know when https://github.com/protobufjs/protobuf.js/pull/1115 will be accepted, here are two workarounds, assuming you generated a protobufjs static file and wish to use a bidirectional stream:
Variant 1: inject custom RPC method definition into makeGenericClientConstructor, bidi stream
const grpc = require('grpc');
const messages = require('./pbjs-static.js').my.package;
const MyService = exports.MyService = {
myCall: {
path: '/my.package/myCall',
requestStream: true,
responseStream: true,
requestType: messages.MyCallRequest,
responseType: messages.MyCallResponse,
requestSerialize: arg => messages.MyCallRequest.encode(arg).finish(),
responseDeserialize: arg => messages.MyCallResponse.decode(arg)
}
};
const Client = grpc.makeGenericClientConstructor(MyService)
const client = new Client(
grpcServerUrl,
grpc.credentials.createInsecure()
)
// create bidi-stream
var stream = client.myCall();
// listen on incoming data (decoded via protobufjs)
stream.on('data', function(response){
})
// send data (json is encoded via protobufjs)
stream.write({my: "message"});
Variant 2: directly create a bidi stream from makeBidiStreamRequest
const grpc = require('grpc');
const messages = require('./pbjs-static.js').my.package;
const Client = grpc.makeGenericClientConstructor()
const client = new Client(
grpcServerUrl,
grpc.credentials.createInsecure()
)
var stream = client.makeBidiStreamRequest(
'/my.package/myCall',
arg => messages.MyCallRequest.encode(arg).finish(),
arg => messages.MyCallResponse.decode(arg)
);
// listen on incoming data (decoded via protobufjs)
stream.on('data', function(response){
})
// send data (json is encoded via protobufjs)
stream.write({my: "message"});
Most helpful comment
I just came here to create a similar issue (hi @dcodeIO 👋 😄)
I'm also confused by the streaming API. Shouldn't the rpc.Service do something different for streaming methods versus regular ones? Looks to me that a service can only have one method that is streaming and you can call the method only once before having to create a new rpc.Service instance.
Also I don't think the current implementation will work at all with promises, since once resolved the resolve callback is a no-op.
Something like this would make more sense to me:
Could be done with a simple pseudo-stream interface (like an event emitter with write/end methods) or by using the readable-stream package which is fairly lightweight and very well tested.