Custom Codecs Upgrade to Lucene99 Codec #95

Closed
@@ -47,11 +47,11 @@ public CustomCodecService(MapperService mapperService, IndexSettings indexSettin
         int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
         final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
         if (mapperService == null) {
-            codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
-            codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
+            codecs.put(ZSTD_CODEC, new Zstd99Codec(compressionLevel));
+            codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(compressionLevel));
         } else {
-            codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
-            codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
+            codecs.put(ZSTD_CODEC, new Zstd99Codec(mapperService, logger, compressionLevel));
+            codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(mapperService, logger, compressionLevel));
         }
         this.codecs = codecs.immutableMap();
     }
@@ -11,21 +11,21 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
-import org.apache.lucene.codecs.lucene95.Lucene95Codec;
+import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Collections;
import java.util.Set;

/**
*
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
* Supports two modes zstd and zstd_no_dict.
* Uses Lucene99 as the delegate codec
*
* @opensearch.internal
*/
-public abstract class Lucene95CustomCodec extends FilterCodec {
+public abstract class Lucene99CustomCodec extends FilterCodec {

/** Default compression level used for compression */
public static final int DEFAULT_COMPRESSION_LEVEL = 3;
@@ -35,17 +35,11 @@ public enum Mode {
/**
* ZStandard mode with dictionary
*/
-        ZSTD("ZSTD", Set.of("zstd")),
+        ZSTD("ZSTD99", Set.of("zstd")),
reta (Collaborator), Jan 18, 2024:
@sarthakaggarwal97 I don't think we should approach codec backward compatibility this way - it will cause an explosion of compression codecs. Ideally, we should be able to:

  • read / write indices that use the older (9.5) codec
  • create new indices using the new (9.9) codec
  • read / write new indices that use the (9.9) codec

The codec names stay the same: zstd / zstdnodict. @msfroh does it make sense (from the Apache Lucene perspective)?

sarthakaggarwal97 (Collaborator, Author), Jan 19, 2024:
@reta I was initially not planning to introduce new codecs. I was looking to extend the current ZSTD and ZSTD_NO_DICT codecs from Lucene99 instead of Lucene95, and then read the stored fields using the same Lucene95CustomStoredFieldsFormat, but I started to run into problems.

This initial approach worked well to ensure that we were indexing and reading newly created segments with the Lucene99 codec in the background, but upon reading older segments (created with zstandard in <OSv2.12), things started to fail.

We would see the shards getting unassigned upon recovery, with this exception:

{
    "index": "index-26",
    "shard": 0,
    "primary": true,
    "current_state": "unassigned",
    "unassigned_info": {
        "reason": "CLUSTER_RECOVERED",
        "at": "2024-01-17T11:11:00.285Z",
        "last_allocation_status": "no_valid_shard_copy"
    },
    "can_allocate": "no_valid_shard_copy",
    "allocate_explanation": "cannot allocate because all found copies of the shard are either stale or corrupt",
    "node_allocation_decisions": [
        {
            "node_id": "GK-KmiQoQEWlE44bvkt2Rg",
            "node_name": "bcd0743e920a.ant.amazon.com",
            "transport_address": "127.0.0.1:9300",
            "node_attributes": {
                "shard_indexing_pressure_enabled": "true"
            },
            "node_decision": "no",
            "store": {
                "in_sync": true,
                "allocation_id": "CnmW07YyQtqjAI0G4xSOMQ",
                "store_exception": {
                    "type": "corrupt_index_exception",
                    "reason": "checksum status indeterminate: remaining=0; please run checkindex for more details (resource=BufferedChecksumIndexInput(NIOFSIndexInput(path=\"/Users/sarthagg/workplace/actual/OpenSearch/build/distribution/local/opensearch-3.0.0-SNAPSHOT/data/nodes/0/indices/DthI0nwRTqafGvWDltUf1g/0/index/_f.si\")))",
                    "suppressed": [
                        {
                            "type": "e_o_f_exception",
                            "reason": "read past EOF: NIOFSIndexInput(path=\"/Users/sarthagg/workplace/actual/OpenSearch/build/distribution/local/opensearch-3.0.0-SNAPSHOT/data/nodes/0/indices/DthI0nwRTqafGvWDltUf1g/0/index/_f.si\")"
                        },
                        {
                            "type": "corrupt_index_exception",
                            "reason": "checksum passed (94fdd3c1). possibly transient resource issue, or a Lucene or JVM bug (resource=BufferedChecksumIndexInput(NIOFSIndexInput(path=\"/Users/sarthagg/workplace/actual/OpenSearch/build/distribution/local/opensearch-3.0.0-SNAPSHOT/data/nodes/0/indices/DthI0nwRTqafGvWDltUf1g/0/index/segments_7\")))"
                        }
                    ]
                }
            }
        }
    ]
}

Even though the codec names were the same, recovery would still fail. The only difference was that we were using Lucene99 instead of Lucene95 as the delegate codec. It should have worked, since even with Lucene99, Lucene still relies on the Lucene90StoredFieldsFormat.

So what changed?

There was a slight change in the way segments are parsed between Lucene95 and Lucene99. The comparison can be viewed here: https://editor.mergely.com/Mb1lKm8z

There is an additional call to readByte in Lucene99SegmentInfoFormat, after which we see an EOF exception if we try to read old segments with it.

This change to the format, and to how we parse segments, was introduced in apache/lucene@6677109.
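The workaround the discussion converges on is to pick the decoding path per segment, based on which mode attribute the segment carries. A minimal sketch of that dispatch, with a plain `Map` standing in for `SegmentInfo` attributes (the class and method names here are illustrative, not from the PR; only the key names mirror the diff):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained sketch of per-segment mode resolution.
public class ModeKeyFallbackSketch {
    // Key names as they appear in the PR's stored fields format.
    static final String LUCENE95_MODE_KEY = "Lucene95CustomStoredFieldsFormat.mode";
    static final String MODE_KEY = "Lucene99CustomStoredFieldsFormat.mode";

    // Prefer the legacy key so pre-2.12 segments keep decoding with the
    // old (Lucene95-era) format; fall back to the new key for new segments.
    static String resolveMode(Map<String, String> segmentAttributes) {
        if (segmentAttributes.containsKey(LUCENE95_MODE_KEY)) {
            return segmentAttributes.get(LUCENE95_MODE_KEY); // old segment
        } else if (segmentAttributes.containsKey(MODE_KEY)) {
            return segmentAttributes.get(MODE_KEY);          // new segment
        }
        throw new IllegalStateException("missing value for " + MODE_KEY);
    }

    public static void main(String[] args) {
        Map<String, String> oldSegment = new HashMap<>();
        oldSegment.put(LUCENE95_MODE_KEY, "ZSTD");
        Map<String, String> newSegment = new HashMap<>();
        newSegment.put(MODE_KEY, "ZSTD_NO_DICT");
        System.out.println(resolveMode(oldSegment) + " " + resolveMode(newSegment));
    }
}
```

This is the same shape as the `fieldsReader` change further down in the diff: read the legacy attribute first, the new one second, and fail loudly if neither is present.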

reta (Collaborator):

Got it, it definitely makes sense. I was a bit confused about why we have to duplicate Mode for each codec, but looking into the code it is clear: we use it as the codec name, so what you've implemented makes perfect sense. Thanks @sarthakaggarwal97!

/**
* ZStandard mode without dictionary
*/
-        ZSTD_NO_DICT("ZSTDNODICT", Set.of("zstd_no_dict")),
-        /**
-         * Deprecated ZStandard mode, added for backward compatibility to support indices created in 2.9.0 where
-         * both ZSTD and ZSTD_NO_DICT used Lucene95CustomCodec underneath. This should not be used to
-         * create new indices.
-         */
-        ZSTD_DEPRECATED("Lucene95CustomCodec", Collections.emptySet());
+        ZSTD_NO_DICT("ZSTDNODICT99", Set.of("zstd_no_dict"));

private final String codec;
private final Set<String> aliases;
@@ -77,7 +71,7 @@ public Set<String> getAliases() {
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
*/
-    public Lucene95CustomCodec(Mode mode) {
+    public Lucene99CustomCodec(Mode mode) {
this(mode, DEFAULT_COMPRESSION_LEVEL);
}

@@ -89,9 +83,9 @@ public Lucene95CustomCodec(Mode mode) {
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
*/
-    public Lucene95CustomCodec(Mode mode, int compressionLevel) {
-        super(mode.getCodec(), new Lucene95Codec());
-        this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
+    public Lucene99CustomCodec(Mode mode, int compressionLevel) {
+        super(mode.getCodec(), new Lucene99Codec());
+        this.storedFieldsFormat = new Lucene99CustomStoredFieldsFormat(mode, compressionLevel);
}

/**
@@ -104,9 +98,9 @@ public Lucene95CustomCodec(Mode mode, int compressionLevel) {
* @param mapperService The mapper service.
* @param logger The logger.
*/
-    public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
-        super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
-        this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
+    public Lucene99CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
+        super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene99Codec.Mode.BEST_SPEED, mapperService, logger));
+        this.storedFieldsFormat = new Lucene99CustomStoredFieldsFormat(mode, compressionLevel);
}

@Override
@@ -17,15 +17,17 @@
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
+import org.opensearch.index.codec.customcodecs.backward_codecs.Lucene95CustomCodec;

import java.io.IOException;
import java.util.Objects;

/** Stored field format used by pluggable codec */
-public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat {
+public class Lucene99CustomStoredFieldsFormat extends StoredFieldsFormat {

/** A key that we use to map to a mode */
-    public static final String MODE_KEY = Lucene95CustomStoredFieldsFormat.class.getSimpleName() + ".mode";
+    public static final String LUCENE95_MODE_KEY = "Lucene95CustomStoredFieldsFormat.mode";
+    public static final String MODE_KEY = Lucene99CustomStoredFieldsFormat.class.getSimpleName() + ".mode";

protected static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024;
protected static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096;
@@ -34,21 +36,21 @@ public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat {
private final CompressionMode zstdCompressionMode;
private final CompressionMode zstdNoDictCompressionMode;

-    private final Lucene95CustomCodec.Mode mode;
+    private final Lucene99CustomCodec.Mode mode;
private final int compressionLevel;

/** default constructor */
-    public Lucene95CustomStoredFieldsFormat() {
-        this(Lucene95CustomCodec.Mode.ZSTD, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL);
+    public Lucene99CustomStoredFieldsFormat() {
+        this(Lucene99CustomCodec.Mode.ZSTD, Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL);
}

/**
* Creates a new instance.
*
* @param mode The mode represents ZSTD or ZSTDNODICT
*/
-    public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) {
-        this(mode, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL);
+    public Lucene99CustomStoredFieldsFormat(Lucene99CustomCodec.Mode mode) {
+        this(mode, Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL);
}

/**
@@ -57,7 +59,7 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) {
* @param mode The mode represents ZSTD or ZSTDNODICT
* @param compressionLevel The compression level for the mode.
*/
-    public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) {
+    public Lucene99CustomStoredFieldsFormat(Lucene99CustomCodec.Mode mode, int compressionLevel) {
this.mode = Objects.requireNonNull(mode);
this.compressionLevel = compressionLevel;
zstdCompressionMode = new ZstdCompressionMode(compressionLevel);
@@ -73,12 +75,17 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compr
*/
@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException {
-        String value = si.getAttribute(MODE_KEY);
-        if (value == null) {
+        if (si.getAttribute(LUCENE95_MODE_KEY) != null) {
+            String value = si.getAttribute(LUCENE95_MODE_KEY);
+            Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value);
+            return impl(mode).fieldsReader(directory, si, fn, context);
+        } else if (si.getAttribute(MODE_KEY) != null) {
+            String value = si.getAttribute(MODE_KEY);
+            Lucene99CustomCodec.Mode mode = Lucene99CustomCodec.Mode.valueOf(value);
+            return impl(mode).fieldsReader(directory, si, fn, context);
+        } else {
             throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name);
         }
-        Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value);
-        return impl(mode).fieldsReader(directory, si, fn, context);
}

/**
@@ -98,31 +105,40 @@ public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOCo
return impl(mode).fieldsWriter(directory, si, context);
}

+    StoredFieldsFormat impl(Lucene99CustomCodec.Mode mode) {
+        switch (mode) {
+            case ZSTD:
+                return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstd", this.zstdCompressionMode);
+            case ZSTD_NO_DICT:
+                return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstdNoDict", this.zstdNoDictCompressionMode);
+            default:
+                throw new AssertionError();
+        }
+    }
+
     StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) {
         switch (mode) {
             case ZSTD:
             case ZSTD_DEPRECATED:
-                return new Lucene90CompressingStoredFieldsFormat(
-                    "CustomStoredFieldsZstd",
-                    zstdCompressionMode,
-                    ZSTD_BLOCK_LENGTH,
-                    ZSTD_MAX_DOCS_PER_BLOCK,
-                    ZSTD_BLOCK_SHIFT
-                );
+                return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstd", this.zstdCompressionMode);
             case ZSTD_NO_DICT:
-                return new Lucene90CompressingStoredFieldsFormat(
-                    "CustomStoredFieldsZstdNoDict",
-                    zstdNoDictCompressionMode,
-                    ZSTD_BLOCK_LENGTH,
-                    ZSTD_MAX_DOCS_PER_BLOCK,
-                    ZSTD_BLOCK_SHIFT
-                );
+                return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstdNoDict", this.zstdNoDictCompressionMode);
             default:
                 throw new AssertionError();
        }
    }

-    public Lucene95CustomCodec.Mode getMode() {
+    private StoredFieldsFormat getCustomCompressingStoredFieldsFormat(String formatName, CompressionMode compressionMode) {
+        return new Lucene90CompressingStoredFieldsFormat(
+            formatName,
+            compressionMode,
+            ZSTD_BLOCK_LENGTH,
+            ZSTD_MAX_DOCS_PER_BLOCK,
+            ZSTD_BLOCK_SHIFT
+        );
+    }
+
+    public Lucene99CustomCodec.Mode getMode() {
return mode;
}

@@ -134,7 +150,7 @@ public int getCompressionLevel() {
}

public CompressionMode getCompressionMode() {
-        return mode == Lucene95CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode;
+        return mode == Lucene99CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode;
}

}
@@ -20,12 +20,12 @@
/**
* ZstdCodec provides ZSTD compressor using the <a href="https://github.com/luben/zstd-jni">zstd-jni</a> library.
*/
-public class ZstdCodec extends Lucene95CustomCodec implements CodecSettings, CodecAliases {
+public class Zstd99Codec extends Lucene99CustomCodec implements CodecSettings, CodecAliases {

/**
* Creates a new ZstdCodec instance with the default compression level.
*/
-    public ZstdCodec() {
+    public Zstd99Codec() {
this(DEFAULT_COMPRESSION_LEVEL);
}

Expand All @@ -34,7 +34,7 @@ public ZstdCodec() {
*
* @param compressionLevel The compression level.
*/
-    public ZstdCodec(int compressionLevel) {
+    public Zstd99Codec(int compressionLevel) {
super(Mode.ZSTD, compressionLevel);
}

@@ -45,7 +45,7 @@ public ZstdCodec(int compressionLevel) {
* @param logger The logger.
* @param compressionLevel The compression level.
*/
-    public ZstdCodec(MapperService mapperService, Logger logger, int compressionLevel) {
+    public Zstd99Codec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD, compressionLevel, mapperService, logger);
}

@@ -20,12 +20,12 @@
/**
* ZstdNoDictCodec provides ZSTD compressor without a dictionary support.
*/
-public class ZstdNoDictCodec extends Lucene95CustomCodec implements CodecSettings, CodecAliases {
+public class ZstdNoDict99Codec extends Lucene99CustomCodec implements CodecSettings, CodecAliases {

/**
* Creates a new ZstdNoDictCodec instance with the default compression level.
*/
-    public ZstdNoDictCodec() {
+    public ZstdNoDict99Codec() {
this(DEFAULT_COMPRESSION_LEVEL);
}

Expand All @@ -34,7 +34,7 @@ public ZstdNoDictCodec() {
*
* @param compressionLevel The compression level.
*/
-    public ZstdNoDictCodec(int compressionLevel) {
+    public ZstdNoDict99Codec(int compressionLevel) {
super(Mode.ZSTD_NO_DICT, compressionLevel);
}

@@ -45,7 +45,7 @@ public ZstdNoDictCodec(int compressionLevel) {
* @param logger The logger.
* @param compressionLevel The compression level.
*/
-    public ZstdNoDictCodec(MapperService mapperService, Logger logger, int compressionLevel) {
+    public ZstdNoDict99Codec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD_NO_DICT, compressionLevel, mapperService, logger);
}

@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs.backward_codecs;

import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.backward_codecs.lucene95.Lucene95Codec;
import org.opensearch.index.codec.customcodecs.Lucene99CustomStoredFieldsFormat;

import java.util.Collections;
import java.util.Set;

/**
*
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
* Supports two modes zstd and zstd_no_dict.
* Uses Lucene95 as the delegate codec
*
* @opensearch.internal
*/
public abstract class Lucene95CustomCodec extends FilterCodec {

/** Default compression level used for compression */
public static final int DEFAULT_COMPRESSION_LEVEL = 3;

/** Each mode represents a compression algorithm. */
public enum Mode {
/**
* ZStandard mode with dictionary
*/
ZSTD("ZSTD", Set.of("zstd")),
/**
* ZStandard mode without dictionary
*/
ZSTD_NO_DICT("ZSTDNODICT", Set.of("zstd_no_dict")),
/**
* Deprecated ZStandard mode, added for backward compatibility to support indices created in 2.9.0 where
* both ZSTD and ZSTD_NO_DICT used Lucene95CustomCodec underneath. This should not be used to
* create new indices.
*/
ZSTD_DEPRECATED("Lucene95CustomCodec", Collections.emptySet());

private final String codec;
private final Set<String> aliases;

Mode(String codec, Set<String> aliases) {
this.codec = codec;
this.aliases = aliases;
}

/**
* Returns the Codec that is registered with Lucene
*/
public String getCodec() {
return codec;
}

/**
* Returns the aliases of the Codec
*/
public Set<String> getAliases() {
return aliases;
}
}

private final StoredFieldsFormat storedFieldsFormat;

/**
* Creates a new compression codec.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
*/

public Lucene95CustomCodec(Mode mode) {
super(mode.getCodec(), new Lucene95Codec());
this.storedFieldsFormat = new Lucene99CustomStoredFieldsFormat();
}

@Override
public StoredFieldsFormat storedFieldsFormat() {
return storedFieldsFormat;
}

@Override
public String toString() {
return getClass().getSimpleName();
}
}