[capnproto] Re: Schemas on the wire Dynamic API

2023-02-27 Thread Dane
A follow up after simplifying the API to the core issue:

The data flow is now:
1. A client connects to a server, passing a Schema.Node as a parameter. 
2. A server loads the Schema.Node and creates a new message of the given 
schema, which is returned as an AnyStruct

Problem 1 above still exists.

We have figured out how to handle point 2, but not without a malloc. How 
can I init the result as a DynamicStruct, instead of copying as in the 
below snippet:
auto message = context.getParams();
auto schema_ = message.getSchema();
auto schema_loader = capnp::SchemaLoader();
auto loaded_schema = schema_loader.load(schema_);

// how to do below without malloc?

// cannot init result as a DynamicStruct
auto mb = capnp::MallocMessageBuilder();
auto msg = mb.initRoot(loaded_schema.asStruct());
msg.set("text", "abc");

auto results = context.initResults();
results.setValue(msg.asReader());


Would this work as above if we had nested structs? 

Aside: I am also trying to replicate this in pycapnp, which is proving 
deeply problematic since it skips out on a lot of ported functionality from 
capnp.

- Dane

On Monday, 27 February 2023 at 11:34:50 UTC+2 Dane wrote:

> Hi 
>
> I am trying to implement a dynamic API. I have followed the suggestions in 
> https://groups.google.com/g/capnproto/c/Ivl3u4JJoEQ/m/MIH2a2nPBQAJ and 
> https://groups.google.com/g/capnproto/c/fNnr3Om9haM/m/1TdBqwsNBwAJ
>
> My implementation attempt lies at 
> https://github.com/DaneSlattery/capnp_generic_poc .
>
> The desired data flow is as follows:
>
> 1. A client connects to a server, and receives a capability that allows 
> the client to load a schema (the InterfaceLoader). When connecting, the 
> client also passes a ClientInterface to the server.
> 2. The client registers a schema using the InterfaceLoader. The client 
> must send a Schema.Node parameter to the server. The server must load the 
> schema.
> 3. The server uses the given loaded schema to communicate with the client 
> interface, which takes a parameter of AnyStruct.
>
> I have a few problems:
>
> Firstly, when the client attempts to set the schema using  
> capnp::Schema::from().getProto() , line 683 of 
> capnproto-c++-0.10.3/src/capnp/dynamic.c++ blows up. I have changed this 
> line to, instead assessing the ID of the schema for equality :
> KJ_REQUIRE(structValue.getSchema().getProto().getId() == 
> structType.getProto().getId(), "Value type mismatch.")
> {
> return;
> }
> I think this change should perhaps go in a PR, unless there is another, 
> better way.
>
> Secondly, I can't figure out how to send messages of the loaded schema 
> back from the server to the client. Can anyone see if the POC linked is 
> viable, and how to get to the final step?
>
> - Dane
>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/568ea931-e1b3-4ed5-80d2-1074cf6cbce7n%40googlegroups.com.


[capnproto] Schemas on the wire Dynamic API

2023-02-27 Thread Dane
Hi 

I am trying to implement a dynamic API. I have followed the suggestions in 
https://groups.google.com/g/capnproto/c/Ivl3u4JJoEQ/m/MIH2a2nPBQAJ and 
https://groups.google.com/g/capnproto/c/fNnr3Om9haM/m/1TdBqwsNBwAJ

My implementation attempt lies at 
https://github.com/DaneSlattery/capnp_generic_poc .

The desired data flow is as follows:

1. A client connects to a server, and receives a capability that allows the 
client to load a schema (the InterfaceLoader). When connecting, the client 
also passes a ClientInterface to the server.
2. The client registers a schema using the InterfaceLoader. The client must 
send a Schema.Node parameter to the server. The server must load the schema.
3. The server uses the given loaded schema to communicate with the client 
interface, which takes a parameter of AnyStruct.

I have a few problems:

Firstly, when the client attempts to set the schema using  
capnp::Schema::from().getProto() , line 683 of 
capnproto-c++-0.10.3/src/capnp/dynamic.c++ blows up. I have changed this 
line to, instead assessing the ID of the schema for equality :
KJ_REQUIRE(structValue.getSchema().getProto().getId() == 
structType.getProto().getId(), "Value type mismatch.")
{
return;
}
I think this change should perhaps go in a PR, unless there is another, 
better way.

Secondly, I can't figure out how to send messages of the loaded schema back 
from the server to the client. Can anyone see if the POC linked is viable, 
and how to get to the final step?

- Dane

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/4e26ccf6-67bf-43e5-8f31-f8b582a7c304n%40googlegroups.com.


Re: [capnproto] Unsubscribe in pub/sub model

2022-02-08 Thread Dane
Hi All,

