From 848cfe0805dff301ace7c8ec062ad3a1a9b74c4e Mon Sep 17 00:00:00 2001 From: Jonathan Hedley Date: Thu, 8 Aug 2024 13:32:01 +1000 Subject: [PATCH] In DataUtil, reuse the input stream when detecting charset When reading ahead to detect the charset, instead of buffering into a new ByteBuffer, reuse the ControllableInputStream. --- src/main/java/org/jsoup/helper/DataUtil.java | 62 ++++++------ .../java/org/jsoup/helper/HttpConnection.java | 16 +-- .../internal/ControllableInputStream.java | 98 +++++++++++++------ .../jsoup/internal/SimpleBufferedInput.java | 16 ++- .../java/org/jsoup/helper/DataUtilTest.java | 12 ++- .../java/org/jsoup/integration/ConnectIT.java | 8 +- 6 files changed, 135 insertions(+), 77 deletions(-) diff --git a/src/main/java/org/jsoup/helper/DataUtil.java b/src/main/java/org/jsoup/helper/DataUtil.java index ee6d705a31..22dc463d0c 100644 --- a/src/main/java/org/jsoup/helper/DataUtil.java +++ b/src/main/java/org/jsoup/helper/DataUtil.java @@ -119,8 +119,7 @@ public static Document load(Path path, @Nullable String charsetName, String base * @since 1.17.2 */ public static Document load(Path path, @Nullable String charsetName, String baseUri, Parser parser) throws IOException { - InputStream stream = openStream(path); - return parseInputStream(stream, charsetName, baseUri, parser); + return parseInputStream(openStream(path), charsetName, baseUri, parser); } /** @@ -151,7 +150,7 @@ public static StreamParser streamParser(Path path, @Nullable Charset charset, St } /** Open an input stream from a file; if it's a gzip file, returns a GZIPInputStream to unzip it. */ - private static InputStream openStream(Path path) throws IOException { + private static ControllableInputStream openStream(Path path) throws IOException { final SeekableByteChannel byteChannel = Files.newByteChannel(path); InputStream stream = Channels.newInputStream(byteChannel); String name = Normalizer.lowerCase(path.getFileName().toString()); @@ -162,7 +161,7 @@ private static InputStream openStream(Path path) throws IOException { stream = new GZIPInputStream(stream); } } - return stream; + return ControllableInputStream.wrap(stream, 0); } /** @@ -174,7 +173,7 @@ private static InputStream openStream(Path path) throws IOException { * @throws IOException on IO error */ public static Document load(InputStream in, @Nullable String charsetName, String baseUri) throws IOException { - return parseInputStream(in, charsetName, baseUri, Parser.htmlParser()); + return parseInputStream(ControllableInputStream.wrap(in, 0), charsetName, baseUri, Parser.htmlParser()); } /** @@ -187,7 +186,7 @@ public static Document load(InputStream in, @Nullable String charsetName, String * @throws IOException on IO error */ public static Document load(InputStream in, @Nullable String charsetName, String baseUri, Parser parser) throws IOException { - return parseInputStream(in, charsetName, baseUri, parser); + return parseInputStream(ControllableInputStream.wrap(in, 0), charsetName, baseUri, parser); } /** @@ -219,7 +218,7 @@ static class CharsetDoc { } } - static Document parseInputStream(@Nullable InputStream input, @Nullable String charsetName, String baseUri, Parser parser) throws IOException { + static Document parseInputStream(@Nullable ControllableInputStream input, @Nullable String charsetName, String baseUri, Parser parser) throws IOException { if (input == null) // empty body // todo reconsider? return new Document(baseUri); @@ -235,30 +234,28 @@ static Document parseInputStream(@Nullable InputStream input, @Nullable String c return doc; } - static CharsetDoc detectCharset(InputStream input, @Nullable String charsetName, String baseUri, Parser parser) throws IOException { + static CharsetDoc detectCharset(ControllableInputStream input, @Nullable String charsetName, String baseUri, Parser parser) throws IOException { Document doc = null; - - // read the start of the stream and look for a BOM or meta charset - InputStream wrappedInputStream = ControllableInputStream.wrap(input, DefaultBufferSize, 0); - wrappedInputStream.mark(DefaultBufferSize); - ByteBuffer firstBytes = readToByteBuffer(wrappedInputStream, firstReadBufferSize - 1); // -1 because we read one more to see if completed. First read is < buffer size, so can't be invalid. - boolean fullyRead = (wrappedInputStream.read() == -1); - wrappedInputStream.reset(); - + // read the start of the stream and look for a BOM or meta charset: // look for BOM - overrides any other header or input - BomCharset bomCharset = detectCharsetFromBom(firstBytes); + BomCharset bomCharset = detectCharsetFromBom(input); // resets if (bomCharset != null) charsetName = bomCharset.charset; - if (charsetName == null) { // determine from meta. safe first parse as UTF-8 + if (charsetName == null) { // read ahead and determine from meta. safe first parse as UTF-8 + int origMax = input.max(); + input.max(firstReadBufferSize); + input.mark(firstReadBufferSize); + input.allowClose(false); // ignores closes during parse, in case we need to rewind try { - CharBuffer defaultDecoded = UTF_8.decode(firstBytes); - if (defaultDecoded.hasArray()) - doc = parser.parseInput(new CharArrayReader(defaultDecoded.array(), defaultDecoded.arrayOffset(), defaultDecoded.limit()), baseUri); - else - doc = parser.parseInput(defaultDecoded.toString(), baseUri); + Reader reader = new InputStreamReader(input, UTF_8); // input is currently capped to firstReadBufferSize + doc = parser.parseInput(reader, baseUri); + input.reset(); + input.max(origMax); // reset for a full read if required } catch (UncheckedIOException e) { throw e.getCause(); + } finally { + input.allowClose(true); } // look for or HTML5 @@ -293,7 +290,9 @@ else if (first instanceof Comment) { foundCharset = foundCharset.trim().replaceAll("[\"']", ""); charsetName = foundCharset; doc = null; - } else if (!fullyRead) { + } else if (input.baseReadFully()) { // if we have read fully, and the charset was correct, keep that current parse + input.close(); // the parser tried to close it + } else { doc = null; } } else { // specified by content type header (or by user on file load) @@ -306,7 +305,7 @@ else if (first instanceof Comment) { Charset charset = charsetName.equals(defaultCharsetName) ? UTF_8 : Charset.forName(charsetName); boolean skip = bomCharset != null && bomCharset.offset; // skip 1 if the BOM is there and needs offset // if consumer needs to parse the input; prep it if there's a BOM. Can't skip in inputstream as wrapping buffer will ignore the pos - return new CharsetDoc(charset, doc, wrappedInputStream, skip); + return new CharsetDoc(charset, doc, input, skip); } static Document parseInputStream(CharsetDoc charsetDoc, String baseUri, Parser parser) throws IOException { @@ -400,14 +399,13 @@ static String mimeBoundary() { return StringUtil.releaseBuilder(mime); } - private static @Nullable BomCharset detectCharsetFromBom(final ByteBuffer byteData) { - @SuppressWarnings("UnnecessaryLocalVariable") final Buffer buffer = byteData; // .mark and rewind used to return Buffer, now ByteBuffer, so cast for backward compat - buffer.mark(); + private static @Nullable BomCharset detectCharsetFromBom(ControllableInputStream input) throws IOException { byte[] bom = new byte[4]; - if (byteData.remaining() >= bom.length) { - byteData.get(bom); - buffer.rewind(); - } + input.mark(bom.length); + //noinspection ResultOfMethodCallIgnored + input.read(bom, 0, 4); + input.reset(); + if (bom[0] == 0x00 && bom[1] == 0x00 && bom[2] == (byte) 0xFE && bom[3] == (byte) 0xFF || // BE bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE && bom[2] == 0x00 && bom[3] == 0x00) { // LE return new BomCharset("UTF-32", false); // and I hope it's on your system diff --git a/src/main/java/org/jsoup/helper/HttpConnection.java b/src/main/java/org/jsoup/helper/HttpConnection.java index c464260c3e..853019b15a 100644 --- a/src/main/java/org/jsoup/helper/HttpConnection.java +++ b/src/main/java/org/jsoup/helper/HttpConnection.java @@ -52,6 +52,7 @@ import static org.jsoup.Connection.Method.HEAD; import static org.jsoup.helper.DataUtil.UTF_8; import static org.jsoup.internal.Normalizer.lowerCase; +import static org.jsoup.internal.SharedConstants.DefaultBufferSize; /** * Implementation of {@link Connection}. @@ -915,7 +916,7 @@ else if (res.hasHeaderWithValue(CONTENT_ENCODING, "deflate")) stream = new InflaterInputStream(stream, new Inflater(true)); res.bodyStream = ControllableInputStream.wrap( - stream, SharedConstants.DefaultBufferSize, req.maxBodySize()) + stream, DefaultBufferSize, req.maxBodySize()) .timeout(startTime, req.timeout()); if (req.responseProgress != null) // set response progress listener @@ -965,11 +966,12 @@ public String contentType() { } /** Called from parse() or streamParser(), validates and prepares the input stream, and aligns common settings. */ - private InputStream prepareParse() { + private ControllableInputStream prepareParse() { Validate.isTrue(executed, "Request must be executed (with .execute(), .get(), or .post() before parsing response"); - InputStream stream = bodyStream; + ControllableInputStream stream = bodyStream; if (byteData != null) { // bytes have been read in to the buffer, parse that - stream = new ByteArrayInputStream(byteData.array(), 0, byteData.limit()); + ByteArrayInputStream bytes = new ByteArrayInputStream(byteData.array(), 0, byteData.limit()); + stream = ControllableInputStream.wrap(bytes, 0); // no max inputStreamRead = false; // ok to reparse if in bytes } Validate.isFalse(inputStreamRead, "Input stream already read and parsed, cannot re-read."); @@ -979,7 +981,7 @@ private InputStream prepareParse() { } @Override public Document parse() throws IOException { - InputStream stream = prepareParse(); + ControllableInputStream stream = prepareParse(); Document doc = DataUtil.parseInputStream(stream, charset, url.toExternalForm(), req.parser()); doc.connection(new HttpConnection(req, this)); // because we're static, don't have the connection obj. // todo - maybe hold in the req? charset = doc.outputSettings().charset().name(); // update charset from meta-equiv, possibly @@ -988,7 +990,7 @@ private InputStream prepareParse() { } @Override public StreamParser streamParser() throws IOException { - InputStream stream = prepareParse(); + ControllableInputStream stream = prepareParse(); String baseUri = url.toExternalForm(); DataUtil.CharsetDoc charsetDoc = DataUtil.detectCharset(stream, charset, baseUri, req.parser()); // note that there may be a document in CharsetDoc as a result of scanning meta-data -- but as requires a stream parse, it is not used here. todo - revisit. @@ -1064,7 +1066,7 @@ public BufferedInputStream bodyStream() { if (byteData != null) { return new BufferedInputStream( new ByteArrayInputStream(byteData.array(), 0, byteData.limit()), - SharedConstants.DefaultBufferSize); + DefaultBufferSize); } Validate.isFalse(inputStreamRead, "Request has already been read"); diff --git a/src/main/java/org/jsoup/internal/ControllableInputStream.java b/src/main/java/org/jsoup/internal/ControllableInputStream.java index 82cc113d41..5d8e8bba93 100644 --- a/src/main/java/org/jsoup/internal/ControllableInputStream.java +++ b/src/main/java/org/jsoup/internal/ControllableInputStream.java @@ -14,19 +14,19 @@ import static org.jsoup.internal.SharedConstants.DefaultBufferSize; /** - * A jsoup internal class (so don't use it as there is no contract API) that enables controls on a Buffered Input Stream, + * A jsoup internal class (so don't use it as there is no contract API) that enables controls on a buffered input stream, * namely a maximum read size, and the ability to Thread.interrupt() the read. */ // reimplemented from ConstrainableInputStream for JDK21 - extending BufferedInputStream will pin threads during read public class ControllableInputStream extends FilterInputStream { - private final InputStream buff; - private final boolean capped; - private final int maxSize; + private final SimpleBufferedInput buff; // super.in, but typed as SimpleBufferedInput + private int maxSize; private long startTime; private long timeout = 0; // optional max time of request private int remaining; private int markPos; private boolean interrupted; + private boolean allowClose = true; // for cases where we want to re-read the input, can ignore .close() from the parser // if we are tracking progress, will have the expected content length, progress callback, connection private @Nullable Progress progress; @@ -34,11 +34,10 @@ public class ControllableInputStream extends FilterInputStream { private int contentLength = -1; private int readPos = 0; // amount read; can be reset() - private ControllableInputStream(InputStream in, int maxSize) { + private ControllableInputStream(SimpleBufferedInput in, int maxSize) { super(in); Validate.isTrue(maxSize >= 0); buff = in; - capped = maxSize != 0; this.maxSize = maxSize; remaining = maxSize; markPos = -1; @@ -48,24 +47,34 @@ private ControllableInputStream(InputStream in, int maxSize) { /** * If this InputStream is not already a ControllableInputStream, let it be one. * @param in the input stream to (maybe) wrap - * @param bufferSize the buffer size to use when reading * @param maxSize the maximum size to allow to be read. 0 == infinite. * @return a controllable input stream */ - public static ControllableInputStream wrap(InputStream in, int bufferSize, int maxSize) { + public static ControllableInputStream wrap(InputStream in, int maxSize) { // bufferSize currently unused; consider implementing as a min size in the SoftPool recycler if (in instanceof ControllableInputStream) return (ControllableInputStream) in; - else if (in instanceof BufferedInputStream) - return new ControllableInputStream(in, maxSize); else return new ControllableInputStream(new SimpleBufferedInput(in), maxSize); } + /** + * If this InputStream is not already a ControllableInputStream, let it be one. + * @param in the input stream to (maybe) wrap + * @param bufferSize the buffer size to use when reading + * @param maxSize the maximum size to allow to be read. 0 == infinite. + * @return a controllable input stream + */ + public static ControllableInputStream wrap(InputStream in, int bufferSize, int maxSize) { + // todo - bufferSize currently unused; consider implementing as a min size in the SoftPool recycler; or just deprecate if always DefaultBufferSize + return wrap(in, maxSize); + } + @Override public int read(byte[] b, int off, int len) throws IOException { if (readPos == 0) emitProgress(); // emits a progress - + + boolean capped = maxSize != 0; if (interrupted || capped && remaining <= 0) return -1; if (Thread.currentThread().isInterrupted()) { @@ -73,27 +82,28 @@ public int read(byte[] b, int off, int len) throws IOException { interrupted = true; return -1; } - if (expired()) - throw new SocketTimeoutException("Read timeout"); if (capped && len > remaining) len = remaining; // don't read more than desired, even if available - try { - final int read = super.read(b, off, len); - if (read == -1) { // completed - contentLength = readPos; - } else { - remaining -= read; - readPos += read; - } - emitProgress(); - - return read; - } catch (SocketTimeoutException e) { + while (true) { // loop trying to read until we get some data or hit the overall timeout, if we have one if (expired()) - throw e; - return 0; + throw new SocketTimeoutException("Read timeout"); + + try { + final int read = super.read(b, off, len); + if (read == -1) { // completed + contentLength = readPos; + } else { + remaining -= read; + readPos += read; + } + emitProgress(); + return read; + } catch (SocketTimeoutException e) { + if (expired() || timeout == 0) + throw e; + } } } @@ -146,6 +156,36 @@ public static ByteBuffer readToByteBuffer(InputStream in, int max) throws IOExce markPos = maxSize - remaining; } + /** + Check if the underlying InputStream has been read fully. There may still content in buffers to be consumed, and + read methods may return -1 if hit the read limit. + @return true if the underlying inputstream has been read fully. + */ + public boolean baseReadFully() { + return buff.baseReadFully(); + } + + /** + Get the max size of this stream (how far at most will be read from the underlying stream) + * @return the max size + */ + public int max() { + return maxSize; + } + + public void max(int newMax) { + remaining += newMax - maxSize; // update remaining to reflect the difference in the new maxsize + maxSize = newMax; + } + + public void allowClose(boolean allowClose) { + this.allowClose = allowClose; + } + + @Override public void close() throws IOException { + if (allowClose) super.close(); + } + public ControllableInputStream timeout(long startTimeNanos, long timeoutMillis) { this.startTime = startTimeNanos; this.timeout = timeoutMillis * 1000000; @@ -181,8 +221,6 @@ private boolean expired() { public BufferedInputStream inputStream() { // called via HttpConnection.Response.bodyStream(), needs an OG BufferedInputStream - if (buff instanceof BufferedInputStream) - return (BufferedInputStream) buff; // if originally supplied a BIS in .wrap() - else return new BufferedInputStream(buff); + return new BufferedInputStream(buff); } } diff --git a/src/main/java/org/jsoup/internal/SimpleBufferedInput.java b/src/main/java/org/jsoup/internal/SimpleBufferedInput.java index 64d8d95ec9..4004d1a01a 100644 --- a/src/main/java/org/jsoup/internal/SimpleBufferedInput.java +++ b/src/main/java/org/jsoup/internal/SimpleBufferedInput.java @@ -22,6 +22,7 @@ class SimpleBufferedInput extends FilterInputStream { private int bufPos; private int bufLength; private int bufMark = -1; + private boolean inReadFully = false; // true when the underlying inputstream has been read fully SimpleBufferedInput(InputStream in) { super(in); @@ -67,6 +68,7 @@ public int read(byte[] dest, int offset, int desiredLen) throws IOException { } private void fill() throws IOException { + if (inReadFully) return; if (byteBuf == null) { // get one on first demand byteBuf = BufferPool.borrow(); } @@ -95,6 +97,10 @@ private void fill() throws IOException { bufLength += read; } } + if (read == -1) { + inReadFully = true; + super.close(); // close underlying stream immediately; frees resources a little earlier + } } byte[] getBuf() { @@ -102,11 +108,19 @@ byte[] getBuf() { return byteBuf; } + /** + Check if the underlying InputStream has been read fully. There may still content in this buffer to be consumed. + @return true if the underlying inputstream has been read fully. + */ + boolean baseReadFully() { + return inReadFully; + } + @Override public int available() throws IOException { if (byteBuf != null && bufLength - bufPos > 0) return bufLength - bufPos; // doesn't include those in.available(), but mostly used as a block test - return in.available(); + return inReadFully ? 0 : in.available(); } @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod") // explicitly not synced diff --git a/src/test/java/org/jsoup/helper/DataUtilTest.java b/src/test/java/org/jsoup/helper/DataUtilTest.java index 4e75c02a80..a583ddc0af 100644 --- a/src/test/java/org/jsoup/helper/DataUtilTest.java +++ b/src/test/java/org/jsoup/helper/DataUtilTest.java @@ -2,6 +2,7 @@ import org.jsoup.Jsoup; import org.jsoup.integration.ParseTest; +import org.jsoup.internal.ControllableInputStream; import org.jsoup.nodes.Document; import org.jsoup.parser.Parser; import org.junit.jupiter.api.Test; @@ -37,12 +38,12 @@ public void testQuotedCharset() { assertEquals("UTF-8", DataUtil.getCharsetFromContentType("text/html; charset='UTF-8'")); } - private InputStream stream(String data) { - return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + private ControllableInputStream stream(String data) { + return ControllableInputStream.wrap(new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)), 0); } - private InputStream stream(String data, String charset) { - return new ByteArrayInputStream(data.getBytes(Charset.forName(charset))); + private ControllableInputStream stream(String data, String charset) { + return ControllableInputStream.wrap(new ByteArrayInputStream(data.getBytes(Charset.forName(charset))), 0); } @Test @@ -143,7 +144,8 @@ public void parseSequenceInputStream() throws IOException { stream(firstPart), stream(secondPart) ); - Document doc = DataUtil.parseInputStream(sequenceStream, null, "", Parser.htmlParser()); + ControllableInputStream stream = ControllableInputStream.wrap(sequenceStream, 0); + Document doc = DataUtil.parseInputStream(stream, null, "", Parser.htmlParser()); assertEquals(fileContent, doc.outerHtml()); } diff --git a/src/test/java/org/jsoup/integration/ConnectIT.java b/src/test/java/org/jsoup/integration/ConnectIT.java index cbc40a7f3d..672fed2911 100644 --- a/src/test/java/org/jsoup/integration/ConnectIT.java +++ b/src/test/java/org/jsoup/integration/ConnectIT.java @@ -54,6 +54,7 @@ public void canInterruptBodyStringRead() throws InterruptedException { @Test public void canInterruptDocumentRead() throws InterruptedException { // todo - implement in interruptable channels, so it's immediate + long start = System.currentTimeMillis(); final String[] body = new String[1]; Thread runner = new Thread(() -> { try { @@ -68,12 +69,15 @@ public void canInterruptDocumentRead() throws InterruptedException { }); runner.start(); - Thread.sleep(1000 * 3); + Thread.sleep(3 * 1000); runner.interrupt(); assertTrue(runner.isInterrupted()); runner.join(); - assertEquals(0, body[0].length()); // doesn't read a failed doc + long end = System.currentTimeMillis(); + // check we are between 3 and connect timeout seconds (should be just over 3; but allow some slack for slow CI runners) + assertTrue(end - start > 3 * 1000); + assertTrue(end - start < 10 * 1000); } @Test public void canInterruptThenJoinASpawnedThread() throws InterruptedException {