Skip to content

Commit

Permalink
refactor(node): reimplement http client (#19122)
Browse files Browse the repository at this point in the history
This commit reimplements most of "node:http" client APIs using
"ext/fetch".

There is some duplicated code and two removed Node compat tests that
will be fixed in follow up PRs.

---------

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
  • Loading branch information
crowlKats and bartlomieju committed May 16, 2023
1 parent a22388b commit 867a6d3
Show file tree
Hide file tree
Showing 14 changed files with 1,705 additions and 1,166 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions cli/tests/node_compat/config.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,13 @@
// failing
//"test-http-client-set-timeout.js",
"test-http-localaddress.js",
"test-http-outgoing-buffer.js",
// TODO(bartlomieju): temporarily disabled while we iterate on the HTTP client
// "test-http-outgoing-buffer.js",
"test-http-outgoing-internal-headernames-getter.js",
"test-http-outgoing-internal-headernames-setter.js",
"test-http-outgoing-internal-headers.js",
"test-http-outgoing-message-inheritance.js",
// TODO(bartlomieju): temporarily disabled while we iterate on the HTTP client
// "test-http-outgoing-message-inheritance.js",
"test-http-outgoing-renderHeaders.js",
"test-http-outgoing-settimeout.js",
"test-net-access-byteswritten.js",
Expand Down
26 changes: 0 additions & 26 deletions cli/tests/node_compat/test/parallel/test-http-outgoing-buffer.js

This file was deleted.

This file was deleted.

29 changes: 3 additions & 26 deletions cli/tests/unit_node/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ Deno.test("[node/http] server can respond with 101, 204, 205, 304 status", async

Deno.test("[node/http] request default protocol", async () => {
const promise = deferred<void>();
const promise2 = deferred<void>();
const server = http.createServer((_, res) => {
res.end("ok");
});
Expand All @@ -198,6 +199,7 @@ Deno.test("[node/http] request default protocol", async () => {
server.close();
});
assertEquals(res.statusCode, 200);
promise2.resolve();
},
);
req.end();
Expand All @@ -206,6 +208,7 @@ Deno.test("[node/http] request default protocol", async () => {
promise.resolve();
});
await promise;
await promise2;
});

