root/trunk/whisperlib/net/rpc/lib/server/rpc_core_types.h

Revision 7, 10.1 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #ifndef __NET_RPC_LIB_SERVER_RPC_CORE_TYPES_H__
33 #define __NET_RPC_LIB_SERVER_RPC_CORE_TYPES_H__
34
35 #include <vector>
36 #include <string>
37
38 #include <whisperlib/common/base/types.h>
39
40 #include <whisperlib/common/io/input_stream.h>
41 #include <whisperlib/common/io/output_stream.h>
42 #include <whisperlib/common/io/buffer/memory_stream.h>
43 #include <whisperlib/common/io/buffer/io_memory_stream.h>
44
45 #include <whisperlib/net/rpc/lib/types/rpc_all_types.h>
46 #include <whisperlib/net/rpc/lib/types/rpc_message.h>
47 #include <whisperlib/net/rpc/lib/codec/rpc_codec.h>
48 #include <whisperlib/common/base/callback.h>
49
50 // Types to be used inside server for RPC queries in transit.
51 // rpc::Transport: is just a container describing the transport layer for RPC.
52 // rpc::Query: represents a remote call, on the server side. It is built by
53 //           the transport layer (rpc::ServerConnection) and then circulates
54 //           through the execution layer until completed.
55 //           Initially contains the service name, the method name,
56 //           and the encoded arguments; but during execution it receives the
57 //           decoded call parameters to keep them alive while the call is
58 //           in execution.
59
60 namespace rpc {
61
62 class Transport {
63  public:
64   enum Protocol {
65     TCP,
66     HTTP,
67   };
68   explicit Transport(const Transport& transport)
69     : protocol_(transport.protocol()),
70       local_address_(transport.local_address()),
71       peer_address_(transport.peer_address()),
72       user_(transport.user()),
73       passwd_(transport.passwd()) {
74   }
75
76   Transport(Protocol protocol,
77             const net::HostPort& local_address,
78             const net::HostPort& peer_address)
79       : protocol_(protocol),
80         local_address_(local_address),
81         peer_address_(peer_address) {
82   }
83   virtual ~Transport() {
84   }
85
86   Protocol protocol() const                  { return protocol_;       }
87   const net::HostPort& local_address() const { return local_address_;  }
88   const net::HostPort& peer_address() const  { return peer_address_;   }
89   const string& user() const                 { return user_;           }
90   const string& passwd() const               { return passwd_;         }
91
92   void set_user_passwd(const string& user,
93                        const string& passwd) { user_ = user;
94                                                passwd_ = passwd;       }
95
96  protected:
97   const Protocol protocol_;
98   const net::HostPort local_address_;
99   const net::HostPort peer_address_;
100   string user_;
101   string passwd_;
102 };
103
104 class Query : public rpc::Loggable {
105  public:
106   // input:
107   //  - qid: a value to be passed back in result handling, usefull for mapping
108   //         pairs [qid, rpc::Query]
109   //  - service: service name.
110   //  - method: method name (inside service).
111   //  - args: contains the query arguments encoded. And that's all it contains.
112   //  - codec: the codec used to decode arguments and later encode the
113   //           call result into a rpc::Result.
114   //           It must be dinamically allocated! as we become owners of this
115   //           codec and will destroy.
116   //  - rid: result handler ID. Used by the IAsyncQueryExecutor to find the
117   //         result handler for this query.
118   Query(const rpc::Transport& transport,
119         uint32 qid,
120         const string& service,
121         const string& method,
122         io::MemoryStream& args,
123         const rpc::Codec& codec,
124         uint32 rid);
125   virtual ~Query();
126
127   const rpc::Transport& transport() const     { return transport_;  }
128   uint32 qid() const                          { return qid_; }
129   const string& service() const               { return service_; }
130   const string& method() const                { return method_; }
131   rpc::REPLY_STATUS status() const            { return  status_; }
132   const io::MemoryStream& result() const    { return result_; }
133   rpc::Codec& codec() const                   { return *codec_; }
134   uint32 rid() const                          { return rid_; }
135
136   io::MemoryStream& result()                { return result_; }
137
138   //////////////////////////////////////////////////////////////////////
139   //
140   //       Decoding arguments
141   //
142   io::MemoryStream& RewindParams();
143
144   // MUST be called before DecodeParam or HasMoreParams.
145   // returns success status.
146   bool InitDecodeParams();
147
148   template <typename T>
149   bool DecodeParam(T& obj) {
150     CHECK ( args_decoding_initialized_ );
151     bool has_more_attribs;
152     rpc::String argName;
153     return (DECODE_RESULT_SUCCESS ==
154               args_decoder_->DecodeArrayContinue(has_more_attribs) &&
155             has_more_attribs &&
156             DECODE_RESULT_SUCCESS == args_decoder_->Decode(obj));
157   }
158
159   // returns:
160   //  true -> more params available to DecodeParam
161   //  false -> reached parameter's array end.
162   bool HasMoreParams();
163
164   //////////////////////////////////////////////////////////////////////
165   //
166   // Encoding the result and completing the call
167   //
168
169   // Store decoded arguments inside query, so they are valid as long as the
170   // query is in execution.
171   void AddParam(rpc::Object* obj);
172
173   // Set by the execution layer.
174   // The completion_callback is usually a function inside the execution layer.
175   void SetCompletionCallback(
176       Callback1<const rpc::Query &>* completion_callback);
177
178   // Called by the service implementation to return query result back to the
179   // execution layer.
180   template <typename T>
181   void Complete(rpc::REPLY_STATUS status, const T& result) {
182     result_.Clear();
183     status_ = status;
184     result_encoder_->Encode(result);
185     CHECK_NOT_NULL(completion_callback_) << "CompletionCallback not set!";
186     completion_callback_->Run(*this);
187     delete this;   // This is bad, others solutions are welcomed.
188
189     // Alternative: let the transport layer delete the query.
190     // But keep in mind that we're calling the transport from this context,
191     // so the call will return here to find a deleted query.
192   }
193   template <typename T>
194   void Complete(const T& result) {
195     Complete(RPC_SUCCESS, result);
196   }
197   void Complete(rpc::REPLY_STATUS status = RPC_SUCCESS) {
198     Complete(status, rpc::Void());
199   }
200
201   //////////////////////////////////////////////////////////////////////
202   //
203   //                      rpc::Loggable interface
204   //
205   string ToString() const;
206  protected:
207   // transport
208   const rpc::Transport transport_;  // informations from the RPC trasport layer
209                                     // (like protocol, IP, port ..)
210
211   // codec
212   rpc::Codec* codec_;              // we own the codec. Must be deleted on
213                                    // destructor.
214
215   // input
216   const uint32 qid_;               // query identifier, used by the transport
217                                    // layer
218   const string service_;
219   const string method_;
220   io::MemoryStream args_;        // contains the query arguments encoded
221   rpc::Decoder* args_decoder_;     // always locally allocated. Must be deleted
222                                    // on destructor
223   bool args_decoding_initialized_; // false = the args_ stream position is
224                                    // on the first arg (== stream begining)
225
226   // output
227   rpc::REPLY_STATUS status_;       // receives the return status
228   io::MemoryStream result_;      // receives the encoded return value
229   rpc::Encoder* result_encoder_;   // always locally allocated.
230                                    // Must be deleted on destructor
231
232   // execution
233
234   // Contains the call arguments keeping them valid while the query is in
235   // executio, so you don't need to duplicate the arguments if you want to
236   // delay query completion.
237   vector<rpc::Object*> args_array_;
238
239   // Set by the execution layer.
240   // Used for passing the completed query's result back to the execution layer.
241   Callback1<const rpc::Query&>* completion_callback_;
242
243   // ID of an IResultHandler register in the execution layer who will
244   // receive the result
245   uint32 rid_;
246
247  private:
248   DISALLOW_EVIL_CONSTRUCTORS(Query);
249 };
250
251 // The rpc::CallContext is used in service implementation to access
252 // query's transport parameters and the Complete method.
253 // The service implementation should not see the entire rpc::Query.
254
255 template <typename T>
256 class CallContext {
257  public:
258   explicit CallContext(rpc::Query* query)
259       : query_(query) {
260   }
261   virtual ~CallContext() {
262   }
263
264   const rpc::Transport& Transport() const {
265     return query_->transport();
266   }
267   void Complete(rpc::REPLY_STATUS status) {
268     query_->Complete(status);
269     delete this;       // bad, but no other solution
270   }
271   void Complete(const T& result) {
272     query_->Complete(result);
273     delete this;        // bad, but no other solution
274   }
275  private:
276   rpc::Query* const query_;
277   DISALLOW_EVIL_CONSTRUCTORS(CallContext);
278 };
279 }
280
281 #endif  // __NET_RPC_LIB_SERVER_RPC_CORE_TYPES_H__
Note: See TracBrowser for help on using the browser.