Implementing a similar thing, but with an asynchronous client in python. I 
can't for the life of me get the client object to become derefenced. 
The asyncio tx/rx seems to get stuck trying to read/write to the 
now-missing publisher.

I think the async loop owns a reference to the subscriber object (as well 
as the publisher), but I need to know when the publisher releases the 
reference in order to stop the async loop. 

Any ideas? 

On Friday, 3 December 2021 at 20:02:18 UTC+2 ken...@cloudflare.com wrote:

> What I usually do is something like:
>
> auto impl = kj::heap();
> auto& ref = *impl;
> Subscriber::Client client = kj::mv(impl);
> rq.setSubscriber(client);
>
> Now you can keep a copy of `client` locally, and as long as it still 
> exists, then `ref` remains valid -- because `client` is itself a strong 
> reference to the object.
>
> Note this doesn't work if you need to get notification of when the 
> `SubscriberImpl` is dropped from the remote side, since holding a strong 
> ref locally prevents the destructor from being called. If this is an issue, 
> then you need to do something different. What I usually do here is store a 
> pointer somewhere, and then have the destructor null out that pointer. This 
> effectively implements a weak reference.
>
> -Kenton
>
> On Thu, Dec 2, 2021 at 2:22 PM Jens Alfke  wrote:
>
>> I'm also implementing pub-sub, so I was glad to see this thread before I 
>> wasted too much time. I'm implementing this pattern, but having trouble on 
>> the client side.
>>
>> In terms of the example interface, I've created my SubscriberImpl class, 
>> and written the code to send the "subscribe" message. Now I want to store 
>> the returned Subscription capability reference in my SubscriberImpl, so it 
>> can own it and drop it when it's done.
>>
>> However, I can't figure out how to keep a reference to the 
>> SubscriberImpl, since I have to move it to the Request object (calling 
>> setSubscriber) and afterwards it's gone, so I can't call it again.
>>
>> auto rq = remotePublisher.subscribeRequest();
>> auto impl = kj::heap();
>> rq.setSubscriber(std::move(impl));
>> auto promise = rq.send().then([&](auto response) {return 
>> response.getSubscription();});
>> // *somehow convey the promise to the SubscriberImpl...?*
>>
>> I'm sure this is just due to my incomplete understanding of how 
>> Promise/Client/Server objects work...
>> Thanks,
>>
>> --Jens
>>
>> On Monday, November 22, 2021 at 8:53:42 AM UTC-8 ken...@cloudflare.com 
>> wrote:
>>
>>> Hi Mitsuo,
>>>
>>> I recommend designing the interface like this:
>>>
>>> interface EventPublisher{
>>> interface Subscriber {
>>> updateEvent @0 (event: Int32) -> ();
>>> }
>>>
>>> interface Subscription {}
>>> subscribe @0 (subscriber: Subscriber) -> (result: Int32, 
>>> subscription: Subscription);
>>> # To unsubscribe, drop the returned `subscription`.
>>> }
>>>
>>>
>>> Here, subscribe() returns a `subscription` object. This object has no 
>>> methods. But, when the capability is dropped, then the destructor will run 
>>> on the server side. Atn that point, you can remove the subscription.
>>>
>>> A big advantage of this approach is that it handles connection failures 
>>> gracefully. If the client randomly disconnects, the `subscription` 
>>> capability is automatically dropped, thus unsubscribing the client. This 
>>> way you don't end up stuck sending messages to a disconnected subscriber.
>>>
>>> It also solves your problem because you can remember arbitrary metadata 
>>> about the subscriber within the `Subscription` object, so you know what to 
>>> remove when it's destroyed.
>>>
>>> -Kenton
>>>
>>> On Mon, Nov 22, 2021 at 12:40 AM mitsuo  
>>> wrote:
>>>
 Hi,

 I'm trying to implement pub/sub like communication model with the 
 following scheme.
 *pubsub.capnp*
 *interface EventPublisher{*
 *interface Subscriber {*
 *updateEvent @0 (event: Int32) -> ();*
 *}*
 *subscribe @0 (subscriber: Subscriber) -> (result: Int32);*
 *unsubscribe @1 (subscriber: Subscriber) -> (result: Int32);*
 *}*

 I'm using *kj::Vector 
 m_subscribers *to store the current subscribers.
 When I try to implement unsubscribe and remove the Subscriber from the 
 Vector, I couldn't find good method to do that.
 Could you give me some advice?

 *server implementation*
 *class EventPublisherImpl final : public EventPublisher::Server {*
 * protected:*
 *  ::kj::Promise subscribe(SubscribeContext context) {*
 *cout << "subscribe request received" << endl;*
 *m_subscribers.add(context.getParams().getSubscriber());*
 *return kj::READY_NOW;*
 *  }*

 *  ::kj::Promise unsubscribe(UnsubscribeContext context) {*
 *cout << "unsubscribe request received" << endl;*
 *auto unsub =