Deno.test("[node/http] request with headers", async () => {
Expand Down Expand Up @@ -292,32 +295,6 @@ Deno.test("[node/http] http.IncomingMessage can be created without url", () => {
});
*/

Deno.test("[node/http] set http.IncomingMessage.statusMessage", () => {
// deno-lint-ignore no-explicit-any
const message = new (http as any).IncomingMessageForClient(
new Response(null, { status: 404, statusText: "Not Found" }),
{
encrypted: true,
readable: false,
remoteAddress: "foo",
address() {
return { port: 443, family: "IPv4" };
},
// deno-lint-ignore no-explicit-any
end(_cb: any) {
return this;
},
// deno-lint-ignore no-explicit-any
destroy(_e: any) {
return;
},
},
);
assertEquals(message.statusMessage, "Not Found");
message.statusMessage = "boom";
assertEquals(message.statusMessage, "boom");
});

Deno.test("[node/http] send request with non-chunked body", async () => {
let requestHeaders: Headers;
let requestBody = "";
Expand Down
44 changes: 22 additions & 22 deletions ext/fetch/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub use reqwest;

pub use fs_fetch_handler::FsFetchHandler;

use crate::byte_stream::MpscByteStream;
pub use crate::byte_stream::MpscByteStream;

#[derive(Clone)]
pub struct Options {
Expand Down Expand Up @@ -186,9 +186,9 @@ pub fn get_declaration() -> PathBuf {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
request_rid: ResourceId,
request_body_rid: Option<ResourceId>,
cancel_handle_rid: Option<ResourceId>,
pub request_rid: ResourceId,
pub request_body_rid: Option<ResourceId>,
pub cancel_handle_rid: Option<ResourceId>,
}

pub fn get_or_create_client_from_state(
Expand Down Expand Up @@ -302,7 +302,7 @@ where
}
Some(data) => {
// If a body is passed, we use it, and don't return a body for streaming.
request = request.body(Vec::from(&*data));
request = request.body(data.to_vec());
None
}
}
Expand Down Expand Up @@ -400,12 +400,12 @@ where
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
status: u16,
status_text: String,
headers: Vec<(ByteString, ByteString)>,
url: String,
response_rid: ResourceId,
content_length: Option<u64>,
pub status: u16,
pub status_text: String,
pub headers: Vec<(ByteString, ByteString)>,
pub url: String,
pub response_rid: ResourceId,
pub content_length: Option<u64>,
}

#[op]
Expand Down Expand Up @@ -462,8 +462,8 @@ pub async fn op_fetch_send(

type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;

struct FetchRequestResource(
Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
pub struct FetchRequestResource(
pub Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
);

impl Resource for FetchRequestResource {
Expand All @@ -472,7 +472,7 @@ impl Resource for FetchRequestResource {
}
}

struct FetchCancelHandle(Rc<CancelHandle>);
pub struct FetchCancelHandle(pub Rc<CancelHandle>);

impl Resource for FetchCancelHandle {
fn name(&self) -> Cow<str> {
Expand All @@ -485,8 +485,8 @@ impl Resource for FetchCancelHandle {
}

pub struct FetchRequestBodyResource {
body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
cancel: CancelHandle,
pub body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
pub cancel: CancelHandle,
}

impl Resource for FetchRequestBodyResource {
Expand Down Expand Up @@ -537,10 +537,10 @@ impl Resource for FetchRequestBodyResource {
type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;

struct FetchResponseBodyResource {
reader: AsyncRefCell<Peekable<BytesStream>>,
cancel: CancelHandle,
size: Option<u64>,
pub struct FetchResponseBodyResource {
pub reader: AsyncRefCell<Peekable<BytesStream>>,
pub cancel: CancelHandle,
pub size: Option<u64>,
}

impl Resource for FetchResponseBodyResource {
Expand Down Expand Up @@ -590,8 +590,8 @@ impl Resource for FetchResponseBodyResource {
}
}

struct HttpClientResource {
client: Client,
pub struct HttpClientResource {
pub client: Client,
}

impl Resource for HttpClientResource {
Expand Down
2 changes: 2 additions & 0 deletions ext/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aes.workspace = true
cbc.workspace = true
data-encoding = "2.3.3"
deno_core.workspace = true
deno_fetch.workspace = true
deno_fs.workspace = true
deno_media_type.workspace = true
deno_npm.workspace = true
Expand Down Expand Up @@ -46,6 +47,7 @@ path-clean = "=0.1.0"
pbkdf2 = "0.12.1"
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
ring.workspace = true
ripemd = "0.1.3"
rsa.workspace = true
Expand Down
1 change: 1 addition & 0 deletions ext/node/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ deno_core::extension!(deno_node,
ops::zlib::op_zlib_write_async,
ops::zlib::op_zlib_init,
ops::zlib::op_zlib_reset,
ops::http::op_node_http_request,
op_node_build_os,
ops::require::op_require_init_paths,
ops::require::op_require_node_module_paths<P>,
Expand Down
101 changes: 101 additions & 0 deletions ext/node/ops/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_fetch::get_or_create_client_from_state;
use deno_fetch::FetchCancelHandle;
use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
use deno_fetch::MpscByteStream;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use reqwest::header::CONTENT_LENGTH;
use reqwest::Body;
use reqwest::Method;

#[op]
pub fn op_node_http_request(
state: &mut OpState,
method: ByteString,
url: String,
headers: Vec<(ByteString, ByteString)>,
client_rid: Option<u32>,
has_body: bool,
) -> Result<FetchReturn, AnyError> {
let client = if let Some(rid) = client_rid {
let r = state.resource_table.get::<HttpClientResource>(rid)?;
r.client.clone()
} else {
get_or_create_client_from_state(state)?
};

let method = Method::from_bytes(&method)?;
let url = Url::parse(&url)?;

let mut header_map = HeaderMap::new();
for (key, value) in headers {
let name = HeaderName::from_bytes(&key)
.map_err(|err| type_error(err.to_string()))?;
let v = HeaderValue::from_bytes(&value)
.map_err(|err| type_error(err.to_string()))?;

header_map.append(name, v);
}

let mut request = client.request(method.clone(), url).headers(header_map);

let request_body_rid = if has_body {
// If no body is passed, we return a writer for streaming the body.
let (stream, tx) = MpscByteStream::new();

request = request.body(Body::wrap_stream(stream));

let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
body: AsyncRefCell::new(tx),
cancel: CancelHandle::default(),
});

Some(request_body_rid)
} else {
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
None
};

let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();

let fut = async move {
request
.send()
.or_cancel(cancel_handle_)
.await
.map(|res| res.map_err(|err| type_error(err.to_string())))
};

let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));

let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));

Ok(FetchReturn {
request_rid,
request_body_rid,
cancel_handle_rid: Some(cancel_handle_rid),
})
}
Loading

0 comments on commit 867a6d3

Please sign in to comment.