Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
11 : #define BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/log_service.hpp>
15 : #include <boost/beast2/format.hpp>
16 : #include <boost/beast2/read.hpp>
17 : #include <boost/beast2/write.hpp>
18 : #include <boost/beast2/server/any_lambda.hpp>
19 : #include <boost/beast2/server/route_handler_asio.hpp>
20 : #include <boost/beast2/server/router_asio.hpp>
21 : #include <boost/beast2/error.hpp>
22 : #include <boost/beast2/detail/except.hpp>
23 : #include <boost/capy/application.hpp>
24 : #include <boost/http_proto/request_parser.hpp>
25 : #include <boost/http_proto/response.hpp>
26 : #include <boost/http_proto/serializer.hpp>
27 : #include <boost/http_proto/string_body.hpp>
28 : #include <boost/http_proto/server/basic_router.hpp>
29 : #include <boost/url/parse.hpp>
30 : #include <boost/asio/prepend.hpp>
31 :
32 : namespace boost {
33 : namespace beast2 {
34 :
35 : //------------------------------------------------
36 :
37 : /** An HTTP server stream which routes requests to handlers and sends responses.
38 :
39 : An object of this type wraps an asynchronous Boost.ASIO stream and implements
40 : a high level server connection which reads HTTP requests, routes them to
41 : handlers installed in a router, and sends the HTTP response.
42 :
43 : @par Requires
44 : `AsyncStream` must satisfy <em>AsyncReadStream</em> and <em>AsyncWriteStream</em>
45 :
46 : @tparam AsyncStream The type of asynchronous stream.
47 : */
48 : template<class AsyncStream>
49 : class http_stream
50 : : private http::suspender::owner
51 : {
52 : public:
53 : /** Constructor.
54 :
55 : This initializes a new HTTP connection object that operates on
56 : the given stream, uses the specified router to dispatch incoming
57 : requests, and calls the supplied completion function when the
58 : connection closes or fails.
59 :
60 : Construction does not start any I/O; call @ref on_stream_begin when
61 : the stream is connected to the remote peer to begin reading
62 : requests and processing them.
63 :
64 : @param app The owning application, used to access shared services
65 : such as logging and protocol objects.
66 : @param stream The underlying asynchronous stream to read from
67 : and write to. The caller is responsible for maintaining its
68 : lifetime for the duration of the session.
69 : @param routes The router used to dispatch incoming HTTP requests.
70 : @param close_fn The function invoked when the connection is closed
71 : or an unrecoverable error occurs.
72 : */
73 : http_stream(
74 : capy::application& app,
75 : AsyncStream& stream,
76 : router_asio<AsyncStream&> routes,
77 : any_lambda<void(system::error_code)> close_fn);
78 :
79 : /** Called to start a new HTTP session
80 :
81 : The stream must be in a connected,
82 : correct state for a new session.
83 : */
84 : void on_stream_begin(http::acceptor_config const& config);
85 :
86 : private:
87 : void do_read();
88 : void on_read(
89 : system::error_code ec,
90 : std::size_t bytes_transferred);
91 : void on_headers();
92 : void do_dispatch(http::route_result rv = {});
93 : void do_respond(http::route_result rv);
94 : void do_write();
95 : void on_write(
96 : system::error_code const& ec,
97 : std::size_t bytes_transferred);
98 : void on_complete();
99 : http::resumer do_suspend() override;
100 : void do_resume(http::route_result const& ec) override;
101 : void do_close();
102 : void do_fail(core::string_view s,
103 : system::error_code const& ec);
104 : void clear() noexcept;
105 :
106 : protected:
107 0 : std::string id() const
108 : {
109 0 : return std::string("[") + std::to_string(id_) + "] ";
110 : }
111 :
112 : protected:
113 : struct resetter;
114 : section sect_;
115 : std::size_t id_ = 0;
116 : AsyncStream& stream_;
117 : router_asio<AsyncStream&> routes_;
118 : any_lambda<void(system::error_code)> close_;
119 : http::acceptor_config const* pconfig_ = nullptr;
120 :
121 : using work_guard = asio::executor_work_guard<decltype(
122 : std::declval<AsyncStream&>().get_executor())>;
123 : std::unique_ptr<work_guard> pwg_;
124 : asio_route_params<AsyncStream&> p_;
125 : };
126 :
127 : //------------------------------------------------
128 :
129 : // for exception safety
130 : template<class AsyncStream>
131 : struct http_stream<AsyncStream>::
132 : resetter
133 : {
134 : ~resetter()
135 : {
136 : if(clear_)
137 : owner_.clear();
138 : }
139 :
140 : explicit resetter(
141 : http_stream<AsyncStream>& owner) noexcept
142 : : owner_(owner)
143 : {
144 : }
145 :
146 : void accept()
147 : {
148 : clear_ = false;
149 : }
150 :
151 : private:
152 : http_stream<AsyncStream>& owner_;
153 : bool clear_ = true;
154 : };
155 :
156 : //------------------------------------------------
157 :
158 : template<class AsyncStream>
159 0 : http_stream<AsyncStream>::
160 : http_stream(
161 : capy::application& app,
162 : AsyncStream& stream,
163 : router_asio<AsyncStream&> routes,
164 : any_lambda<void(system::error_code)> close)
165 0 : : sect_(use_log_service(app).get_section("http_stream"))
166 0 : , id_(
167 0 : []() noexcept
168 : {
169 : static std::size_t n = 0;
170 0 : return ++n;
171 0 : }())
172 0 : , stream_(stream)
173 0 : , routes_(std::move(routes))
174 0 : , close_(close)
175 0 : , p_(stream_)
176 : {
177 0 : p_.parser = http::request_parser(app);
178 :
179 0 : p_.serializer = http::serializer(app);
180 0 : p_.suspend = http::suspender(*this);
181 0 : }
182 :
183 : // called to start a new HTTP session.
184 : // the connection must be in the correct state already.
185 : template<class AsyncStream>
186 : void
187 0 : http_stream<AsyncStream>::
188 : on_stream_begin(
189 : http::acceptor_config const& config)
190 : {
191 0 : pconfig_ = &config;
192 :
193 0 : p_.parser.reset();
194 0 : p_.session_data.clear();
195 0 : do_read();
196 0 : }
197 :
198 : // begin reading the request
199 : template<class AsyncStream>
200 : void
201 0 : http_stream<AsyncStream>::
202 : do_read()
203 : {
204 0 : p_.parser.start();
205 :
206 0 : beast2::async_read(stream_, p_.parser,
207 0 : call_mf(&http_stream::on_read, this));
208 0 : }
209 :
210 : // called when the read operation completes
211 : template<class AsyncStream>
212 : void
213 0 : http_stream<AsyncStream>::
214 : on_read(
215 : system::error_code ec,
216 : std::size_t bytes_transferred)
217 : {
218 : (void)bytes_transferred;
219 :
220 0 : if(ec.failed())
221 0 : return do_fail("http_stream::on_read", ec);
222 :
223 0 : LOG_TRC(this->sect_)(
224 : "{} http_stream::on_read bytes={}",
225 : this->id(), bytes_transferred);
226 :
227 0 : BOOST_ASSERT(p_.parser.is_complete());
228 :
229 0 : on_headers();
230 : }
231 :
232 : // called to set up the response after reading the request
233 : template<class AsyncStream>
234 : void
235 0 : http_stream<AsyncStream>::
236 : on_headers()
237 : {
238 : // set up Request and Response objects
239 : // VFALCO HACK for now we make a copy of the message
240 0 : p_.req = p_.parser.get();
241 0 : p_.route_data.clear();
242 0 : p_.res.set_start_line( // VFALCO WTF
243 : http::status::ok, p_.req.version());
244 0 : p_.res.set_keep_alive(p_.req.keep_alive());
245 0 : p_.serializer.reset();
246 :
247 : // parse the URL
248 : {
249 0 : auto rv = urls::parse_uri_reference(p_.req.target());
250 0 : if(rv.has_error())
251 : {
252 : // error parsing URL
253 0 : p_.status(http::status::bad_request);
254 0 : p_.set_body("Bad Request: " + rv.error().message());
255 0 : return do_respond(rv.error());
256 : }
257 :
258 0 : p_.url = rv.value();
259 : }
260 :
261 : // invoke handlers for the route
262 0 : do_dispatch();
263 : }
264 :
265 : // called to dispatch or resume the route
266 : template<class AsyncStream>
267 : void
268 0 : http_stream<AsyncStream>::
269 : do_dispatch(
270 : http::route_result rv)
271 : {
272 0 : if(! rv.failed())
273 : {
274 0 : BOOST_ASSERT(! pwg_); // can't be suspended
275 0 : rv = routes_.dispatch(
276 0 : p_.req.method(), p_.url, p_);
277 : }
278 : else
279 : {
280 0 : rv = routes_.resume(p_, rv);
281 : }
282 :
283 0 : do_respond(rv);
284 0 : }
285 :
286 : // called after obtaining a route result
287 : template<class AsyncStream>
288 : void
289 0 : http_stream<AsyncStream>::
290 : do_respond(
291 : http::route_result rv)
292 : {
293 0 : BOOST_ASSERT(rv != http::route::next_route);
294 :
295 0 : if(rv == http::route::close)
296 : {
297 0 : return do_close();
298 : }
299 :
300 0 : if(rv == http::route::complete)
301 : {
302 : // VFALCO what if the connection was closed or keep-alive=false?
303 : // handler sendt the response?
304 0 : BOOST_ASSERT(p_.serializer.is_done());
305 0 : return on_write(system::error_code(), 0);
306 : }
307 :
308 0 : if(rv == http::route::suspend)
309 : {
310 : // didn't call suspend()?
311 0 : if(! pwg_)
312 0 : detail::throw_logic_error();
313 0 : return;
314 : }
315 :
316 0 : if(rv == http::route::next)
317 : {
318 : // unhandled request
319 0 : auto const status = http::status::not_found;
320 0 : p_.status(status);
321 0 : p_.set_body(http::to_string(status));
322 : }
323 0 : else if(rv != http::route::send)
324 : {
325 : // error message of last resort
326 0 : BOOST_ASSERT(rv.failed());
327 0 : BOOST_ASSERT(! http::is_route_result(rv));
328 0 : p_.status(http::status::internal_server_error);
329 0 : std::string s;
330 0 : format_to(s, "An internal server error occurred: {}", rv.message());
331 0 : p_.res.set_keep_alive(false); // VFALCO?
332 0 : p_.set_body(s);
333 0 : }
334 :
335 0 : do_write();
336 : }
337 :
338 : // begin writing the response
339 : template<class AsyncStream>
340 : void
341 0 : http_stream<AsyncStream>::
342 : do_write()
343 : {
344 0 : BOOST_ASSERT(! p_.serializer.is_done());
345 0 : beast2::async_write(stream_, p_.serializer,
346 0 : call_mf(&http_stream::on_write, this));
347 0 : }
348 :
349 : // called when the write operation completes
350 : template<class AsyncStream>
351 : void
352 0 : http_stream<AsyncStream>::
353 : on_write(
354 : system::error_code const& ec,
355 : std::size_t bytes_transferred)
356 : {
357 : (void)bytes_transferred;
358 :
359 0 : if(ec.failed())
360 0 : return do_fail("http_stream::on_write", ec);
361 :
362 0 : BOOST_ASSERT(p_.serializer.is_done());
363 :
364 0 : LOG_TRC(this->sect_)(
365 : "{} http_stream::on_write bytes={}",
366 : this->id(), bytes_transferred);
367 :
368 0 : if(p_.res.keep_alive())
369 0 : return do_read();
370 :
371 0 : do_close();
372 : }
373 :
374 : template<class AsyncStream>
375 : auto
376 0 : http_stream<AsyncStream>::
377 : do_suspend() ->
378 : http::resumer
379 : {
380 0 : BOOST_ASSERT(stream_.get_executor().running_in_this_thread());
381 :
382 : // can't call twice
383 0 : BOOST_ASSERT(! pwg_);
384 0 : pwg_.reset(new work_guard(stream_.get_executor()));
385 :
386 : // VFALCO cancel timer
387 :
388 0 : return http::resumer(*this);
389 : }
390 :
391 : // called by resume(rv)
392 : template<class AsyncStream>
393 : void
394 0 : http_stream<AsyncStream>::
395 : do_resume(
396 : http::route_result const& rv)
397 : {
398 0 : asio::dispatch(
399 0 : stream_.get_executor(),
400 0 : [this, rv]
401 : {
402 0 : BOOST_ASSERT(pwg_.get() != nullptr);
403 0 : pwg_.reset();
404 :
405 0 : do_dispatch(rv);
406 : });
407 0 : }
408 :
409 : // called when a non-recoverable error occurs
410 : template<class AsyncStream>
411 : void
412 0 : http_stream<AsyncStream>::
413 : do_fail(
414 : core::string_view s, system::error_code const& ec)
415 : {
416 0 : LOG_TRC(this->sect_)("{}: {}", s, ec.message());
417 :
418 : // tidy up lingering objects
419 0 : p_.parser.reset();
420 0 : p_.serializer.reset();
421 :
422 0 : close_(ec);
423 0 : }
424 :
425 : // end the session
426 : template<class AsyncStream>
427 : void
428 0 : http_stream<AsyncStream>::
429 : do_close()
430 : {
431 0 : clear();
432 0 : close_({});
433 0 : }
434 :
435 : // clear everything, releasing transient objects
436 : template<class AsyncStream>
437 : void
438 0 : http_stream<AsyncStream>::
439 : clear() noexcept
440 : {
441 0 : p_.parser.reset();
442 0 : p_.serializer.reset();
443 0 : p_.res.clear();
444 0 : }
445 :
446 : } // beast2
447 : } // boost
448 :
449 : #endif
|